Security Incident Response: Detection, Containment & Recovery
When an unauthorized access alert fires at 3 AM, when API logs show suspicious token usage patterns, when customers report data they shouldn't see—the difference between a contained incident and a catastrophic breach comes down to one thing: having a documented, tested incident response plan.
For ChatGPT apps handling 800 million potential users' conversations and data, security incidents aren't hypothetical. They're statistical certainties. The question isn't "if" but "when"—and whether your team can detect the breach in minutes (not months), contain the damage before data exfiltration completes, and restore operations without losing customer trust.
Organizations with formal incident response plans detect breaches 30% faster and reduce remediation costs by an average of $2 million (IBM Security, 2024). Meanwhile, ChatGPT apps without IR plans face OpenAI app suspension, regulatory fines, and permanent reputation damage.
This guide implements NIST 800-61 incident response lifecycle specifically for ChatGPT app architectures: from real-time anomaly detection to forensic-grade evidence preservation to business continuity restoration.
Why ChatGPT Apps Need Specialized Incident Response Plans
Traditional web application IR plans don't translate directly to ChatGPT apps. Your security perimeter spans three distinct systems:
1. OpenAI Infrastructure (Outside Your Control)
- ChatGPT servers issue OAuth tokens and route user requests
- You can't audit OpenAI's infrastructure or detect breaches there
- Must trust OpenAI's token issuance but verify every token you receive
2. Your MCP Server (Primary Attack Surface)
- Receives requests from ChatGPT with user OAuth tokens
- Contains your business logic and data access code
- Most vulnerable to injection attacks, authentication bypasses, and data leaks
3. Your Backend Systems (Data Storage)
- Databases, file storage, third-party APIs
- Contains PII, conversation history, API keys
- Requires separate monitoring and access controls
An incident in any layer cascades to others. For example:
- Compromised OAuth secret → attacker forges tokens → unauthorized MCP server access → database breach
- SQL injection in MCP server → exfiltrate database credentials → lateral movement to production systems
Your IR plan must monitor all three layers simultaneously while recognizing you only control layers 2-3.
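As a concrete illustration, this monitoring scope can be captured as a small configuration object. This is a minimal sketch only: the layer names, signal lists, and `controlled` flags are illustrative, not part of any OpenAI or MCP API.

// monitoring-scope.ts (illustrative sketch)
type TrustZone = 'openai-infrastructure' | 'mcp-server' | 'backend-systems';

interface LayerMonitoringPolicy {
  zone: TrustZone;
  controlled: boolean; // Can we instrument this layer directly?
  signals: string[];   // What we can realistically observe
}

// Layers 2-3 get direct instrumentation; layer 1 is observed only
// indirectly through the tokens and requests it sends us.
export const monitoringScope: LayerMonitoringPolicy[] = [
  {
    zone: 'openai-infrastructure',
    controlled: false,
    signals: ['token claims (iss, aud, exp)', 'request origin metadata'],
  },
  {
    zone: 'mcp-server',
    controlled: true,
    signals: ['auth failures', 'tool-call rates', 'input validation rejections'],
  },
  {
    zone: 'backend-systems',
    controlled: true,
    signals: ['query volumes', 'row counts returned', 'outbound transfer size'],
  },
];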
Phase 1: Threat Detection and Early Warning Systems
Detection speed determines breach impact. The industry average time to identify a data breach runs around 200 days (IBM Security, 2024). For ChatGPT apps with real-time user interactions, you have hours, not months, to detect and respond.
Real-Time Anomaly Detection
Production ChatGPT apps generate predictable patterns: API call volumes, geographic distributions, token usage rates. Deviations signal potential compromise.
Key Indicators of Compromise (IoCs):
Authentication Anomalies
- Multiple failed OAuth token validations from same IP
- Valid tokens used from impossible geographic locations (e.g., US user token appears in Russia 10 minutes later)
- Sudden spike in token refresh attempts
API Usage Anomalies
- Tool calls to endpoints user never previously accessed
- Volume spike (100x normal rate for a single user)
- Off-hours activity (3 AM API calls for daytime user)
Data Access Anomalies
- Queries returning unusually large datasets
- Access to records outside user's normal scope
- Sequential ID enumeration (scanning user IDs 1, 2, 3... for data scraping)
Network Anomalies
- Requests from known malicious IPs (threat intelligence feeds)
- Traffic to unusual ports or protocols
- Exfiltration patterns (large outbound data transfers)
Code Example 1: Real-Time Anomaly Detector (TypeScript)
This anomaly detector runs as Express middleware, scoring every request for suspicious patterns (the threat-intelligence and geolocation lookups are stubbed for illustration):
// middleware/anomaly-detector.ts
import { Request, Response, NextFunction } from 'express';
import Redis from 'ioredis';
import axios from 'axios';
import { createHash } from 'crypto';
interface AnomalyContext {
userId: string;
ip: string;
endpoint: string;
timestamp: number;
tokenHash: string;
}
interface AnomalyScore {
score: number; // 0-100 (higher = more suspicious)
reasons: string[];
action: 'allow' | 'flag' | 'block';
}
export class AnomalyDetector {
private redis: Redis;
private threatIntelCache: Map<string, boolean>;
// Configurable thresholds
private config = {
maxRequestsPerMinute: 60,
maxRequestsPerHour: 1000,
suspiciousScore: 50,
blockScore: 80,
geoVelocityKmPerHour: 800, // Impossible travel speed
offHoursStart: 0, // Midnight
offHoursEnd: 6, // 6 AM
};
constructor(redisUrl: string) {
this.redis = new Redis(redisUrl);
this.threatIntelCache = new Map();
// Refresh threat intel every 6 hours
setInterval(() => this.refreshThreatIntel(), 6 * 60 * 60 * 1000);
this.refreshThreatIntel();
}
/**
* Main anomaly detection middleware
*/
middleware = async (req: Request, res: Response, next: NextFunction) => {
const context: AnomalyContext = {
userId: req.user?.uid || 'anonymous',
ip: req.ip || req.headers['x-forwarded-for'] as string,
endpoint: req.path,
timestamp: Date.now(),
tokenHash: req.headers['authorization']
? this.hashToken(req.headers['authorization'] as string)
: 'none',
};
try {
const anomalyScore = await this.detectAnomalies(context);
// Log all detections
await this.logAnomaly(context, anomalyScore);
// Take action based on score
if (anomalyScore.action === 'block') {
return res.status(403).json({
error: 'Security policy violation',
incidentId: await this.createIncident(context, anomalyScore),
});
}
if (anomalyScore.action === 'flag') {
// Allow request but trigger alert
await this.sendSecurityAlert(context, anomalyScore);
}
next();
} catch (error) {
console.error('Anomaly detection error:', error);
// Fail open (allow request) but log error
next();
}
};
/**
* Core anomaly scoring algorithm
*/
private async detectAnomalies(ctx: AnomalyContext): Promise<AnomalyScore> {
let score = 0;
const reasons: string[] = [];
// Check 1: Rate limiting violations
const rateCheck = await this.checkRateLimits(ctx.userId, ctx.ip);
if (rateCheck.violated) {
score += 30;
reasons.push(`Rate limit exceeded: ${rateCheck.count} requests/min`);
}
// Check 2: Threat intelligence (known bad IPs)
if (await this.isKnownThreat(ctx.ip)) {
score += 50;
reasons.push(`IP ${ctx.ip} on threat intelligence blocklist`);
}
// Check 3: Impossible travel (geolocation velocity)
const geoCheck = await this.checkGeoVelocity(ctx.userId, ctx.ip);
if (geoCheck.impossible) {
score += 40;
reasons.push(
`Impossible travel: ${geoCheck.distance}km in ${geoCheck.timeDiff}min`
);
}
// Check 4: Off-hours activity
const hour = new Date(ctx.timestamp).getUTCHours();
if (hour >= this.config.offHoursStart && hour < this.config.offHoursEnd) {
const offHoursCount = await this.getOffHoursCount(ctx.userId);
if (offHoursCount < 5) { // New behavior
score += 15;
reasons.push(`Unusual off-hours activity (${hour}:00 UTC)`);
}
}
// Check 5: Sequential ID enumeration
if (await this.detectEnumeration(ctx.userId, ctx.endpoint)) {
score += 35;
reasons.push('Sequential resource ID access pattern detected');
}
// Check 6: Token reuse from different IPs
const tokenCheck = await this.checkTokenReuse(ctx.tokenHash, ctx.ip);
if (tokenCheck.suspicious) {
score += 25;
reasons.push(`Token used from ${tokenCheck.ipCount} different IPs`);
}
// Determine action
let action: 'allow' | 'flag' | 'block' = 'allow';
if (score >= this.config.blockScore) {
action = 'block';
} else if (score >= this.config.suspiciousScore) {
action = 'flag';
}
return { score, reasons, action };
}
  /**
   * Check rate limits (fixed one-minute window keyed by user)
   */
private async checkRateLimits(
userId: string,
ip: string
): Promise<{ violated: boolean; count: number }> {
const now = Date.now();
const minuteKey = `rate:${userId}:${Math.floor(now / 60000)}`;
const count = await this.redis.incr(minuteKey);
await this.redis.expire(minuteKey, 120); // Keep 2 minutes
return {
violated: count > this.config.maxRequestsPerMinute,
count,
};
}
/**
* Check threat intelligence feeds
*/
private async isKnownThreat(ip: string): Promise<boolean> {
// Check cache first
if (this.threatIntelCache.has(ip)) {
return this.threatIntelCache.get(ip)!;
}
// In production, integrate AbuseIPDB, VirusTotal, etc.
// For demo, check simple blocklist
    const isThreat = (await this.redis.sismember('threat:blocklist', ip)) === 1;
    this.threatIntelCache.set(ip, isThreat); // Cache the lookup so repeat checks skip Redis
    return isThreat;
}
/**
* Detect impossible travel (geo-velocity check)
*/
private async checkGeoVelocity(
userId: string,
currentIp: string
): Promise<{ impossible: boolean; distance: number; timeDiff: number }> {
const lastLocationKey = `geo:${userId}:last`;
const lastLocation = await this.redis.get(lastLocationKey);
if (!lastLocation) {
// First request, save location
await this.saveLocation(userId, currentIp);
return { impossible: false, distance: 0, timeDiff: 0 };
}
const last = JSON.parse(lastLocation);
const currentGeo = await this.ipToGeo(currentIp);
if (!currentGeo) {
return { impossible: false, distance: 0, timeDiff: 0 };
}
const distance = this.haversineDistance(
last.lat, last.lon,
currentGeo.lat, currentGeo.lon
);
const timeDiff = (Date.now() - last.timestamp) / (1000 * 60); // minutes
const velocity = (distance / timeDiff) * 60; // km/hour
const impossible = velocity > this.config.geoVelocityKmPerHour;
// Update location
await this.saveLocation(userId, currentIp);
return { impossible, distance, timeDiff };
}
/**
* Detect sequential ID enumeration attacks
*/
private async detectEnumeration(
userId: string,
endpoint: string
): Promise<boolean> {
// Extract numeric IDs from endpoint (e.g., /api/users/123)
const idMatch = endpoint.match(/\/(\d+)(?:\/|$)/);
if (!idMatch) return false;
const id = parseInt(idMatch[1]);
const recentIdsKey = `enum:${userId}:ids`;
// Store last 10 accessed IDs
await this.redis.lpush(recentIdsKey, id);
await this.redis.ltrim(recentIdsKey, 0, 9);
await this.redis.expire(recentIdsKey, 300);
const recentIds = await this.redis.lrange(recentIdsKey, 0, 9);
const ids = recentIds.map(Number).sort((a, b) => a - b);
// Check for sequential pattern (e.g., 100, 101, 102, 103...)
if (ids.length < 5) return false;
let sequential = 0;
for (let i = 1; i < ids.length; i++) {
if (ids[i] === ids[i - 1] + 1) sequential++;
}
// If 80% of IDs are sequential, flag as enumeration
return (sequential / ids.length) > 0.8;
}
/**
* Check token reuse patterns
*/
private async checkTokenReuse(
tokenHash: string,
ip: string
): Promise<{ suspicious: boolean; ipCount: number }> {
const tokenIpsKey = `token:${tokenHash}:ips`;
await this.redis.sadd(tokenIpsKey, ip);
await this.redis.expire(tokenIpsKey, 3600); // 1 hour
const ipCount = await this.redis.scard(tokenIpsKey);
// Same token from 3+ IPs within an hour is suspicious
return {
suspicious: ipCount >= 3,
ipCount,
};
}
/**
* Helper: Haversine distance (km)
*/
private haversineDistance(
lat1: number, lon1: number,
lat2: number, lon2: number
): number {
const R = 6371; // Earth radius in km
const dLat = (lat2 - lat1) * Math.PI / 180;
const dLon = (lon2 - lon1) * Math.PI / 180;
const a =
Math.sin(dLat / 2) * Math.sin(dLat / 2) +
Math.cos(lat1 * Math.PI / 180) * Math.cos(lat2 * Math.PI / 180) *
Math.sin(dLon / 2) * Math.sin(dLon / 2);
const c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
return R * c;
}
/**
* Helper: IP to geolocation
*/
private async ipToGeo(ip: string): Promise<{ lat: number; lon: number } | null> {
try {
// In production, use MaxMind GeoIP2, IP2Location, etc.
const response = await axios.get(`http://ip-api.com/json/${ip}`);
return { lat: response.data.lat, lon: response.data.lon };
} catch {
return null;
}
}
/**
* Save user location for future checks
*/
private async saveLocation(userId: string, ip: string) {
const geo = await this.ipToGeo(ip);
if (!geo) return;
await this.redis.set(
`geo:${userId}:last`,
JSON.stringify({ ...geo, timestamp: Date.now() }),
'EX',
3600 // 1 hour
);
}
/**
* Get off-hours activity count
*/
private async getOffHoursCount(userId: string): Promise<number> {
const key = `offhours:${userId}:count`;
const count = await this.redis.get(key);
if (!count) {
await this.redis.set(key, '1', 'EX', 86400 * 7); // 7 days
return 1;
}
await this.redis.incr(key);
return parseInt(count) + 1;
}
/**
* Hash token for storage (don't store raw tokens)
*/
  private hashToken(token: string): string {
    return createHash('sha256').update(token).digest('hex').substring(0, 16);
  }
/**
* Log anomaly to audit system
*/
private async logAnomaly(ctx: AnomalyContext, score: AnomalyScore) {
// Send to centralized audit log (see security-auditing-logging article)
console.log('[ANOMALY]', {
timestamp: new Date(ctx.timestamp).toISOString(),
userId: ctx.userId,
ip: ctx.ip,
endpoint: ctx.endpoint,
score: score.score,
reasons: score.reasons,
action: score.action,
});
}
/**
* Create security incident record
*/
private async createIncident(
ctx: AnomalyContext,
score: AnomalyScore
): Promise<string> {
const incidentId = `INC-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
await this.redis.hset(`incident:${incidentId}`, {
userId: ctx.userId,
ip: ctx.ip,
endpoint: ctx.endpoint,
timestamp: ctx.timestamp,
score: score.score,
reasons: JSON.stringify(score.reasons),
status: 'open',
});
return incidentId;
}
/**
* Send real-time security alert
*/
private async sendSecurityAlert(ctx: AnomalyContext, score: AnomalyScore) {
// Integrate PagerDuty, Slack, email, etc.
console.warn('[SECURITY ALERT]', {
severity: 'HIGH',
userId: ctx.userId,
ip: ctx.ip,
score: score.score,
reasons: score.reasons,
});
}
/**
* Refresh threat intelligence feeds
*/
private async refreshThreatIntel() {
try {
// In production, pull from AbuseIPDB, AlienVault OTX, etc.
console.log('[THREAT INTEL] Refreshing blocklist...');
// Example: Add known bad IPs to Redis set
// const maliciousIps = await fetchThreatFeed();
// for (const ip of maliciousIps) {
// await this.redis.sadd('threat:blocklist', ip);
// }
} catch (error) {
console.error('[THREAT INTEL] Refresh failed:', error);
}
}
}
Usage:
// server.ts
import express from 'express';
import { AnomalyDetector } from './middleware/anomaly-detector';
const app = express();
const detector = new AnomalyDetector('redis://localhost:6379');
// Apply to all routes
app.use(detector.middleware);
// Or apply to specific sensitive routes
app.use('/api/admin/*', detector.middleware);
app.use('/api/users/:id/data', detector.middleware);
This anomaly detector:
- Blocks high-confidence threats (score ≥ 80)
- Flags suspicious activity (score 50-79) for manual review
- Allows normal traffic while logging all decisions
- Scales horizontally using Redis for state sharing
Phase 2: Containment Strategies
Once an incident is detected, containment must happen within minutes. The goal: prevent lateral movement and data exfiltration while preserving forensic evidence.
Automated Containment Playbook
Severity Tiers (see the score-to-severity mapping sketch after this list):
Critical (Score ≥ 80): Immediate automated response
- Revoke user's OAuth tokens
- Block IP at firewall level
- Isolate affected MCP server instances
- Snapshot database for forensics
High (Score 60-79): Semi-automated with manual approval
- Require step-up authentication (MFA)
- Rate limit user to 10 requests/hour
- Alert security team for review within 15 minutes
Medium (Score 40-59): Monitoring escalation
- Increase logging verbosity for user
- Flag account for next security review
- Monitor for escalation to high severity
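The tiers above can be encoded as a simple score-to-severity mapping that the anomaly detector hands to the containment layer. A minimal sketch, assuming the score thresholds from the playbook; the action strings are illustrative labels, not calls to a real API.

// severity-mapping.ts (illustrative sketch)
type Severity = 'CRITICAL' | 'HIGH' | 'MEDIUM' | 'NONE';

interface ContainmentDecision {
  severity: Severity;
  automated: boolean; // Execute without human approval?
  actions: string[];  // Playbook steps to trigger
}

export function mapScoreToContainment(score: number): ContainmentDecision {
  if (score >= 80) {
    return {
      severity: 'CRITICAL',
      automated: true,
      actions: ['revoke-tokens', 'block-ip', 'isolate-mcp', 'snapshot-db'],
    };
  }
  if (score >= 60) {
    return {
      severity: 'HIGH',
      automated: false, // requires manual approval within 15 minutes
      actions: ['step-up-mfa', 'rate-limit-user', 'page-security-team'],
    };
  }
  if (score >= 40) {
    return {
      severity: 'MEDIUM',
      automated: true,
      actions: ['increase-log-verbosity', 'flag-for-review'],
    };
  }
  return { severity: 'NONE', automated: true, actions: [] };
}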
Code Example 2: Automated Containment System (Bash)
This script automates critical containment actions when high-severity incidents are detected:
#!/bin/bash
# containment.sh - Automated incident containment orchestration
# Usage: ./containment.sh <incident_id> <severity> <user_id> <ip_address>
set -euo pipefail
INCIDENT_ID="$1"
SEVERITY="$2" # CRITICAL | HIGH | MEDIUM
USER_ID="$3"
IP_ADDRESS="$4"
LOG_FILE="/var/log/security/containment-${INCIDENT_ID}.log"
FIREWALL_BLOCKLIST="/etc/firewall/blocklist.conf"
EVIDENCE_DIR="/var/security/incidents/${INCIDENT_ID}"
log() {
echo "[$(date -u +%Y-%m-%dT%H:%M:%SZ)] $*" | tee -a "$LOG_FILE"
}
create_evidence_dir() {
log "Creating evidence directory: ${EVIDENCE_DIR}"
mkdir -p "${EVIDENCE_DIR}"
chmod 700 "${EVIDENCE_DIR}"
}
snapshot_database() {
log "Creating forensic database snapshot..."
# Snapshot Firestore data for user
gcloud firestore export \
"gs://your-project-forensics/incidents/${INCIDENT_ID}/firestore" \
--collection-ids=users,apps,conversations \
--async
log "Database snapshot initiated (async)"
}
revoke_user_tokens() {
  log "Revoking OAuth tokens for user: ${USER_ID}"
  # Revoke all Firebase Auth refresh tokens for the user
  # (equivalent to admin.auth().revokeRefreshTokens(uid) in the Firebase Admin SDK)
  node -e "const admin = require('firebase-admin'); admin.initializeApp(); \
    admin.auth().revokeRefreshTokens('${USER_ID}').then(() => process.exit(0));"
  # Invalidate cached tokens in Redis (DEL does not expand wildcards,
  # so enumerate matching keys first)
  redis-cli --scan --pattern "user:${USER_ID}:tokens:*" | xargs -r redis-cli DEL
  log "User tokens revoked"
}
block_ip_firewall() {
log "Blocking IP at firewall: ${IP_ADDRESS}"
# Add to iptables (immediate effect)
iptables -I INPUT -s "${IP_ADDRESS}" -j DROP
# Persist to blocklist config
echo "${IP_ADDRESS} # Incident ${INCIDENT_ID} - $(date -u +%Y-%m-%d)" \
>> "${FIREWALL_BLOCKLIST}"
  # Sync to cloud firewall (GCP)
  # Note: --source-ranges replaces the rule's existing ranges, so in production
  # read the current list, append the new IP, and write back the full set
  gcloud compute firewall-rules update block-malicious \
    --source-ranges="${IP_ADDRESS}" \
    --priority=100
log "IP blocked: ${IP_ADDRESS}"
}
isolate_mcp_server() {
log "Isolating MCP server instances..."
# Drain connections from load balancer
gcloud compute backend-services update mcp-backend \
--global \
--connection-draining-timeout=30
  # Scale the managed instance group to zero (quarantine)
  gcloud compute instance-groups managed resize mcp-servers \
    --size=0 \
    --zone=us-central1-a
log "MCP servers isolated"
}
capture_network_traffic() {
log "Capturing network traffic for forensic analysis..."
# Start tcpdump for suspicious IP (capture 10 minutes)
timeout 600 tcpdump -i eth0 \
-w "${EVIDENCE_DIR}/network-capture.pcap" \
host "${IP_ADDRESS}" &
log "Network capture started (PID: $!)"
}
collect_logs() {
log "Collecting logs for incident..."
# MCP server logs (last 1 hour)
journalctl -u mcp-server \
--since="1 hour ago" \
> "${EVIDENCE_DIR}/mcp-server.log"
# Nginx access logs for user
grep "${USER_ID}" /var/log/nginx/access.log \
> "${EVIDENCE_DIR}/nginx-access.log"
# Firestore audit logs
gcloud logging read \
"resource.type=firestore_database AND protoPayload.authenticationInfo.principalEmail=${USER_ID}" \
--limit=1000 \
--format=json \
> "${EVIDENCE_DIR}/firestore-audit.json"
log "Logs collected to ${EVIDENCE_DIR}"
}
send_alerts() {
log "Sending incident alerts..."
# PagerDuty integration
curl -X POST https://events.pagerduty.com/v2/enqueue \
-H 'Content-Type: application/json' \
-d "{
\"routing_key\": \"${PAGERDUTY_KEY}\",
\"event_action\": \"trigger\",
\"payload\": {
\"summary\": \"Security Incident ${INCIDENT_ID} (${SEVERITY})\",
\"severity\": \"critical\",
\"source\": \"anomaly-detector\",
\"custom_details\": {
\"incident_id\": \"${INCIDENT_ID}\",
\"user_id\": \"${USER_ID}\",
\"ip_address\": \"${IP_ADDRESS}\"
}
}
}"
# Slack notification
curl -X POST "${SLACK_WEBHOOK_URL}" \
-H 'Content-Type: application/json' \
-d "{
\"text\": \"🚨 *Security Incident Detected*\",
\"attachments\": [{
\"color\": \"danger\",
\"fields\": [
{\"title\": \"Incident ID\", \"value\": \"${INCIDENT_ID}\", \"short\": true},
{\"title\": \"Severity\", \"value\": \"${SEVERITY}\", \"short\": true},
{\"title\": \"User ID\", \"value\": \"${USER_ID}\", \"short\": true},
{\"title\": \"IP Address\", \"value\": \"${IP_ADDRESS}\", \"short\": true}
]
}]
}"
log "Alerts sent"
}
# Main containment workflow
main() {
log "=== Incident Containment Started ==="
log "Incident ID: ${INCIDENT_ID}"
log "Severity: ${SEVERITY}"
log "User ID: ${USER_ID}"
log "IP Address: ${IP_ADDRESS}"
create_evidence_dir
case "$SEVERITY" in
CRITICAL)
log "CRITICAL severity - executing full containment"
revoke_user_tokens
block_ip_firewall
isolate_mcp_server
snapshot_database
capture_network_traffic
collect_logs
send_alerts
;;
HIGH)
log "HIGH severity - executing partial containment"
revoke_user_tokens
block_ip_firewall
collect_logs
send_alerts
;;
MEDIUM)
log "MEDIUM severity - monitoring escalation"
collect_logs
send_alerts
;;
*)
log "ERROR: Unknown severity: ${SEVERITY}"
exit 1
;;
esac
log "=== Containment Complete ==="
log "Evidence preserved in: ${EVIDENCE_DIR}"
log "Next step: Begin forensic analysis"
}
main
Automation Integration:
// In anomaly-detector.ts, trigger containment script
private async executeContainment(
incidentId: string,
severity: 'CRITICAL' | 'HIGH' | 'MEDIUM',
userId: string,
ip: string
) {
  const { execFile } = require('child_process');
  // execFile passes arguments directly to the script (no shell), which avoids
  // command injection if userId or ip contain shell metacharacters
  execFile(
    '/usr/local/bin/containment.sh',
    [incidentId, severity, userId, ip],
    (error: Error | null, stdout: string, stderr: string) => {
      if (error) {
        console.error(`Containment failed: ${error.message}`);
        return;
      }
      console.log(`Containment executed: ${stdout}`);
    }
  );
}
Phase 3: Forensic Analysis and Evidence Collection
Forensic analysis answers critical questions: What happened? When? Who was affected? What data was accessed or exfiltrated?
Code Example 3: Forensic Data Collector (Python)
This Python script aggregates evidence from multiple sources into a forensic investigation package:
#!/usr/bin/env python3
"""
forensic-collector.py - Aggregate forensic evidence for security incidents
Usage: python forensic-collector.py --incident INC-123456 --user-id uid-789
"""
import argparse
import json
import os
from datetime import datetime, timedelta
from pathlib import Path
import subprocess
import hashlib
class ForensicCollector:
def __init__(self, incident_id: str, user_id: str):
self.incident_id = incident_id
self.user_id = user_id
self.evidence_dir = Path(f"/var/security/incidents/{incident_id}")
self.report = {
"incident_id": incident_id,
"user_id": user_id,
"collected_at": datetime.utcnow().isoformat(),
"evidence": {},
"timeline": [],
"chain_of_custody": []
}
def collect_all(self):
"""Execute all evidence collection tasks"""
print(f"[FORENSICS] Starting evidence collection for {self.incident_id}")
self.evidence_dir.mkdir(parents=True, exist_ok=True)
# Collect from various sources
self.collect_firestore_data()
self.collect_redis_session_data()
self.collect_nginx_logs()
self.collect_application_logs()
self.collect_network_metadata()
self.build_timeline()
self.calculate_hashes()
self.generate_report()
print(f"[FORENSICS] Evidence collected to {self.evidence_dir}")
def collect_firestore_data(self):
"""Extract user data from Firestore"""
print("[FORENSICS] Collecting Firestore data...")
        # Export the relevant collections (Firestore export works at collection level;
        # per-user filtering happens later during analysis)
        cmd = [
            "gcloud", "firestore", "export",
            f"gs://your-project-forensics/incidents/{self.incident_id}/firestore",
            "--collection-ids=users,apps,conversations",
        ]
result = subprocess.run(cmd, capture_output=True, text=True)
self.report["evidence"]["firestore_export"] = {
"status": "initiated" if result.returncode == 0 else "failed",
"timestamp": datetime.utcnow().isoformat(),
"command": " ".join(cmd)
}
self.record_custody(
"firestore_export",
"Exported user data from Firestore database"
)
def collect_redis_session_data(self):
"""Extract session data from Redis"""
print("[FORENSICS] Collecting Redis session data...")
import redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
        # Find all keys related to the user (SCAN avoids blocking Redis the way KEYS can)
        keys = list(r.scan_iter(match=f"*{self.user_id}*"))
session_data = {}
for key in keys:
key_type = r.type(key)
if key_type == 'string':
session_data[key] = r.get(key)
elif key_type == 'hash':
session_data[key] = r.hgetall(key)
elif key_type == 'set':
session_data[key] = list(r.smembers(key))
elif key_type == 'list':
session_data[key] = r.lrange(key, 0, -1)
# Save to file
output_file = self.evidence_dir / "redis-session-data.json"
with open(output_file, 'w') as f:
json.dump(session_data, f, indent=2)
self.report["evidence"]["redis_sessions"] = {
"keys_found": len(keys),
"output_file": str(output_file),
"timestamp": datetime.utcnow().isoformat()
}
self.record_custody(
"redis_sessions",
f"Extracted {len(keys)} Redis keys related to user"
)
def collect_nginx_logs(self):
"""Extract Nginx access logs for user"""
print("[FORENSICS] Collecting Nginx logs...")
        # Grep the current access log for the user's ID
        cmd = f"grep '{self.user_id}' /var/log/nginx/access.log"
try:
result = subprocess.run(
cmd,
shell=True,
capture_output=True,
text=True
)
output_file = self.evidence_dir / "nginx-access.log"
with open(output_file, 'w') as f:
f.write(result.stdout)
line_count = result.stdout.count('\n')
self.report["evidence"]["nginx_logs"] = {
"log_lines": line_count,
"output_file": str(output_file),
"timestamp": datetime.utcnow().isoformat()
}
self.record_custody(
"nginx_logs",
f"Extracted {line_count} access log entries"
)
except Exception as e:
print(f"[ERROR] Failed to collect Nginx logs: {e}")
def collect_application_logs(self):
"""Extract application logs (journalctl)"""
print("[FORENSICS] Collecting application logs...")
# Last 24 hours
since = (datetime.utcnow() - timedelta(hours=24)).strftime("%Y-%m-%d %H:%M:%S")
cmd = [
"journalctl",
"-u", "mcp-server",
"--since", since,
"--grep", self.user_id
]
result = subprocess.run(cmd, capture_output=True, text=True)
output_file = self.evidence_dir / "application.log"
with open(output_file, 'w') as f:
f.write(result.stdout)
self.report["evidence"]["application_logs"] = {
"output_file": str(output_file),
"timestamp": datetime.utcnow().isoformat()
}
self.record_custody(
"application_logs",
"Extracted application logs from journalctl"
)
def collect_network_metadata(self):
"""Collect network connection metadata"""
print("[FORENSICS] Collecting network metadata...")
# Get active connections (if incident is ongoing)
cmd = "netstat -antp | grep ESTABLISHED"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
output_file = self.evidence_dir / "network-connections.txt"
with open(output_file, 'w') as f:
f.write(f"Captured at: {datetime.utcnow().isoformat()}\n\n")
f.write(result.stdout)
self.report["evidence"]["network_metadata"] = {
"output_file": str(output_file),
"timestamp": datetime.utcnow().isoformat()
}
self.record_custody(
"network_metadata",
"Captured active network connections"
)
def build_timeline(self):
"""Construct chronological timeline of events"""
print("[FORENSICS] Building incident timeline...")
# Parse logs and build timeline
# (In production, aggregate from all sources)
timeline = [
{
"timestamp": "2026-12-25T03:15:42Z",
"source": "anomaly_detector",
"event": "Multiple failed authentication attempts detected",
"severity": "MEDIUM"
},
{
"timestamp": "2026-12-25T03:16:10Z",
"source": "anomaly_detector",
"event": "Impossible travel detected: 5000km in 10min",
"severity": "HIGH"
},
{
"timestamp": "2026-12-25T03:16:15Z",
"source": "containment",
"event": "User tokens revoked",
"severity": "CRITICAL"
},
{
"timestamp": "2026-12-25T03:16:20Z",
"source": "containment",
"event": "IP address blocked at firewall",
"severity": "CRITICAL"
}
]
self.report["timeline"] = timeline
timeline_file = self.evidence_dir / "timeline.json"
with open(timeline_file, 'w') as f:
json.dump(timeline, f, indent=2)
def calculate_hashes(self):
"""Calculate SHA-256 hashes for all evidence files"""
print("[FORENSICS] Calculating evidence hashes...")
hashes = {}
for file in self.evidence_dir.glob("*"):
if file.is_file() and file.name != "forensic-report.json":
with open(file, 'rb') as f:
file_hash = hashlib.sha256(f.read()).hexdigest()
hashes[file.name] = file_hash
self.report["evidence_hashes"] = hashes
# Save hashes separately for verification
hash_file = self.evidence_dir / "SHA256SUMS"
with open(hash_file, 'w') as f:
for filename, file_hash in hashes.items():
f.write(f"{file_hash} {filename}\n")
def record_custody(self, evidence_type: str, action: str):
"""Record chain of custody for evidence"""
self.report["chain_of_custody"].append({
"timestamp": datetime.utcnow().isoformat(),
"evidence_type": evidence_type,
"action": action,
"collector": os.getenv("USER", "automated-system"),
"hostname": subprocess.run(
["hostname"],
capture_output=True,
text=True
).stdout.strip()
})
def generate_report(self):
"""Generate final forensic report"""
print("[FORENSICS] Generating forensic report...")
report_file = self.evidence_dir / "forensic-report.json"
with open(report_file, 'w') as f:
json.dump(self.report, f, indent=2)
print(f"[FORENSICS] Report saved to {report_file}")
print(f"[FORENSICS] Evidence package: {self.evidence_dir}")
def main():
parser = argparse.ArgumentParser(
description="Collect forensic evidence for security incidents"
)
parser.add_argument(
"--incident",
required=True,
help="Incident ID (e.g., INC-123456)"
)
parser.add_argument(
"--user-id",
required=True,
help="User ID to investigate"
)
args = parser.parse_args()
collector = ForensicCollector(args.incident, args.user_id)
collector.collect_all()
if __name__ == "__main__":
main()
Usage:
python3 forensic-collector.py \
--incident INC-1735099542-abc123 \
--user-id uid-789xyz
This script preserves chain of custody, calculates evidence hashes (for court admissibility), and aggregates multi-source data into a single investigation package.
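To make the hash manifest useful later, an investigator (or a CI job) can re-verify the evidence package against SHA256SUMS before analysis. Below is a minimal verification sketch in Node/TypeScript, assuming the directory layout and manifest format produced by the collector above.

// verify-evidence.ts (illustrative sketch)
import { createHash } from 'crypto';
import { readFileSync } from 'fs';
import { join } from 'path';

// Recompute SHA-256 hashes and compare against the SHA256SUMS manifest
// written by forensic-collector.py. Any mismatch means an evidence file
// changed after collection and the chain of custody is broken.
export function verifyEvidence(evidenceDir: string): boolean {
  const manifest = readFileSync(join(evidenceDir, 'SHA256SUMS'), 'utf8');
  let ok = true;
  for (const line of manifest.split('\n').filter(Boolean)) {
    const [expectedHash, ...nameParts] = line.trim().split(/\s+/);
    const filename = nameParts.join(' ');
    const actualHash = createHash('sha256')
      .update(readFileSync(join(evidenceDir, filename)))
      .digest('hex');
    if (actualHash !== expectedHash) {
      console.error(`[VERIFY] MISMATCH: ${filename}`);
      ok = false;
    }
  }
  return ok;
}

// Usage: verifyEvidence('/var/security/incidents/INC-1735099542-abc123');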
Phase 4: Recovery Procedures
After containment and forensic collection, restore normal operations while ensuring the vulnerability is patched.
Code Example 4: Recovery Orchestrator (TypeScript)
// recovery-orchestrator.ts
import { exec } from 'child_process';
import { promisify } from 'util';
import * as fs from 'fs/promises';
const execAsync = promisify(exec);
interface RecoveryPlan {
incidentId: string;
affectedSystems: string[];
backupTimestamp: string;
estimatedDowntime: number; // minutes
}
export class RecoveryOrchestrator {
async executeRecovery(plan: RecoveryPlan): Promise<void> {
console.log(`[RECOVERY] Starting recovery for incident ${plan.incidentId}`);
try {
// Step 1: Verify backups
await this.verifyBackups(plan.backupTimestamp);
// Step 2: Patch vulnerability
await this.patchVulnerability(plan.incidentId);
// Step 3: Restore data
await this.restoreFromBackup(plan.backupTimestamp);
// Step 4: Validate integrity
await this.validateDataIntegrity();
// Step 5: Gradual traffic restoration
await this.restoreTraffic();
// Step 6: Post-recovery monitoring
await this.enableEnhancedMonitoring(plan.incidentId);
console.log(`[RECOVERY] Recovery complete for ${plan.incidentId}`);
} catch (error) {
console.error(`[RECOVERY] Recovery failed:`, error);
throw error;
}
}
private async verifyBackups(timestamp: string): Promise<void> {
console.log(`[RECOVERY] Verifying backup from ${timestamp}`);
const { stdout } = await execAsync(
`gsutil ls gs://your-project-backups/firestore-${timestamp}.backup`
);
if (!stdout) {
throw new Error(`Backup not found for timestamp ${timestamp}`);
}
console.log(`[RECOVERY] Backup verified: ${stdout.trim()}`);
}
private async patchVulnerability(incidentId: string): Promise<void> {
console.log(`[RECOVERY] Applying security patch for ${incidentId}`);
// Deploy patched code
await execAsync('cd /app && git pull origin hotfix/security-patch');
await execAsync('npm run build');
await execAsync('firebase deploy --only functions');
console.log(`[RECOVERY] Security patch deployed`);
}
private async restoreFromBackup(timestamp: string): Promise<void> {
console.log(`[RECOVERY] Restoring database from backup...`);
// Firestore import
await execAsync(
`gcloud firestore import gs://your-project-backups/firestore-${timestamp}.backup`
);
console.log(`[RECOVERY] Database restored`);
}
private async validateDataIntegrity(): Promise<void> {
console.log(`[RECOVERY] Validating data integrity...`);
    // Compare post-restore document counts against pre-incident baselines
    const collectionsToVerify = ['users', 'apps', 'conversations'];
    // Execute a count check per collection
    // (Implementation depends on your database; for Firestore, use count() aggregation queries)
console.log(`[RECOVERY] Data integrity validated`);
}
private async restoreTraffic(): Promise<void> {
console.log(`[RECOVERY] Gradually restoring traffic...`);
    // capacity-scaler is a per-backend setting, so use update-backend
    // (instance group name and zone depend on your deployment)
    const backendFlags =
      '--global --instance-group=mcp-servers --instance-group-zone=us-central1-a';
    // Restore 10% traffic
    await execAsync(
      `gcloud compute backend-services update-backend mcp-backend ${backendFlags} --capacity-scaler=0.1`
    );
    await this.sleep(60000); // Wait 1 minute
    // Restore 50% traffic
    await execAsync(
      `gcloud compute backend-services update-backend mcp-backend ${backendFlags} --capacity-scaler=0.5`
    );
    await this.sleep(120000); // Wait 2 minutes
    // Restore 100% traffic
    await execAsync(
      `gcloud compute backend-services update-backend mcp-backend ${backendFlags} --capacity-scaler=1.0`
    );
console.log(`[RECOVERY] Full traffic restored`);
}
private async enableEnhancedMonitoring(incidentId: string): Promise<void> {
console.log(`[RECOVERY] Enabling enhanced monitoring...`);
// Increase log verbosity
await execAsync('gcloud logging sinks update audit-logs --log-filter="severity>=INFO"');
// Set up incident-specific alerts
// (Configure monitoring dashboards, alerts)
console.log(`[RECOVERY] Enhanced monitoring enabled`);
}
private sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
Phase 5: Post-Incident Review and Continuous Improvement
Every incident is a learning opportunity. Post-incident reviews (PIRs) identify root causes and prevent recurrence.
Code Example 5: Post-Mortem Report Template (Markdown)
# Post-Incident Review: [Incident ID]
**Date:** 2026-12-25
**Incident ID:** INC-1735099542-abc123
**Severity:** CRITICAL
**Duration:** 45 minutes (detection to resolution)
**Prepared by:** Security Team
---
## Executive Summary
Unauthorized access attempt detected at 03:15 UTC on December 25, 2026. Anomaly detection system flagged impossible travel pattern (user token used from US and Russia within 10 minutes). Automated containment revoked tokens and blocked attacker IP within 2 minutes. No data exfiltration occurred. Root cause: compromised OAuth client secret.
**Impact:**
- 1 user account temporarily locked
- 0 data records accessed by attacker
- 45 minutes of elevated monitoring
- $0 financial impact
**Preventive Actions Taken:**
- Rotated all OAuth client secrets
- Implemented client secret rotation policy (90 days)
- Enhanced token validation to check issuer geography
---
## Timeline of Events
| Time (UTC) | Event | Actor |
|------------|-------|-------|
| 03:15:42 | Multiple failed auth attempts from 185.220.101.x | Attacker |
| 03:16:10 | Impossible travel detected (US → Russia) | Anomaly Detector |
| 03:16:15 | User tokens revoked | Containment Script |
| 03:16:20 | Attacker IP blocked at firewall | Containment Script |
| 03:16:30 | PagerDuty alert sent | Anomaly Detector |
| 03:22:00 | Security team acknowledges alert | On-call Engineer |
| 03:35:00 | Forensic collection initiated | Security Team |
| 03:50:00 | Root cause identified (compromised secret) | Security Team |
| 04:00:00 | OAuth secrets rotated | Security Team |
| 04:15:00 | Services restored to normal | Security Team |
---
## Root Cause Analysis
**What Happened:**
Attacker obtained OAuth client secret from a public GitHub repository (developer accidentally committed `.env` file in August 2026). Using this secret, attacker forged OAuth tokens to impersonate legitimate users.
**Why Detection Worked:**
Anomaly detector flagged impossible travel: user's legitimate token used from California at 03:10, then same user token appeared from Russian IP at 03:15 (5 minutes later, 8,000km apart).
**Why Containment Worked:**
Automated containment script revoked all tokens for affected user within seconds, preventing data access. IP blocklist prevented further attempts.
**Why It Happened:**
1. Developer committed OAuth secrets to public GitHub repo (human error)
2. No pre-commit hooks to scan for secrets (tooling gap)
3. No 90-day secret rotation policy (process gap)
---
## What Went Well
1. **Detection Speed:** Anomaly detector flagged incident within 90 seconds
2. **Automated Response:** Containment executed without manual intervention
3. **Zero Data Loss:** No sensitive data accessed or exfiltrated
4. **Clear Chain of Custody:** Forensic evidence properly preserved
---
## What Went Wrong
1. **Secret in Git:** OAuth secret committed to public repository
2. **No Secret Scanning:** Pre-commit hooks not configured to detect secrets
3. **Long Secret Lifetime:** Secret remained valid for the four months between exposure (August 2026) and compromise (December 2026), with no rotation
---
## Action Items
| Action | Owner | Due Date | Priority |
|--------|-------|----------|----------|
| Implement pre-commit secret scanning (GitGuardian) | DevOps Team | 2026-12-27 | P0 |
| Rotate all OAuth secrets (force rotation) | Security Team | 2026-12-26 | P0 |
| Establish 90-day secret rotation policy | Security Team | 2026-12-30 | P0 |
| Audit all GitHub repos for exposed secrets | Security Team | 2026-12-28 | P1 |
| Add geographic validation to token verification | Engineering Team | 2027-01-05 | P1 |
| Conduct security awareness training (Git hygiene) | HR + Security | 2027-01-15 | P2 |
---
## Lessons Learned
1. **Secrets Management:** Never commit secrets to Git (use environment variables + secret managers)
2. **Defense in Depth:** Multiple detection layers caught what secret scanning would have prevented
3. **Automation Value:** Automated containment reduced incident duration by 80% (compared to manual response)
---
## Supporting Evidence
- Forensic report: `/var/security/incidents/INC-1735099542-abc123/forensic-report.json`
- Evidence hashes: `/var/security/incidents/INC-1735099542-abc123/SHA256SUMS`
- Containment log: `/var/log/security/containment-INC-1735099542-abc123.log`
---
**Review Status:** Approved
**Reviewed by:** CISO, VP Engineering, Legal
**Date:** 2026-12-26
Conclusion: Building Resilient Incident Response Muscle
Security incidents are inevitable. What separates resilient organizations from those that suffer catastrophic breaches is preparation:
- Detection Systems: Real-time anomaly detection with 90-second alert SLAs
- Automated Containment: Playbooks that execute within minutes (not hours)
- Forensic Readiness: Evidence collection systems that preserve chain of custody
- Recovery Plans: Tested backup/restore procedures with <15 minute RTOs
- Learning Culture: Blameless post-mortems that prevent recurrence
Your incident response plan should be:
- Documented: Written runbooks for every scenario
- Tested: Quarterly tabletop exercises and annual red team simulations
- Automated: Critical containment actions execute without human approval
- Compliant: Meets GDPR Article 33 (72-hour breach notification), HIPAA §164.308(a)(6), SOC 2 CC7.4
Next Steps:
- Implement the anomaly detector and containment automation from this guide
- Schedule your first tabletop exercise (simulate token compromise scenario)
- Document your IR playbook using the post-mortem template
- Integrate with existing SIEM and audit logging systems
Need help building ChatGPT apps with enterprise-grade security? MakeAIHQ provides production-ready templates with incident response built-in—detection, containment, and recovery systems that meet OpenAI approval requirements and regulatory compliance standards.
Start your free trial → Build ChatGPT apps that are secure from day one.
Internal Links
- ChatGPT App Security: Complete Guide (Parent pillar page)
- Security Auditing and Logging for ChatGPT Apps (Audit logging systems)
- Security Testing Beyond Penetration Tests (Vulnerability scanning)
- OAuth 2.1 for ChatGPT Apps (Token security)
- HIPAA-Compliant ChatGPT Apps (Compliance requirements)
- ChatGPT App Performance Optimization (Monitoring systems)
- MCP Server Development Guide (Server security)
External Resources
- NIST SP 800-61 Rev. 2: Computer Security Incident Handling Guide
- SANS Incident Handler's Handbook
- Forensic Toolkit (FTK): Digital Evidence Analysis
- IBM Security Cost of a Data Breach Report 2024
About the Author: The MakeAIHQ Security Team specializes in building incident response systems for ChatGPT applications at scale. Our production systems monitor 10M+ API calls daily with 99.99% uptime.
Last Updated: December 25, 2026
Reading Time: 12 minutes
Code Examples: 5 production-ready implementations (650+ lines)