AI Content Moderation for ChatGPT Apps
Building a successful ChatGPT app requires more than great functionality—it demands robust content moderation to protect users, maintain community standards, and ensure legal compliance. Whether you're creating a customer service bot, educational tool, or creative assistant, implementing effective content moderation is essential for long-term success.
Content moderation in ChatGPT apps serves multiple critical purposes: preventing harmful content from reaching users, protecting minors from inappropriate material, maintaining brand reputation, and complying with regulations like GDPR, COPPA, and industry-specific standards. Without proper moderation, your app faces risks ranging from user complaints to legal liability and platform removal.
This comprehensive guide demonstrates how to implement production-grade content moderation using OpenAI's Moderation API, custom filtering systems, user reporting mechanisms, and automated escalation policies. You'll learn to build a multi-layered moderation system that balances user safety with minimal false positives, ensuring your ChatGPT app provides a secure, trustworthy experience.
By the end of this article, you'll have production-ready code for a complete moderation pipeline that handles everything from real-time content scanning to compliance auditing and human review workflows.
OpenAI Moderation API Integration
The OpenAI Moderation API provides the foundation for content safety in ChatGPT applications. This free API analyzes text for harmful content across categories including hate speech, harassment, self-harm, sexual content, and violence. Integration takes minutes but provides enterprise-grade protection.
The Moderation API returns category scores and binary flags indicating whether content violates policies. Each category has a confidence score (0.0 to 1.0) and a flagged boolean. You can customize thresholds based on your application's risk tolerance and target audience.
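Before building the full client below, it helps to see the raw call. A minimal sketch, assuming the legacy (pre-1.0) openai Python SDK used throughout this guide (openai>=1.0 exposes the same endpoint through OpenAI().moderations.create):

import os
import openai

openai.api_key = os.environ["OPENAI_API_KEY"]

response = openai.Moderation.create(input="I will hurt you if you come near me")
result = response.results[0]

print(result.flagged)                      # overall boolean decision
print(result.categories["violence"])       # per-category boolean flags
print(result.category_scores["violence"])  # per-category confidence scores, 0.0 to 1.0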
Here's a production-ready moderation client that implements retry logic, caching, and threshold customization:
import openai  # targets the legacy (pre-1.0) openai SDK; openai>=1.0 exposes the same endpoint via OpenAI().moderations.create
import time
import hashlib
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
class ModerationCategory(Enum):
"""Content moderation categories from OpenAI API"""
HATE = "hate"
HATE_THREATENING = "hate/threatening"
HARASSMENT = "harassment"
HARASSMENT_THREATENING = "harassment/threatening"
SELF_HARM = "self-harm"
SELF_HARM_INTENT = "self-harm/intent"
SELF_HARM_INSTRUCTIONS = "self-harm/instructions"
SEXUAL = "sexual"
SEXUAL_MINORS = "sexual/minors"
VIOLENCE = "violence"
VIOLENCE_GRAPHIC = "violence/graphic"
@dataclass
class ModerationResult:
"""Structured moderation result"""
flagged: bool
categories: Dict[str, bool]
category_scores: Dict[str, float]
violations: List[str]
severity: str # low, medium, high, critical
response_time_ms: float
cached: bool = False
class ContentModerator:
"""
Production-grade content moderation using OpenAI Moderation API
Features:
- Automatic retries with exponential backoff
- Response caching for performance
- Customizable category thresholds
- Batch processing support
- Comprehensive logging
"""
def __init__(
self,
api_key: str,
custom_thresholds: Optional[Dict[str, float]] = None,
cache_ttl: int = 3600,
max_retries: int = 3
):
openai.api_key = api_key
self.max_retries = max_retries
self.cache_ttl = cache_ttl
# Default thresholds - customize based on your risk tolerance
self.thresholds = {
ModerationCategory.HATE.value: 0.7,
ModerationCategory.HATE_THREATENING.value: 0.3,
ModerationCategory.HARASSMENT.value: 0.7,
ModerationCategory.HARASSMENT_THREATENING.value: 0.3,
ModerationCategory.SELF_HARM.value: 0.3,
ModerationCategory.SELF_HARM_INTENT.value: 0.2,
ModerationCategory.SELF_HARM_INSTRUCTIONS.value: 0.2,
ModerationCategory.SEXUAL.value: 0.6,
ModerationCategory.SEXUAL_MINORS.value: 0.1,
ModerationCategory.VIOLENCE.value: 0.7,
ModerationCategory.VIOLENCE_GRAPHIC.value: 0.5
}
if custom_thresholds:
self.thresholds.update(custom_thresholds)
self._cache: Dict[str, Tuple[ModerationResult, float]] = {}
def _get_cache_key(self, text: str) -> str:
"""Generate cache key from text content"""
return hashlib.sha256(text.encode()).hexdigest()
def _is_cache_valid(self, timestamp: float) -> bool:
"""Check if cached result is still valid"""
return (time.time() - timestamp) < self.cache_ttl
def moderate(self, text: str, use_cache: bool = True) -> ModerationResult:
"""
Moderate text content with automatic retries and caching
Args:
text: Content to moderate
use_cache: Whether to use cached results
Returns:
ModerationResult with detailed analysis
"""
start_time = time.time()
# Check cache first
if use_cache:
cache_key = self._get_cache_key(text)
if cache_key in self._cache:
result, timestamp = self._cache[cache_key]
if self._is_cache_valid(timestamp):
result.cached = True
result.response_time_ms = (time.time() - start_time) * 1000
return result
# Call API with retry logic
for attempt in range(self.max_retries):
try:
response = openai.Moderation.create(input=text)
result_data = response.results[0]
# Analyze violations based on custom thresholds
violations = []
for category, score in result_data.category_scores.items():
if score > self.thresholds.get(category, 0.5):
violations.append(category)
# Determine severity
severity = self._calculate_severity(
result_data.category_scores,
violations
)
result = ModerationResult(
flagged=len(violations) > 0,
categories=result_data.categories,
category_scores=result_data.category_scores,
violations=violations,
severity=severity,
response_time_ms=(time.time() - start_time) * 1000,
cached=False
)
# Cache the result
if use_cache:
self._cache[cache_key] = (result, time.time())
return result
except openai.error.RateLimitError:
if attempt < self.max_retries - 1:
sleep_time = 2 ** attempt
time.sleep(sleep_time)
else:
raise
except Exception as e:
if attempt < self.max_retries - 1:
time.sleep(1)
else:
raise
def _calculate_severity(
self,
scores: Dict[str, float],
violations: List[str]
) -> str:
"""Calculate overall severity level"""
if not violations:
return "low"
critical_categories = [
"sexual/minors",
"self-harm/intent",
"self-harm/instructions",
"violence/graphic"
]
# Check for critical violations
for category in critical_categories:
if category in violations:
return "critical"
# Calculate average score for violations
if violations:
avg_score = sum(scores[v] for v in violations) / len(violations)
if avg_score > 0.9:
return "high"
elif avg_score > 0.7:
return "medium"
return "low"
def moderate_batch(
self,
texts: List[str],
use_cache: bool = True
) -> List[ModerationResult]:
"""Moderate multiple texts efficiently"""
results = []
for text in texts:
result = self.moderate(text, use_cache)
results.append(result)
return results
def clear_cache(self):
"""Clear the moderation cache"""
self._cache.clear()
The Moderation API typically responds fast enough for real-time filtering of user messages. For high-traffic applications, use the caching strategy shown above to reduce API calls and improve response times: the cache keys on a SHA-256 hash of the content, so identical messages receive a consistent moderation decision within the cache TTL.
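A brief usage sketch of the ContentModerator class above; the environment variable and the threshold override are illustrative choices, not requirements:

import os

moderator = ContentModerator(
    api_key=os.environ["OPENAI_API_KEY"],
    custom_thresholds={"sexual": 0.4}  # stricter than the 0.6 default, e.g. for a family-oriented app
)

result = moderator.moderate("user message text here")

if result.flagged:
    if result.severity == "critical":
        print(f"Blocked immediately: {result.violations}")
    else:
        print(f"Queued for review: {result.violations} (severity={result.severity})")
else:
    print(f"Content allowed ({result.response_time_ms:.0f} ms, cached={result.cached})")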
Learn more about integrating the Moderation API in our ChatGPT App Security Hardening Guide.
Custom Filter Engine
While the OpenAI Moderation API handles general content safety, most applications need custom filters for domain-specific rules, brand protection, and business logic. A custom filter engine complements the Moderation API by catching application-specific violations.
Custom filters typically include regex patterns for detecting specific phrases, keyword lists for blocking prohibited terms, and context-aware rules that consider conversation history. The key is balancing protection with user experience—overly aggressive filtering frustrates users while insufficient filtering risks safety incidents.
Here's a production filter engine with multiple filter types and priority handling:
import re
import json
from typing import List, Dict, Optional, Set
from dataclasses import dataclass
from enum import Enum
class FilterType(Enum):
"""Types of content filters"""
KEYWORD = "keyword"
REGEX = "regex"
PATTERN = "pattern"
CONTEXT = "context"
class FilterAction(Enum):
"""Actions to take on filter match"""
BLOCK = "block"
WARN = "warn"
LOG = "log"
REVIEW = "review"
@dataclass
class FilterRule:
"""Individual filter rule"""
id: str
filter_type: FilterType
pattern: str
action: FilterAction
priority: int
description: str
enabled: bool = True
@dataclass
class FilterMatch:
"""Result of filter matching"""
matched: bool
rule_id: str
filter_type: FilterType
action: FilterAction
matched_text: str
context: str
priority: int
class CustomFilterEngine:
"""
Advanced custom filtering system for application-specific rules
Features:
- Multiple filter types (keywords, regex, patterns)
- Priority-based rule processing
- Context-aware filtering
- Dynamic rule loading
- Performance optimization with compiled patterns
"""
def __init__(self, rules_file: Optional[str] = None):
self.rules: List[FilterRule] = []
self.compiled_patterns: Dict[str, re.Pattern] = {}
self.keyword_sets: Dict[str, Set[str]] = {}
if rules_file:
self.load_rules(rules_file)
def load_rules(self, rules_file: str):
"""Load filter rules from JSON configuration"""
with open(rules_file, 'r') as f:
rules_data = json.load(f)
for rule_data in rules_data:
rule = FilterRule(
id=rule_data['id'],
filter_type=FilterType(rule_data['type']),
pattern=rule_data['pattern'],
action=FilterAction(rule_data['action']),
priority=rule_data.get('priority', 5),
description=rule_data.get('description', ''),
enabled=rule_data.get('enabled', True)
)
self.add_rule(rule)
def add_rule(self, rule: FilterRule):
"""Add a filter rule and compile patterns"""
self.rules.append(rule)
# Pre-compile regex patterns for performance
if rule.filter_type == FilterType.REGEX:
self.compiled_patterns[rule.id] = re.compile(
rule.pattern,
re.IGNORECASE | re.MULTILINE
)
# Build keyword sets for fast lookup
elif rule.filter_type == FilterType.KEYWORD:
keywords = rule.pattern.lower().split('|')
self.keyword_sets[rule.id] = set(keywords)
# Sort rules by priority (higher = processed first)
self.rules.sort(key=lambda r: r.priority, reverse=True)
def filter(
self,
text: str,
context: Optional[List[str]] = None
) -> List[FilterMatch]:
"""
Apply all filters to text content
Args:
text: Content to filter
context: Previous messages for context-aware filtering
Returns:
List of filter matches
"""
matches = []
text_lower = text.lower()
for rule in self.rules:
if not rule.enabled:
continue
match = None
if rule.filter_type == FilterType.KEYWORD:
match = self._check_keywords(rule, text_lower)
elif rule.filter_type == FilterType.REGEX:
match = self._check_regex(rule, text)
elif rule.filter_type == FilterType.PATTERN:
match = self._check_pattern(rule, text_lower)
elif rule.filter_type == FilterType.CONTEXT:
if context:
match = self._check_context(rule, text, context)
if match:
matches.append(match)
# Stop processing if high-priority block action
if rule.action == FilterAction.BLOCK and rule.priority >= 8:
break
return matches
def _check_keywords(self, rule: FilterRule, text: str) -> Optional[FilterMatch]:
"""Check keyword-based filters"""
keywords = self.keyword_sets.get(rule.id, set())
# Tokenize text for word boundary matching
words = set(re.findall(r'\b\w+\b', text))
matched_keywords = words & keywords
if matched_keywords:
return FilterMatch(
matched=True,
rule_id=rule.id,
filter_type=rule.filter_type,
action=rule.action,
matched_text=', '.join(matched_keywords),
context=text[:100],
priority=rule.priority
)
return None
def _check_regex(self, rule: FilterRule, text: str) -> Optional[FilterMatch]:
"""Check regex-based filters"""
pattern = self.compiled_patterns.get(rule.id)
if not pattern:
return None
match = pattern.search(text)
if match:
return FilterMatch(
matched=True,
rule_id=rule.id,
filter_type=rule.filter_type,
action=rule.action,
matched_text=match.group(0),
context=text[max(0, match.start()-50):match.end()+50],
priority=rule.priority
)
return None
def _check_pattern(self, rule: FilterRule, text: str) -> Optional[FilterMatch]:
"""Check pattern-based filters (wildcards, etc.)"""
        # Convert wildcard pattern to regex, escaping it first so that other
        # regex metacharacters in the pattern are matched literally
        pattern_regex = re.escape(rule.pattern).replace(r'\*', '.*').replace(r'\?', '.')
pattern = re.compile(pattern_regex, re.IGNORECASE)
match = pattern.search(text)
if match:
return FilterMatch(
matched=True,
rule_id=rule.id,
filter_type=rule.filter_type,
action=rule.action,
matched_text=match.group(0),
context=text[:100],
priority=rule.priority
)
return None
def _check_context(
self,
rule: FilterRule,
text: str,
context: List[str]
) -> Optional[FilterMatch]:
"""Check context-aware filters across conversation history"""
# Combine current message with context
full_context = ' '.join(context[-5:] + [text])
# Apply pattern to full context
pattern = re.compile(rule.pattern, re.IGNORECASE)
match = pattern.search(full_context)
if match:
return FilterMatch(
matched=True,
rule_id=rule.id,
filter_type=rule.filter_type,
action=rule.action,
matched_text=match.group(0),
context=full_context[:200],
priority=rule.priority
)
return None
def should_block(self, matches: List[FilterMatch]) -> bool:
"""Determine if content should be blocked based on matches"""
for match in matches:
if match.action == FilterAction.BLOCK:
return True
return False
def get_highest_priority_action(
self,
matches: List[FilterMatch]
) -> Optional[FilterAction]:
"""Get the highest priority action from matches"""
if not matches:
return None
matches_sorted = sorted(matches, key=lambda m: m.priority, reverse=True)
return matches_sorted[0].action
Implement custom filters for your specific needs: competitor mentions, sensitive company information, inappropriate requests for your industry, or terms that violate your terms of service. Because rules live in a JSON configuration file, you can update filters at runtime without redeploying your application: clear the rule list and call load_rules again to pick up changes.
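To make the configuration format concrete, here is a sketch that writes an illustrative rules file matching the schema load_rules expects and runs a message through the engine; the rule IDs, keywords, and patterns are hypothetical:

import json

example_rules = [
    {
        "id": "competitor-mentions",
        "type": "keyword",
        "pattern": "acmebot|rivalgpt",  # '|'-separated keyword list
        "action": "review",
        "priority": 5,
        "description": "Flag competitor product mentions for human review"
    },
    {
        "id": "internal-codenames",
        "type": "regex",
        "pattern": r"project\s+(phoenix|atlas)",
        "action": "block",
        "priority": 9,
        "description": "Block leakage of internal project codenames"
    }
]

with open("rules.json", "w") as f:
    json.dump(example_rules, f, indent=2)

engine = CustomFilterEngine(rules_file="rules.json")
matches = engine.filter("Tell me about Project Phoenix pricing")
print(engine.should_block(matches))  # True: the regex rule carries a block action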
For comprehensive content policy enforcement strategies, see our guide on Content Policy Enforcement for ChatGPT Apps.
User Reporting System
Even the best automated moderation systems miss edge cases. A user reporting mechanism provides a safety valve, allowing your community to flag inappropriate content that slipped through automated filters. User reports also provide valuable data for improving your moderation rules.
An effective reporting system requires clear report categories, easy submission workflows, and transparent feedback to reporters. Users are more likely to report problems when they see their reports lead to action. Implement a ticketing system that tracks report status and provides updates.
Here's a production user report handler with workflow management:
import uuid
from datetime import datetime
from typing import List, Optional, Dict, Any
from dataclasses import dataclass, asdict
from enum import Enum
class ReportCategory(Enum):
"""Categories for user reports"""
HARASSMENT = "harassment"
HATE_SPEECH = "hate_speech"
SPAM = "spam"
INAPPROPRIATE_CONTENT = "inappropriate_content"
MISINFORMATION = "misinformation"
PRIVACY_VIOLATION = "privacy_violation"
OTHER = "other"
class ReportStatus(Enum):
"""Report processing status"""
PENDING = "pending"
UNDER_REVIEW = "under_review"
RESOLVED = "resolved"
DISMISSED = "dismissed"
ESCALATED = "escalated"
class ReportPriority(Enum):
"""Report priority levels"""
LOW = 1
MEDIUM = 2
HIGH = 3
CRITICAL = 4
@dataclass
class UserReport:
"""User-submitted content report"""
report_id: str
reporter_id: str
reported_content_id: str
category: ReportCategory
description: str
status: ReportStatus
priority: ReportPriority
created_at: datetime
updated_at: datetime
assigned_to: Optional[str] = None
resolution_notes: Optional[str] = None
resolved_at: Optional[datetime] = None
false_positive: bool = False
class UserReportHandler:
"""
Production user reporting system with workflow management
Features:
- Multi-category reporting
- Priority-based queue management
- Status tracking and updates
- False positive detection
- Notification system
- Analytics and reporting
"""
def __init__(self, database_client):
self.db = database_client
self.reports_collection = "user_reports"
self.false_positive_threshold = 0.3
def submit_report(
self,
reporter_id: str,
reported_content_id: str,
category: ReportCategory,
description: str,
additional_context: Optional[Dict[str, Any]] = None
) -> UserReport:
"""
Submit a new user report
Args:
reporter_id: ID of user submitting report
reported_content_id: ID of flagged content
category: Report category
description: User's description of issue
additional_context: Optional metadata
Returns:
Created UserReport object
"""
# Calculate initial priority based on category
priority = self._calculate_priority(category, description)
report = UserReport(
report_id=str(uuid.uuid4()),
reporter_id=reporter_id,
reported_content_id=reported_content_id,
category=category,
description=description,
status=ReportStatus.PENDING,
priority=priority,
created_at=datetime.utcnow(),
updated_at=datetime.utcnow()
)
        # Store report in database, converting enums to their values so the
        # status/priority queries used elsewhere in this class match stored documents
        report_doc = asdict(report)
        report_doc["category"] = report.category.value
        report_doc["status"] = report.status.value
        report_doc["priority"] = report.priority.value
        self.db.insert(self.reports_collection, report_doc)
# Check if this content has multiple reports (increases priority)
self._check_repeat_reports(reported_content_id)
# Send notification to moderation team
self._notify_moderators(report)
return report
    def _calculate_priority(
        self,
        category: ReportCategory,
        description: str
    ) -> ReportPriority:
        """Calculate report priority based on category and content"""
        # Check urgent keywords first so they can escalate any category to CRITICAL
        urgent_keywords = ['threat', 'danger', 'minor', 'illegal', 'emergency']
        if any(keyword in description.lower() for keyword in urgent_keywords):
            return ReportPriority.CRITICAL

        high_priority_categories = [
            ReportCategory.HARASSMENT,
            ReportCategory.HATE_SPEECH,
            ReportCategory.PRIVACY_VIOLATION
        ]
        if category in high_priority_categories:
            return ReportPriority.HIGH

        if category == ReportCategory.SPAM:
            return ReportPriority.LOW

        return ReportPriority.MEDIUM
def _check_repeat_reports(self, content_id: str):
"""Check for multiple reports of same content"""
reports = self.db.query(
self.reports_collection,
{"reported_content_id": content_id, "status": ReportStatus.PENDING.value}
)
if len(reports) >= 3:
# Escalate all reports for this content
for report in reports:
self.update_status(
report['report_id'],
ReportStatus.ESCALATED,
"Multiple reports received"
)
def _notify_moderators(self, report: UserReport):
"""Send notification to moderation team"""
# Implementation depends on your notification system
# (email, Slack, internal dashboard, etc.)
pass
def update_status(
self,
report_id: str,
new_status: ReportStatus,
notes: Optional[str] = None,
assigned_to: Optional[str] = None
) -> bool:
"""Update report status and tracking info"""
update_data = {
"status": new_status.value,
"updated_at": datetime.utcnow()
}
if notes:
update_data["resolution_notes"] = notes
if assigned_to:
update_data["assigned_to"] = assigned_to
if new_status in [ReportStatus.RESOLVED, ReportStatus.DISMISSED]:
update_data["resolved_at"] = datetime.utcnow()
return self.db.update(
self.reports_collection,
{"report_id": report_id},
update_data
)
def get_pending_reports(
self,
priority: Optional[ReportPriority] = None,
limit: int = 50
) -> List[Dict]:
"""Retrieve pending reports for review"""
query = {"status": ReportStatus.PENDING.value}
if priority:
query["priority"] = priority.value
reports = self.db.query(
self.reports_collection,
query,
sort=[("priority", -1), ("created_at", 1)],
limit=limit
)
return reports
def mark_false_positive(self, report_id: str, reason: str):
"""Mark report as false positive for learning"""
update_data = {
"false_positive": True,
"status": ReportStatus.DISMISSED.value,
"resolution_notes": f"False positive: {reason}",
"resolved_at": datetime.utcnow(),
"updated_at": datetime.utcnow()
}
self.db.update(
self.reports_collection,
{"report_id": report_id},
update_data
)
def get_reporter_statistics(self, reporter_id: str) -> Dict[str, Any]:
"""Get statistics for a specific reporter"""
reports = self.db.query(
self.reports_collection,
{"reporter_id": reporter_id}
)
total = len(reports)
false_positives = sum(1 for r in reports if r.get('false_positive', False))
resolved = sum(1 for r in reports if r.get('status') == ReportStatus.RESOLVED.value)
return {
"total_reports": total,
"false_positive_rate": false_positives / total if total > 0 else 0,
"resolved_count": resolved,
"accuracy_rate": (resolved / total) if total > 0 else 0
}
Encourage reporting by making it accessible (single click from any message) and anonymous (don't reveal reporter identity to reported users). Track reporter accuracy to identify malicious reporting while rewarding high-quality reports.
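A usage sketch for the report handler; db_client stands in for whatever database client you use and must expose the insert, query, and update methods the class calls (the IDs below are placeholders):

handler = UserReportHandler(database_client=db_client)

report = handler.submit_report(
    reporter_id="user_123",
    reported_content_id="msg_456",
    category=ReportCategory.HARASSMENT,
    description="Repeated insulting replies targeting me"
)
print(report.report_id, report.priority)  # harassment reports start at ReportPriority.HIGH

# Moderator workflow: resolve the report and record the outcome
handler.update_status(
    report.report_id,
    ReportStatus.RESOLVED,
    notes="Content removed, user warned",
    assigned_to="moderator_7"
)

# Track reporter accuracy to spot unreliable or malicious reporters
stats = handler.get_reporter_statistics("user_123")
print(stats["false_positive_rate"], stats["accuracy_rate"])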
Explore comprehensive user safety strategies in our User Safety in ChatGPT Apps guide.
Automated Escalation Policies
Automated escalation ensures serious violations receive immediate attention while preventing your moderation team from drowning in low-priority reports. Escalation policies route content based on severity, trigger automatic actions for critical violations, and queue borderline cases for human review.
Effective escalation balances speed with accuracy. Critical violations (illegal content, imminent harm threats) trigger immediate automated responses plus human notification. Medium-severity issues enter review queues. Low-severity violations may be automatically resolved with warnings or content removal.
Here's a production escalation manager with automated actions:
import uuid
from typing import List, Dict, Optional, Callable, Any
from dataclasses import dataclass
from enum import Enum
from datetime import datetime, timedelta
class SeverityLevel(Enum):
"""Violation severity levels"""
INFO = 1
LOW = 2
MEDIUM = 3
HIGH = 4
CRITICAL = 5
class AutomatedAction(Enum):
"""Automatic actions for violations"""
LOG_ONLY = "log"
WARN_USER = "warn"
DELETE_CONTENT = "delete"
SUSPEND_USER = "suspend"
BAN_USER = "ban"
NOTIFY_ADMIN = "notify_admin"
CONTACT_AUTHORITIES = "contact_authorities"
@dataclass
class EscalationRule:
"""Rule defining escalation behavior"""
severity: SeverityLevel
actions: List[AutomatedAction]
requires_review: bool
review_deadline_hours: int
notify_channels: List[str]
auto_resolve: bool = False
@dataclass
class ViolationEvent:
"""Record of a content violation"""
event_id: str
content_id: str
user_id: str
violation_type: str
severity: SeverityLevel
evidence: Dict[str, Any]
timestamp: datetime
actions_taken: List[AutomatedAction]
review_required: bool
resolved: bool = False
class EscalationManager:
"""
Production escalation system for content violations
Features:
- Severity-based automatic actions
- Multi-channel notifications
- Human review queuing
- Action audit trail
- Deadline tracking
- Pattern detection
"""
def __init__(self, database_client, notification_service):
self.db = database_client
self.notifier = notification_service
self.violations_collection = "violation_events"
# Define escalation rules
self.escalation_rules = {
SeverityLevel.INFO: EscalationRule(
severity=SeverityLevel.INFO,
actions=[AutomatedAction.LOG_ONLY],
requires_review=False,
review_deadline_hours=0,
notify_channels=[],
auto_resolve=True
),
SeverityLevel.LOW: EscalationRule(
severity=SeverityLevel.LOW,
actions=[AutomatedAction.LOG_ONLY, AutomatedAction.WARN_USER],
requires_review=False,
review_deadline_hours=0,
notify_channels=["moderation_log"],
auto_resolve=True
),
SeverityLevel.MEDIUM: EscalationRule(
severity=SeverityLevel.MEDIUM,
actions=[AutomatedAction.DELETE_CONTENT, AutomatedAction.WARN_USER],
requires_review=True,
review_deadline_hours=24,
notify_channels=["moderation_queue"],
auto_resolve=False
),
SeverityLevel.HIGH: EscalationRule(
severity=SeverityLevel.HIGH,
actions=[
AutomatedAction.DELETE_CONTENT,
AutomatedAction.SUSPEND_USER,
AutomatedAction.NOTIFY_ADMIN
],
requires_review=True,
review_deadline_hours=4,
notify_channels=["moderation_urgent", "admin_alerts"],
auto_resolve=False
),
SeverityLevel.CRITICAL: EscalationRule(
severity=SeverityLevel.CRITICAL,
actions=[
AutomatedAction.DELETE_CONTENT,
AutomatedAction.BAN_USER,
AutomatedAction.NOTIFY_ADMIN,
AutomatedAction.CONTACT_AUTHORITIES
],
requires_review=True,
review_deadline_hours=1,
notify_channels=["critical_alerts", "legal_team", "admin_alerts"],
auto_resolve=False
)
}
# Action handlers
self.action_handlers: Dict[AutomatedAction, Callable] = {
AutomatedAction.LOG_ONLY: self._log_violation,
AutomatedAction.WARN_USER: self._warn_user,
AutomatedAction.DELETE_CONTENT: self._delete_content,
AutomatedAction.SUSPEND_USER: self._suspend_user,
AutomatedAction.BAN_USER: self._ban_user,
AutomatedAction.NOTIFY_ADMIN: self._notify_admin,
AutomatedAction.CONTACT_AUTHORITIES: self._contact_authorities
}
def process_violation(
self,
content_id: str,
user_id: str,
violation_type: str,
severity: SeverityLevel,
evidence: Dict[str, Any]
) -> ViolationEvent:
"""
Process a content violation with automatic escalation
Args:
content_id: ID of violating content
user_id: ID of user who created content
violation_type: Type of violation detected
severity: Severity level
evidence: Supporting evidence (scores, matches, etc.)
Returns:
ViolationEvent with actions taken
"""
# Create violation event
event = ViolationEvent(
event_id=str(uuid.uuid4()),
content_id=content_id,
user_id=user_id,
violation_type=violation_type,
severity=severity,
evidence=evidence,
timestamp=datetime.utcnow(),
actions_taken=[],
review_required=False
)
# Get escalation rule for severity level
rule = self.escalation_rules.get(severity)
if not rule:
rule = self.escalation_rules[SeverityLevel.MEDIUM]
# Execute automated actions
for action in rule.actions:
handler = self.action_handlers.get(action)
if handler:
try:
handler(event)
event.actions_taken.append(action)
except Exception as e:
print(f"Error executing action {action}: {e}")
# Queue for human review if required
if rule.requires_review:
event.review_required = True
deadline = datetime.utcnow() + timedelta(hours=rule.review_deadline_hours)
self._queue_for_review(event, deadline)
# Send notifications
for channel in rule.notify_channels:
self._send_notification(channel, event)
# Auto-resolve if policy allows
if rule.auto_resolve:
event.resolved = True
# Store violation event
self.db.insert(self.violations_collection, self._event_to_dict(event))
# Check for patterns (repeat offender, coordinated attack, etc.)
self._check_violation_patterns(user_id, violation_type)
return event
def _log_violation(self, event: ViolationEvent):
"""Log violation to audit trail"""
print(f"[VIOLATION] {event.violation_type} - User: {event.user_id}, Severity: {event.severity.name}")
def _warn_user(self, event: ViolationEvent):
"""Send warning to user"""
self.notifier.send_user_warning(
event.user_id,
f"Your content violated our policies: {event.violation_type}"
)
def _delete_content(self, event: ViolationEvent):
"""Delete violating content"""
self.db.update(
"content",
{"content_id": event.content_id},
{"deleted": True, "deleted_at": datetime.utcnow(), "deletion_reason": event.violation_type}
)
def _suspend_user(self, event: ViolationEvent):
"""Temporarily suspend user account"""
suspension_end = datetime.utcnow() + timedelta(days=7)
self.db.update(
"users",
{"user_id": event.user_id},
{
"suspended": True,
"suspension_end": suspension_end,
"suspension_reason": event.violation_type
}
)
self.notifier.send_user_notification(
event.user_id,
f"Your account has been suspended until {suspension_end} due to: {event.violation_type}"
)
def _ban_user(self, event: ViolationEvent):
"""Permanently ban user account"""
self.db.update(
"users",
{"user_id": event.user_id},
{
"banned": True,
"banned_at": datetime.utcnow(),
"ban_reason": event.violation_type
}
)
def _notify_admin(self, event: ViolationEvent):
"""Notify administrators of serious violation"""
self.notifier.send_admin_alert(
f"Critical violation: {event.violation_type}",
self._event_to_dict(event)
)
def _contact_authorities(self, event: ViolationEvent):
"""Contact law enforcement for illegal content"""
# Implementation depends on jurisdiction and legal requirements
# This should trigger secure notification to designated legal contact
self.notifier.send_legal_alert(
"Critical violation requiring authority notification",
self._event_to_dict(event)
)
    def _queue_for_review(self, event: ViolationEvent, deadline: datetime):
        """Add violation to human review queue"""
        self.db.insert("review_queue", {
            "event_id": event.event_id,
            "deadline": deadline,
            "priority": event.severity.value,
            "created_at": datetime.utcnow(),
            "resolved": False  # queried by get_pending_reviews below
        })
def _send_notification(self, channel: str, event: ViolationEvent):
"""Send notification to specified channel"""
self.notifier.send_channel_notification(
channel,
f"[{event.severity.name}] {event.violation_type}",
self._event_to_dict(event)
)
    def _check_violation_patterns(self, user_id: str, violation_type: str):
        """Check for repeat violations or patterns"""
        # Skip the synthetic repeat-offender events themselves so the
        # escalation below cannot trigger itself recursively
        if violation_type == "repeat_offender":
            return

        # Get recent violations for this user
        recent_window = datetime.utcnow() - timedelta(days=30)
        recent_violations = self.db.query(
            self.violations_collection,
            {
                "user_id": user_id,
                "timestamp": {"$gte": recent_window}
            }
        )

        # Escalate if repeat offender
        if len(recent_violations) >= 3:
            self.process_violation(
                content_id="pattern_detected",
                user_id=user_id,
                violation_type="repeat_offender",
                severity=SeverityLevel.HIGH,
                evidence={"previous_violations": len(recent_violations)}
            )
def _event_to_dict(self, event: ViolationEvent) -> Dict[str, Any]:
"""Convert event to dictionary for storage"""
return {
"event_id": event.event_id,
"content_id": event.content_id,
"user_id": event.user_id,
"violation_type": event.violation_type,
"severity": event.severity.value,
"evidence": event.evidence,
"timestamp": event.timestamp,
"actions_taken": [a.value for a in event.actions_taken],
"review_required": event.review_required,
"resolved": event.resolved
}
def get_pending_reviews(self, limit: int = 50) -> List[Dict]:
"""Get violations awaiting human review"""
return self.db.query(
"review_queue",
{"resolved": False},
sort=[("priority", -1), ("deadline", 1)],
limit=limit
)
Escalation policies should evolve based on your moderation team's capacity and your application's risk profile. High-traffic applications may need stricter automatic actions, while low-traffic applications can rely more on manual review.
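To tie the layers together, here is a sketch of a single entry point that runs the custom filters first, then the Moderation API, and files violations with the escalation manager. The moderator, filter_engine, and escalation_manager objects are instances of the classes shown earlier; the severity mapping and the MEDIUM level assigned to custom-filter blocks are assumptions to tune for your own risk profile:

# Map ContentModerator severity strings onto escalation levels (assumed mapping)
SEVERITY_MAP = {
    "low": SeverityLevel.LOW,
    "medium": SeverityLevel.MEDIUM,
    "high": SeverityLevel.HIGH,
    "critical": SeverityLevel.CRITICAL
}

def moderate_message(content_id: str, user_id: str, text: str) -> bool:
    """Return True if the message may be shown, False if it was blocked."""
    # Layer 1: cheap custom filters (domain rules, brand protection)
    matches = filter_engine.filter(text)
    if filter_engine.should_block(matches):
        block = next(m for m in matches if m.action == FilterAction.BLOCK)
        escalation_manager.process_violation(
            content_id=content_id,
            user_id=user_id,
            violation_type=f"custom_filter:{block.rule_id}",
            severity=SeverityLevel.MEDIUM,  # assumed default for filter blocks
            evidence={"matched_text": block.matched_text}
        )
        return False

    # Layer 2: OpenAI Moderation API with custom thresholds
    result = moderator.moderate(text)
    if result.flagged:
        escalation_manager.process_violation(
            content_id=content_id,
            user_id=user_id,
            violation_type=",".join(result.violations),
            severity=SEVERITY_MAP[result.severity],
            evidence={"category_scores": dict(result.category_scores)}
        )
        return False

    return True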
For abuse prevention strategies beyond content moderation, see our ChatGPT Abuse Prevention Strategies guide.
Compliance and Regulatory Requirements
Content moderation isn't just about user experience—it's often a legal requirement. Regulations like GDPR (Europe), COPPA (United States), and industry-specific standards mandate specific moderation practices, data handling procedures, and user protection measures.
GDPR requires transparency about data processing, a lawful basis (such as consent) for that processing, and support for user rights to access or erase moderation records. COPPA prohibits collecting personal data from children under 13 without verifiable parental consent and imposes additional obligations on child-directed services. Healthcare apps must also comply with HIPAA, financial apps with regulations such as GLBA, and educational apps with FERPA.
Here's a compliance checker for common regulatory requirements:
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from enum import Enum
from datetime import datetime
class Regulation(Enum):
"""Supported regulatory frameworks"""
GDPR = "gdpr"
COPPA = "coppa"
HIPAA = "hipaa"
GLBA = "glba"
FERPA = "ferpa"
CCPA = "ccpa"
@dataclass
class ComplianceCheck:
"""Result of compliance validation"""
regulation: Regulation
compliant: bool
violations: List[str]
warnings: List[str]
timestamp: datetime
class ComplianceChecker:
"""
Regulatory compliance validator for content moderation
Features:
- Multi-regulation support (GDPR, COPPA, HIPAA, etc.)
- Age verification checking
- Data handling validation
- Consent verification
- Audit trail generation
"""
def __init__(self, database_client):
self.db = database_client
    def check_gdpr_compliance(
        self,
        user_id: str,
        content_data: Dict[str, Any],
        lawful_basis: str
    ) -> ComplianceCheck:
"""Validate GDPR compliance for content processing"""
violations = []
warnings = []
# Check for valid consent
user_consent = self.db.get("user_consents", {"user_id": user_id})
if not user_consent or not user_consent.get("gdpr_consent"):
violations.append("No valid GDPR consent found")
        # Verify the declared basis is one of the six lawful bases under GDPR Article 6
        lawful_bases = ["consent", "contract", "legal_obligation", "vital_interests", "public_task", "legitimate_interests"]
        if lawful_basis not in lawful_bases:
            warnings.append(f"'{lawful_basis}' is not a recognized GDPR lawful basis for processing")
# Check for data minimization
personal_data_fields = ["email", "phone", "address", "ip_address", "location"]
collected_fields = [f for f in personal_data_fields if f in content_data]
if len(collected_fields) > 3:
warnings.append("Consider data minimization - collecting extensive personal data")
# Verify retention policy
if not self._has_retention_policy(user_id):
violations.append("No data retention policy defined")
# Check for user rights support
required_rights = ["access", "rectification", "erasure", "portability", "objection"]
if not self._supports_user_rights(required_rights):
violations.append("User rights mechanisms not fully implemented")
return ComplianceCheck(
regulation=Regulation.GDPR,
compliant=len(violations) == 0,
violations=violations,
warnings=warnings,
timestamp=datetime.utcnow()
)
def check_coppa_compliance(
self,
user_id: str,
user_age: Optional[int],
parental_consent: bool
) -> ComplianceCheck:
"""Validate COPPA compliance for services accessible to children"""
violations = []
warnings = []
# Age verification
if user_age is None:
violations.append("Age not verified - required for COPPA compliance")
elif user_age < 13:
# Child under 13 - strict requirements
if not parental_consent:
violations.append("Parental consent required for users under 13")
# Check for prohibited data collection
            user_data = self.db.get("users", {"user_id": user_id}) or {}  # tolerate missing user records
prohibited_fields = ["precise_location", "social_security", "photos"]
collected_prohibited = [f for f in prohibited_fields if f in user_data]
if collected_prohibited:
violations.append(f"Prohibited data collected from minor: {', '.join(collected_prohibited)}")
# Verify enhanced moderation for child-directed content
if not self._has_enhanced_moderation():
warnings.append("Consider enhanced content moderation for child-directed service")
# Check privacy policy
if not self._has_child_privacy_policy():
violations.append("Child-specific privacy policy required")
return ComplianceCheck(
regulation=Regulation.COPPA,
compliant=len(violations) == 0,
violations=violations,
warnings=warnings,
timestamp=datetime.utcnow()
)
def check_hipaa_compliance(
self,
content_data: Dict[str, Any]
) -> ComplianceCheck:
"""Validate HIPAA compliance for healthcare-related content"""
violations = []
warnings = []
# Check for PHI (Protected Health Information)
phi_indicators = [
"diagnosis", "treatment", "medication", "medical_record",
"health_condition", "prescription", "lab_results"
]
contains_phi = any(indicator in str(content_data).lower() for indicator in phi_indicators)
if contains_phi:
# Verify encryption
if not content_data.get("encrypted"):
violations.append("PHI must be encrypted at rest and in transit")
# Check access controls
if not self._has_role_based_access():
violations.append("Role-based access controls required for PHI")
# Verify audit logging
if not self._has_audit_logging():
violations.append("Comprehensive audit logging required for PHI access")
# Check for Business Associate Agreement
if not self._has_baa():
warnings.append("Ensure Business Associate Agreement (BAA) is in place")
return ComplianceCheck(
regulation=Regulation.HIPAA,
compliant=len(violations) == 0,
violations=violations,
warnings=warnings,
timestamp=datetime.utcnow()
)
def generate_compliance_report(
self,
regulations: List[Regulation],
user_id: str,
content_data: Dict[str, Any]
) -> Dict[Regulation, ComplianceCheck]:
"""Generate comprehensive compliance report across regulations"""
report = {}
for regulation in regulations:
if regulation == Regulation.GDPR:
check = self.check_gdpr_compliance(user_id, content_data, "legitimate_interests")
elif regulation == Regulation.COPPA:
                user = self.db.get("users", {"user_id": user_id}) or {}
check = self.check_coppa_compliance(
user_id,
user.get("age"),
user.get("parental_consent", False)
)
elif regulation == Regulation.HIPAA:
check = self.check_hipaa_compliance(content_data)
else:
continue
report[regulation] = check
return report
def _has_retention_policy(self, user_id: str) -> bool:
"""Check if data retention policy is defined"""
# Implementation depends on your data architecture
return True
def _supports_user_rights(self, rights: List[str]) -> bool:
"""Verify user rights mechanisms are implemented"""
# Check if your system supports required GDPR rights
return True
def _has_enhanced_moderation(self) -> bool:
"""Check if enhanced moderation is enabled"""
return True
def _has_child_privacy_policy(self) -> bool:
"""Verify child-specific privacy policy exists"""
return True
def _has_role_based_access(self) -> bool:
"""Check for role-based access controls"""
return True
def _has_audit_logging(self) -> bool:
"""Verify comprehensive audit logging"""
return True
def _has_baa(self) -> bool:
"""Check for Business Associate Agreement"""
return True
Consult legal counsel to ensure your moderation practices meet all applicable regulations in your target markets. Compliance requirements vary by jurisdiction, industry, and user demographics.
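A sketch of running the checker across the regulations that apply to a request; db_client is the same kind of database interface assumed earlier and must also expose a get(collection, filter) method, and the content data shown is placeholder material:

checker = ComplianceChecker(database_client=db_client)

report = checker.generate_compliance_report(
    regulations=[Regulation.GDPR, Regulation.COPPA],
    user_id="user_123",
    content_data={"message": "placeholder user message", "ip_address": "203.0.113.7"}
)

for regulation, check in report.items():
    status = "PASS" if check.compliant else "FAIL"
    print(f"{regulation.value.upper()}: {status}")
    for violation in check.violations:
        print(f"  violation: {violation}")
    for warning in check.warnings:
        print(f"  warning: {warning}")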
For comprehensive GDPR compliance guidance, see our GDPR Compliance for ChatGPT Apps guide.
Conclusion
Implementing comprehensive content moderation protects your users, safeguards your reputation, and ensures regulatory compliance. The multi-layered approach presented in this guide—combining OpenAI's Moderation API, custom filters, user reporting, automated escalation, and compliance checking—provides production-ready protection for ChatGPT applications.
Start with the OpenAI Moderation API for general content safety, then layer custom filters for domain-specific rules. Add user reporting to catch edge cases and gather community feedback. Implement automated escalation to ensure serious violations receive immediate attention. Finally, validate compliance with applicable regulations based on your industry and target markets.
Content moderation is not a one-time implementation—it requires ongoing refinement as your application evolves, user behavior changes, and new threats emerge. Monitor moderation metrics, analyze false positive rates, and continuously improve your filters based on real-world usage.
Ready to build secure ChatGPT apps with world-class content moderation? MakeAIHQ.com provides automated tools for creating ChatGPT apps with built-in content safety features, user reporting systems, and compliance frameworks. Our platform handles the complexity of content moderation so you can focus on building great user experiences.
Start your free trial and deploy your first moderated ChatGPT app in under 48 hours—no coding required.
Frequently Asked Questions
Q: How accurate is the OpenAI Moderation API? A: The OpenAI Moderation API achieves high accuracy for general content safety, but you should customize thresholds based on your risk tolerance and supplement it with custom filters for domain-specific violations.
Q: Should I rely solely on automated moderation? A: No. Automated systems should be complemented with human review for edge cases, appeals, and continuous improvement. Use automation for initial filtering and escalation, but maintain human oversight.
Q: How do I handle false positives? A: Implement an appeals process, track false positive rates by filter type, and continuously refine your thresholds. Users should be able to request human review of automated moderation decisions.
Q: What's the best approach for COPPA compliance? A: Implement age verification at signup, obtain verifiable parental consent for users under 13, limit data collection from minors, and use enhanced content filtering for child-directed services.
Q: How often should I update moderation rules? A: Review moderation metrics weekly, update filters based on new violation patterns monthly, and conduct comprehensive policy reviews quarterly. Critical violations should trigger immediate rule updates.
Related Resources:
- ChatGPT App Security Hardening Guide - Comprehensive security strategies
- User Safety in ChatGPT Apps - User protection best practices
- GDPR Compliance for ChatGPT Apps - European data protection
- Content Policy Enforcement for ChatGPT - Policy implementation strategies