Real-Time Sync Patterns for ChatGPT Apps: WebSockets vs SSE Implementation Guide
Real-time synchronization transforms ChatGPT applications from static request-response systems into dynamic, collaborative experiences. Whether you're building a multi-user ChatGPT interface, collaborative document editor, or live analytics dashboard, choosing the right real-time sync pattern determines your app's performance, scalability, and user experience.
This guide provides production-ready implementations of WebSocket and Server-Sent Events (SSE) patterns specifically optimized for ChatGPT applications. You'll learn operational transformation algorithms, conflict resolution strategies, and horizontal scaling patterns used by enterprise real-time systems handling millions of concurrent connections.
Modern ChatGPT apps demand real-time capabilities: instant message delivery, collaborative prompt editing, live model response streaming, and synchronized widget state across multiple clients. The architectural decisions you make today—WebSocket vs SSE, operational transformation vs CRDT, in-memory vs Redis pub/sub—will determine whether your application can scale from 100 to 100,000 concurrent users without rewriting core infrastructure.
By the end of this guide, you'll have production-tested code for WebSocket servers with heartbeat mechanisms, SSE implementations with automatic reconnection, operational transformation for conflict-free synchronization, and Redis-based pub/sub for multi-server deployments.
Protocol Comparison: WebSockets vs SSE vs Long Polling
WebSocket Protocol
WebSocket provides bidirectional, full-duplex communication over a single TCP connection. After the initial HTTP handshake, the connection upgrades to the WebSocket protocol, enabling both client and server to send messages independently without HTTP overhead.
Best for: Real-time collaborative editing, multiplayer ChatGPT interactions, live cursor positions, bidirectional widget state synchronization.
Advantages:
- Low latency (no HTTP overhead after handshake)
- True bidirectional communication
- Efficient for high-frequency updates (100+ messages/second)
Disadvantages:
- Requires load balancers and proxies that support the WebSocket upgrade (sticky sessions are needed only when connection state lives in a single server's memory)
- More complex error handling and reconnection logic
- Usually occupies a dedicated HTTP/1.1 connection; WebSocket over HTTP/2 (RFC 8441) exists, but server and proxy support is uneven
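To make the handshake concrete: during the upgrade, the server proves it understood the request by hashing the client's Sec-WebSocket-Key together with a fixed GUID from RFC 6455 and returning the result as Sec-WebSocket-Accept. Libraries such as ws handle this for you, so the sketch below is only an illustration of the step; the helper name `computeAcceptKey` is hypothetical and not part of any library.

```typescript
import { createHash } from 'node:crypto';

// Fixed GUID from RFC 6455, appended to every client key before hashing.
const WEBSOCKET_GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11';

// Hypothetical helper: derives the Sec-WebSocket-Accept header value
// from the Sec-WebSocket-Key header sent by the client.
function computeAcceptKey(secWebSocketKey: string): string {
  return createHash('sha1')
    .update(secWebSocketKey + WEBSOCKET_GUID)
    .digest('base64');
}

// Sample key taken from RFC 6455, section 1.3.
console.log(computeAcceptKey('dGhlIHNhbXBsZSBub25jZQ=='));
// prints "s3pPLMBiTxaQ9kYGzzhZRbK+xOo="
```

Once the client has checked this value, both sides stop speaking HTTP on that connection and exchange WebSocket frames instead.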
Server-Sent Events (SSE)
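SSE provides unidirectional server-to-client streaming over standard HTTP. The server keeps the response open and pushes events in the text/event-stream format. When the connection drops, the browser's EventSource reconnects automatically and sends the ID of the last event it received in a Last-Event-ID header, so the server can replay anything the client missed.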
Best for: ChatGPT response streaming, live analytics updates, notification systems, one-way data push.
Advantages:
- Built-in automatic reconnection
- Works through HTTP/2 (multiplexing multiple streams)
- Simpler implementation than WebSocket
- Firewall-friendly (standard HTTP)
Disadvantages:
- Unidirectional only (client must use separate HTTP requests)
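- Browser connection limits over HTTP/1.1 (typically 6 per domain, shared across tabs); over HTTP/2 the limit is a negotiated stream count, 100 by default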
- No binary data support (text-only)
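The text/event-stream format is simple enough to write by hand. As a minimal sketch, the hypothetical `formatSseEvent` helper below serializes one event; the field names (`id`, `event`, `retry`, `data`) come from the SSE format itself, while the `SseEvent` type is only an assumption for this example. The detail that is easy to miss is that a payload containing newlines must be split across several `data:` lines.

```typescript
// Illustrative shape for one event; not part of any library.
interface SseEvent {
  id?: number | string;
  event?: string;
  retry?: number; // reconnection delay hint for the client, in milliseconds
  data: string;
}

// Serializes one event in text/event-stream format.
function formatSseEvent(e: SseEvent): string {
  const lines: string[] = [];
  if (e.id !== undefined) lines.push(`id: ${e.id}`);
  if (e.event) lines.push(`event: ${e.event}`);
  if (e.retry !== undefined) lines.push(`retry: ${e.retry}`);
  // Each line of the payload gets its own "data:" field;
  // the client joins them back together with "\n".
  for (const line of e.data.split(/\r?\n/)) {
    lines.push(`data: ${line}`);
  }
  // A blank line terminates the event.
  return lines.join('\n') + '\n\n';
}

console.log(JSON.stringify(formatSseEvent({ id: 7, event: 'token', data: 'Hello\nworld' })));
// prints "id: 7\nevent: token\ndata: Hello\ndata: world\n\n"
```

Payloads produced with JSON.stringify never contain raw newlines, so the split only matters when you stream plain text such as model output.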
Long Polling
Long polling holds HTTP requests open until new data is available, then immediately reconnects. This creates pseudo-real-time updates using standard HTTP.
Best for: Fallback mechanism when WebSocket/SSE unavailable, low-frequency updates, legacy browser support.
Advantages:
- Works everywhere (no special protocol support needed)
- Simple implementation
Disadvantages:
- High latency (HTTP overhead on every poll)
- Inefficient resource usage (many short-lived connections)
- Scalability challenges (connection churn)
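The "hold the request open" step is the part of long polling that usually needs explaining. A rough server-side sketch, assuming a hypothetical `LongPollChannel` class: each pending request registers a callback that fires exactly once, either when data is published or when a timeout expires with an empty result. The class name, the `respond` callback, and the 25-second default are assumptions for illustration; in a real handler `respond` would write the HTTP response.

```typescript
// Hypothetical helper: parks long-poll requests until data arrives.
class LongPollChannel<T> {
  private waiters = new Set<(items: T[]) => void>();

  // Park one request. `respond` is called once: with the published
  // item, or with an empty array if the timeout fires first.
  wait(respond: (items: T[]) => void, timeoutMs: number = 25000): void {
    const waiter = (items: T[]): void => {
      clearTimeout(timer);
      this.waiters.delete(waiter);
      respond(items);
    };
    const timer = setTimeout(() => waiter([]), timeoutMs);
    this.waiters.add(waiter);
  }

  // Release every parked request with the new item.
  publish(item: T): void {
    // Copy first: each waiter removes itself from the set when it fires.
    Array.from(this.waiters).forEach(waiter => waiter([item]));
  }

  get pending(): number {
    return this.waiters.size;
  }
}

const channel = new LongPollChannel<string>();
channel.wait(items => console.log('poll answered with', items), 1000);
channel.publish('hello'); // the parked request is answered immediately
```

After each answer the client issues a new request right away, which is where the connection churn listed above comes from.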
Protocol Selection Matrix:
| Use Case | Recommended Protocol | Why |
|---|---|---|
| ChatGPT response streaming | SSE | Unidirectional, auto-reconnect, HTTP/2 compatible |
| Collaborative prompt editing | WebSocket | Bidirectional, low latency, frequent updates |
| Live analytics dashboard | SSE | Server-push only, simple implementation |
| Multi-user ChatGPT canvas | WebSocket | Real-time cursor sync, operational transformation |
| Notification system | SSE | One-way push, built-in reconnection |
For more context on architectural patterns, see our ChatGPT Applications Guide and Event-Driven Architecture for ChatGPT Apps.
Production WebSocket Implementation
WebSocket Server with Heartbeat Mechanism
This production-grade WebSocket server includes connection management, heartbeat monitoring, room-based broadcasting, and graceful shutdown:
// websocket-server.ts
import WebSocket, { WebSocketServer } from 'ws';
import { createServer, IncomingMessage } from 'http';
interface Client {
ws: WebSocket;
id: string;
userId: string;
rooms: Set<string>;
isAlive: boolean;
metadata: Record<string, any>;
}
interface Message {
type: 'join' | 'leave' | 'broadcast' | 'sync' | 'heartbeat';
room?: string;
userId?: string;
data?: any;
timestamp?: number;
}
export class ChatGPTWebSocketServer {
private wss: WebSocketServer;
private clients: Map<string, Client> = new Map();
private rooms: Map<string, Set<string>> = new Map();
private heartbeatInterval: NodeJS.Timeout;
private port: number;
constructor(port: number = 8080) {
this.port = port;
const server = createServer();
this.wss = new WebSocketServer({ server });
this.wss.on('connection', this.handleConnection.bind(this));
// Heartbeat mechanism: ping every 30 seconds
this.heartbeatInterval = setInterval(() => {
this.clients.forEach((client, clientId) => {
if (!client.isAlive) {
console.log(`Client ${clientId} failed heartbeat, terminating`);
this.terminateClient(clientId);
return;
}
client.isAlive = false;
client.ws.ping();
});
}, 30000);
server.listen(port, () => {
console.log(`WebSocket server listening on port ${port}`);
});
// Graceful shutdown
process.on('SIGTERM', () => this.shutdown());
process.on('SIGINT', () => this.shutdown());
}
private handleConnection(ws: WebSocket, request: IncomingMessage): void {
const clientId = this.generateClientId();
const url = new URL(request.url || '', `http://${request.headers.host}`);
const userId = url.searchParams.get('userId') || 'anonymous';
const client: Client = {
ws,
id: clientId,
userId,
rooms: new Set(),
isAlive: true,
metadata: {}
};
this.clients.set(clientId, client);
console.log(`Client connected: ${clientId} (userId: ${userId})`);
// Heartbeat response
ws.on('pong', () => {
const c = this.clients.get(clientId);
if (c) c.isAlive = true;
});
// Message handling
ws.on('message', (data: Buffer) => {
try {
const message: Message = JSON.parse(data.toString());
this.handleMessage(clientId, message);
} catch (error) {
console.error(`Failed to parse message from ${clientId}:`, error);
this.sendError(clientId, 'Invalid message format');
}
});
// Disconnection
ws.on('close', () => {
console.log(`Client disconnected: ${clientId}`);
this.handleDisconnection(clientId);
});
ws.on('error', (error) => {
console.error(`WebSocket error for client ${clientId}:`, error);
this.terminateClient(clientId);
});
// Send welcome message
this.send(clientId, {
type: 'connected',
clientId,
userId,
timestamp: Date.now()
});
}
private handleMessage(clientId: string, message: Message): void {
const client = this.clients.get(clientId);
if (!client) return;
switch (message.type) {
case 'join':
if (message.room) {
this.joinRoom(clientId, message.room);
}
break;
case 'leave':
if (message.room) {
this.leaveRoom(clientId, message.room);
}
break;
case 'broadcast':
if (message.room && message.data) {
this.broadcastToRoom(message.room, {
type: 'message',
userId: client.userId,
data: message.data,
timestamp: Date.now()
}, clientId);
}
break;
case 'sync':
// Handle operational transformation or CRDT sync
this.handleSync(clientId, message);
break;
case 'heartbeat':
client.isAlive = true;
break;
default:
console.warn(`Unknown message type: ${message.type}`);
}
}
private joinRoom(clientId: string, room: string): void {
const client = this.clients.get(clientId);
if (!client) return;
client.rooms.add(room);
if (!this.rooms.has(room)) {
this.rooms.set(room, new Set());
}
this.rooms.get(room)!.add(clientId);
console.log(`Client ${clientId} joined room ${room}`);
// Notify room members
this.broadcastToRoom(room, {
type: 'user_joined',
userId: client.userId,
room,
timestamp: Date.now()
});
}
private leaveRoom(clientId: string, room: string): void {
const client = this.clients.get(clientId);
if (!client) return;
client.rooms.delete(room);
this.rooms.get(room)?.delete(clientId);
if (this.rooms.get(room)?.size === 0) {
this.rooms.delete(room);
}
console.log(`Client ${clientId} left room ${room}`);
this.broadcastToRoom(room, {
type: 'user_left',
userId: client.userId,
room,
timestamp: Date.now()
});
}
private broadcastToRoom(room: string, message: any, excludeClientId?: string): void {
const roomClients = this.rooms.get(room);
if (!roomClients) return;
const payload = JSON.stringify(message);
roomClients.forEach(clientId => {
if (clientId !== excludeClientId) {
const client = this.clients.get(clientId);
if (client && client.ws.readyState === WebSocket.OPEN) {
client.ws.send(payload);
}
}
});
}
private send(clientId: string, message: any): void {
const client = this.clients.get(clientId);
if (client && client.ws.readyState === WebSocket.OPEN) {
client.ws.send(JSON.stringify(message));
}
}
private sendError(clientId: string, error: string): void {
this.send(clientId, {
type: 'error',
error,
timestamp: Date.now()
});
}
private handleSync(clientId: string, message: Message): void {
// Implement operational transformation or CRDT sync
// See "Sync Strategies" section below
const client = this.clients.get(clientId);
if (!client || !message.room) return;
this.broadcastToRoom(message.room, {
type: 'sync_update',
userId: client.userId,
data: message.data,
timestamp: Date.now()
}, clientId);
}
private handleDisconnection(clientId: string): void {
const client = this.clients.get(clientId);
if (!client) return;
// Leave all rooms
client.rooms.forEach(room => {
this.leaveRoom(clientId, room);
});
this.clients.delete(clientId);
}
private terminateClient(clientId: string): void {
const client = this.clients.get(clientId);
if (!client) return;
client.ws.terminate();
this.handleDisconnection(clientId);
}
private generateClientId(): string {
// slice(2, 11) keeps up to 9 random characters (substr is deprecated)
return `client_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
}
private shutdown(): void {
console.log('Shutting down WebSocket server...');
clearInterval(this.heartbeatInterval);
// Notify all clients
this.clients.forEach(client => {
this.send(client.id, {
type: 'server_shutdown',
message: 'Server is shutting down',
timestamp: Date.now()
});
client.ws.close(1001, 'Server shutdown');
});
this.wss.close(() => {
console.log('WebSocket server closed');
process.exit(0);
});
}
public getStats(): any {
return {
totalClients: this.clients.size,
totalRooms: this.rooms.size,
roomDetails: Array.from(this.rooms.entries()).map(([room, clients]) => ({
room,
clientCount: clients.size
}))
};
}
}
// Usage
const wsServer = new ChatGPTWebSocketServer(8080);
WebSocket Client with Auto-Reconnection
Production-ready TypeScript client with exponential backoff reconnection:
// websocket-client.ts
export interface WebSocketClientConfig {
url: string;
userId: string;
reconnectInterval?: number;
maxReconnectInterval?: number;
reconnectDecay?: number;
maxReconnectAttempts?: number;
heartbeatInterval?: number;
}
export class ChatGPTWebSocketClient {
private ws: WebSocket | null = null;
private config: Required<WebSocketClientConfig>;
private reconnectAttempts: number = 0;
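// ReturnType<...> type-checks in both browser and Node.js environments
private reconnectTimeout: ReturnType<typeof setTimeout> | null = null;
private heartbeatInterval: ReturnType<typeof setInterval> | null = null;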
private isIntentionallyClosed: boolean = false;
private messageQueue: any[] = [];
private eventHandlers: Map<string, Set<Function>> = new Map();
constructor(config: WebSocketClientConfig) {
this.config = {
reconnectInterval: 1000,
maxReconnectInterval: 30000,
reconnectDecay: 1.5,
maxReconnectAttempts: 10,
heartbeatInterval: 25000,
...config
};
this.connect();
}
private connect(): void {
const url = `${this.config.url}?userId=${encodeURIComponent(this.config.userId)}`;
try {
this.ws = new WebSocket(url);
this.ws.onopen = this.handleOpen.bind(this);
this.ws.onmessage = this.handleMessage.bind(this);
this.ws.onclose = this.handleClose.bind(this);
this.ws.onerror = this.handleError.bind(this);
} catch (error) {
console.error('WebSocket connection failed:', error);
this.scheduleReconnect();
}
}
private handleOpen(): void {
console.log('WebSocket connected');
this.reconnectAttempts = 0;
this.emit('connected', { userId: this.config.userId });
// Start heartbeat
this.startHeartbeat();
// Flush message queue
this.flushMessageQueue();
}
private handleMessage(event: MessageEvent): void {
try {
const message = JSON.parse(event.data);
// Emit specific event type
this.emit(message.type, message);
// Emit generic message event
this.emit('message', message);
} catch (error) {
console.error('Failed to parse WebSocket message:', error);
}
}
private handleClose(event: CloseEvent): void {
console.log(`WebSocket closed: ${event.code} - ${event.reason}`);
this.stopHeartbeat();
this.emit('disconnected', { code: event.code, reason: event.reason });
if (!this.isIntentionallyClosed) {
this.scheduleReconnect();
}
}
private handleError(error: Event): void {
console.error('WebSocket error:', error);
this.emit('error', error);
}
private scheduleReconnect(): void {
if (this.reconnectAttempts >= this.config.maxReconnectAttempts) {
console.error('Max reconnection attempts reached');
this.emit('reconnect_failed', {
attempts: this.reconnectAttempts
});
return;
}
const delay = Math.min(
this.config.reconnectInterval * Math.pow(this.config.reconnectDecay, this.reconnectAttempts),
this.config.maxReconnectInterval
);
console.log(`Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts + 1})`);
this.reconnectTimeout = setTimeout(() => {
this.reconnectAttempts++;
this.emit('reconnecting', {
attempt: this.reconnectAttempts,
delay
});
this.connect();
}, delay);
}
private startHeartbeat(): void {
this.heartbeatInterval = setInterval(() => {
this.send({
type: 'heartbeat',
timestamp: Date.now()
});
}, this.config.heartbeatInterval);
}
private stopHeartbeat(): void {
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
this.heartbeatInterval = null;
}
}
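private flushMessageQueue(): void {
// Drain a snapshot of the queue. send() re-queues messages when the socket
// is not open, so looping on the live queue could spin forever if the
// connection drops mid-flush.
const pending = this.messageQueue;
this.messageQueue = [];
pending.forEach(message => this.send(message));
}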
public send(message: any): void {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(message));
} else {
// Queue message for later delivery
this.messageQueue.push(message);
}
}
public joinRoom(room: string): void {
this.send({
type: 'join',
room,
timestamp: Date.now()
});
}
public leaveRoom(room: string): void {
this.send({
type: 'leave',
room,
timestamp: Date.now()
});
}
public broadcast(room: string, data: any): void {
this.send({
type: 'broadcast',
room,
data,
timestamp: Date.now()
});
}
public on(event: string, handler: Function): void {
if (!this.eventHandlers.has(event)) {
this.eventHandlers.set(event, new Set());
}
this.eventHandlers.get(event)!.add(handler);
}
public off(event: string, handler: Function): void {
this.eventHandlers.get(event)?.delete(handler);
}
private emit(event: string, data: any): void {
this.eventHandlers.get(event)?.forEach(handler => {
try {
handler(data);
} catch (error) {
console.error(`Event handler error for ${event}:`, error);
}
});
}
public close(): void {
this.isIntentionallyClosed = true;
if (this.reconnectTimeout) {
clearTimeout(this.reconnectTimeout);
this.reconnectTimeout = null;
}
this.stopHeartbeat();
if (this.ws) {
this.ws.close(1000, 'Client initiated close');
this.ws = null;
}
}
public getState(): string {
if (!this.ws) return 'CLOSED';
switch (this.ws.readyState) {
case WebSocket.CONNECTING: return 'CONNECTING';
case WebSocket.OPEN: return 'OPEN';
case WebSocket.CLOSING: return 'CLOSING';
case WebSocket.CLOSED: return 'CLOSED';
default: return 'UNKNOWN';
}
}
}
// Usage example
const wsClient = new ChatGPTWebSocketClient({
url: 'ws://localhost:8080',
userId: 'user123'
});
wsClient.on('connected', () => {
console.log('Connected to WebSocket server');
wsClient.joinRoom('chatgpt-app-session-1');
});
wsClient.on('message', (message) => {
console.log('Received message:', message);
});
wsClient.on('user_joined', (data) => {
console.log(`User ${data.userId} joined room ${data.room}`);
});
wsClient.broadcast('chatgpt-app-session-1', {
type: 'cursor_move',
x: 100,
y: 200
});
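The reconnection delay used by `scheduleReconnect` is easier to reason about as a pure function. The sketch below reproduces the same formula with the client's default values (1000 ms base, 1.5 multiplier, 30000 ms cap). The `withJitter` helper is an addition that the client above does not have: randomizing the delay is a common way to keep many clients from reconnecting at the same instant after a server restart. Both function names are hypothetical.

```typescript
// Same formula as scheduleReconnect: base * decay^attempt, capped at maxMs.
function backoffDelay(
  attempt: number,
  baseMs: number = 1000,
  decay: number = 1.5,
  maxMs: number = 30000
): number {
  return Math.min(baseMs * Math.pow(decay, attempt), maxMs);
}

// Assumption, not in the client above: spread the delay over the range
// [delay / 2, delay) so clients do not reconnect in lockstep.
// `random` is injectable to keep the function testable.
function withJitter(delayMs: number, random: () => number = Math.random): number {
  return delayMs * (0.5 + random() * 0.5);
}

console.log([0, 1, 2, 3].map(attempt => backoffDelay(attempt)));
// prints [ 1000, 1500, 2250, 3375 ]
console.log(backoffDelay(10)); // prints 30000 (capped)
```

To use the jittered variant, the client would pass `withJitter(delay)` to `setTimeout` instead of `delay`.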
For more on securing WebSocket connections, see our guide on WebSocket Security Best Practices for ChatGPT Apps.
Server-Sent Events (SSE) Implementation
SSE Server with Express
Production SSE server with event streaming, client tracking, and automatic reconnection support:
// sse-server.ts
import express, { Request, Response } from 'express';
import { EventEmitter } from 'events';
interface SSEClient {
id: string;
userId: string;
response: Response;
rooms: Set<string>;
lastEventId: number;
}
export class ChatGPTSSEServer extends EventEmitter {
private app: express.Application;
private clients: Map<string, SSEClient> = new Map();
private rooms: Map<string, Set<string>> = new Map();
private eventCounter: number = 0;
private port: number;
constructor(port: number = 3000) {
super();
this.port = port;
this.app = express();
this.app.use(express.json());
// SSE endpoint
this.app.get('/events', this.handleSSEConnection.bind(this));
// Room management endpoints
this.app.post('/rooms/:room/join', this.handleJoinRoom.bind(this));
this.app.post('/rooms/:room/leave', this.handleLeaveRoom.bind(this));
this.app.post('/rooms/:room/broadcast', this.handleBroadcast.bind(this));
// Health check
this.app.get('/health', (req, res) => {
res.json({
status: 'ok',
clients: this.clients.size,
rooms: this.rooms.size
});
});
this.app.listen(port, () => {
console.log(`SSE server listening on port ${port}`);
});
}
private handleSSEConnection(req: Request, res: Response): void {
const userId = req.query.userId as string || 'anonymous';
const lastEventId = parseInt(req.headers['last-event-id'] as string || '0', 10);
// Set SSE headers
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('X-Accel-Buffering', 'no'); // Disable nginx buffering
const clientId = this.generateClientId();
const client: SSEClient = {
id: clientId,
userId,
response: res,
rooms: new Set(),
lastEventId
};
this.clients.set(clientId, client);
console.log(`SSE client connected: ${clientId} (userId: ${userId})`);
// Send welcome event
this.sendEvent(clientId, {
type: 'connected',
data: {
clientId,
userId,
timestamp: Date.now()
}
});
// Send missed events if reconnecting
if (lastEventId > 0) {
this.emit('replay_events', { clientId, lastEventId });
}
// Keep-alive comment every 30 seconds so proxies do not drop the idle stream
const keepAliveInterval = setInterval(() => {
if (res.writable) {
res.write(': keep-alive\n\n');
}
}, 30000);
// Handle client disconnect: stop the keep-alive timer, then clean up
req.on('close', () => {
clearInterval(keepAliveInterval);
console.log(`SSE client disconnected: ${clientId}`);
this.handleDisconnection(clientId);
});
}
private handleJoinRoom(req: Request, res: Response): void {
const { room } = req.params;
const { clientId } = req.body;
const client = this.clients.get(clientId);
if (!client) {
res.status(404).json({ error: 'Client not found' });
return;
}
client.rooms.add(room);
if (!this.rooms.has(room)) {
this.rooms.set(room, new Set());
}
this.rooms.get(room)!.add(clientId);
console.log(`Client ${clientId} joined room ${room}`);
// Notify room members
this.broadcastToRoom(room, {
type: 'user_joined',
data: {
userId: client.userId,
room,
timestamp: Date.now()
}
});
res.json({ success: true });
}
private handleLeaveRoom(req: Request, res: Response): void {
const { room } = req.params;
const { clientId } = req.body;
const client = this.clients.get(clientId);
if (!client) {
res.status(404).json({ error: 'Client not found' });
return;
}
client.rooms.delete(room);
this.rooms.get(room)?.delete(clientId);
if (this.rooms.get(room)?.size === 0) {
this.rooms.delete(room);
}
console.log(`Client ${clientId} left room ${room}`);
this.broadcastToRoom(room, {
type: 'user_left',
data: {
userId: client.userId,
room,
timestamp: Date.now()
}
});
res.json({ success: true });
}
private handleBroadcast(req: Request, res: Response): void {
const { room } = req.params;
const { clientId, data } = req.body;
const client = this.clients.get(clientId);
if (!client) {
res.status(404).json({ error: 'Client not found' });
return;
}
this.broadcastToRoom(room, {
type: 'message',
data: {
userId: client.userId,
...data,
timestamp: Date.now()
}
}, clientId);
res.json({ success: true });
}
private sendEvent(clientId: string, event: { type: string; data: any; id?: number }): void {
const client = this.clients.get(clientId);
if (!client || !client.response.writable) return;
const eventId = event.id || ++this.eventCounter;
let message = `id: ${eventId}\n`;
message += `event: ${event.type}\n`;
message += `data: ${JSON.stringify(event.data)}\n\n`;
client.response.write(message);
client.lastEventId = eventId;
}
private broadcastToRoom(room: string, event: { type: string; data: any }, excludeClientId?: string): void {
const roomClients = this.rooms.get(room);
if (!roomClients) return;
const eventId = ++this.eventCounter;
roomClients.forEach(clientId => {
if (clientId !== excludeClientId) {
this.sendEvent(clientId, { ...event, id: eventId });
}
});
}
public broadcast(event: { type: string; data: any }): void {
const eventId = ++this.eventCounter;
this.clients.forEach(client => {
this.sendEvent(client.id, { ...event, id: eventId });
});
}
private handleDisconnection(clientId: string): void {
const client = this.clients.get(clientId);
if (!client) return;
// Leave all rooms
client.rooms.forEach(room => {
this.rooms.get(room)?.delete(clientId);
if (this.rooms.get(room)?.size === 0) {
this.rooms.delete(room);
}
});
this.clients.delete(clientId);
}
private generateClientId(): string {
// slice(2, 11) keeps up to 9 random characters (substr is deprecated)
return `sse_client_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
}
public getStats(): any {
return {
totalClients: this.clients.size,
totalRooms: this.rooms.size,
eventCounter: this.eventCounter,
roomDetails: Array.from(this.rooms.entries()).map(([room, clients]) => ({
room,
clientCount: clients.size
}))
};
}
}
// Usage
const sseServer = new ChatGPTSSEServer(3000);
// Broadcast to all clients
sseServer.broadcast({
type: 'announcement',
data: { message: 'System maintenance in 10 minutes' }
});
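The server above emits a `replay_events` event when a client reconnects with a Last-Event-ID, but it does not store past events, so nothing can be replayed yet. One possible approach, sketched here under the assumption that a bounded in-memory history is acceptable, is a small buffer keyed by event ID. The `ReplayBuffer` class and its method names are hypothetical, and this version keeps one global history rather than one per room.

```typescript
interface StoredEvent {
  id: number;
  type: string;
  data: unknown;
}

// Hypothetical bounded history of recently sent events.
class ReplayBuffer {
  private events: StoredEvent[] = [];
  private capacity: number;

  constructor(capacity: number = 1000) {
    this.capacity = capacity;
  }

  // Store an event, evicting the oldest one when the buffer is full.
  record(event: StoredEvent): void {
    this.events.push(event);
    if (this.events.length > this.capacity) {
      this.events.shift();
    }
  }

  // Events newer than the client's last seen ID, oldest first.
  since(lastEventId: number): StoredEvent[] {
    return this.events.filter(event => event.id > lastEventId);
  }

  // True when events after lastEventId have already been evicted,
  // in which case a full state resync is safer than a replay.
  hasGap(lastEventId: number): boolean {
    return this.events.length > 0 && lastEventId < this.events[0].id - 1;
  }
}

const replayBuffer = new ReplayBuffer(3);
for (let id = 1; id <= 5; id++) {
  replayBuffer.record({ id, type: 'message', data: { seq: id } });
}
console.log(replayBuffer.since(3).map(event => event.id)); // prints [ 4, 5 ]
console.log(replayBuffer.hasGap(1)); // prints true (events 2 was evicted)
```

A listener for `replay_events` could then call `since(lastEventId)` and write each stored event to the reconnecting client. In a multi-server deployment the history would have to live in shared storage instead of process memory.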
SSE Client with Automatic Reconnection
TypeScript SSE client with event handling and reconnection:
// sse-client.ts
export interface SSEClientConfig {
url: string;
userId: string;
reconnectInterval?: number;
maxReconnectAttempts?: number;
}
export class ChatGPTSSEClient {
private eventSource: EventSource | null = null;
private config: Required<SSEClientConfig>;
private reconnectAttempts: number = 0;
private isIntentionallyClosed: boolean = false;
private eventHandlers: Map<string, Set<Function>> = new Map();
private clientId: string | null = null;
private lastEventId: number = 0;
constructor(config: SSEClientConfig) {
this.config = {
reconnectInterval: 3000,
maxReconnectAttempts: 10,
...config
};
this.connect();
}
private connect(): void {
const url = new URL(this.config.url);
url.searchParams.set('userId', this.config.userId);
try {
this.eventSource = new EventSource(url.toString());
this.eventSource.onopen = this.handleOpen.bind(this);
this.eventSource.onerror = this.handleError.bind(this);
// Generic message handler
this.eventSource.onmessage = (event: MessageEvent) => {
this.handleEvent('message', event);
};
// Custom event handlers
this.eventSource.addEventListener('connected', (event: MessageEvent) => {
this.handleEvent('connected', event);
});
this.eventSource.addEventListener('user_joined', (event: MessageEvent) => {
this.handleEvent('user_joined', event);
});
this.eventSource.addEventListener('user_left', (event: MessageEvent) => {
this.handleEvent('user_left', event);
});
this.eventSource.addEventListener('message', (event: MessageEvent) => {
this.handleEvent('room_message', event);
});
this.eventSource.addEventListener('announcement', (event: MessageEvent) => {
this.handleEvent('announcement', event);
});
} catch (error) {
console.error('SSE connection failed:', error);
this.scheduleReconnect();
}
}
private handleOpen(): void {
console.log('SSE connected');
this.reconnectAttempts = 0;
this.emit('open', {});
}
private handleEvent(type: string, event: MessageEvent): void {
try {
const data = JSON.parse(event.data);
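// Track last event ID. Only the browser's built-in reconnect sends it as
// Last-Event-ID; a new EventSource created by connect() starts without one.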
if (event.lastEventId) {
this.lastEventId = parseInt(event.lastEventId, 10);
}
// Extract client ID from connected event
if (type === 'connected' && data.clientId) {
this.clientId = data.clientId;
}
this.emit(type, data);
} catch (error) {
console.error(`Failed to parse SSE event (${type}):`, error);
}
}
private handleError(error: Event): void {
console.error('SSE error:', error);
if (this.eventSource?.readyState === EventSource.CLOSED) {
this.emit('error', error);
if (!this.isIntentionallyClosed) {
this.scheduleReconnect();
}
}
}
private scheduleReconnect(): void {
if (this.reconnectAttempts >= this.config.maxReconnectAttempts) {
console.error('Max reconnection attempts reached');
this.emit('reconnect_failed', {
attempts: this.reconnectAttempts
});
return;
}
const delay = this.config.reconnectInterval;
console.log(`Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts + 1})`);
setTimeout(() => {
this.reconnectAttempts++;
this.emit('reconnecting', {
attempt: this.reconnectAttempts,
lastEventId: this.lastEventId
});
this.connect();
}, delay);
}
public async joinRoom(room: string): Promise<void> {
if (!this.clientId) {
throw new Error('Not connected (no client ID)');
}
// encodeURIComponent keeps room names with "/", "?" or spaces from breaking the path
const response = await fetch(`${this.config.url.replace('/events', '')}/rooms/${encodeURIComponent(room)}/join`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ clientId: this.clientId })
});
if (!response.ok) {
throw new Error(`Failed to join room: ${response.statusText}`);
}
}
public async leaveRoom(room: string): Promise<void> {
if (!this.clientId) {
throw new Error('Not connected (no client ID)');
}
const response = await fetch(`${this.config.url.replace('/events', '')}/rooms/${encodeURIComponent(room)}/leave`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ clientId: this.clientId })
});
if (!response.ok) {
throw new Error(`Failed to leave room: ${response.statusText}`);
}
}
public async broadcast(room: string, data: any): Promise<void> {
if (!this.clientId) {
throw new Error('Not connected (no client ID)');
}
const response = await fetch(`${this.config.url.replace('/events', '')}/rooms/${encodeURIComponent(room)}/broadcast`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ clientId: this.clientId, data })
});
if (!response.ok) {
throw new Error(`Failed to broadcast: ${response.statusText}`);
}
}
public on(event: string, handler: Function): void {
if (!this.eventHandlers.has(event)) {
this.eventHandlers.set(event, new Set());
}
this.eventHandlers.get(event)!.add(handler);
}
public off(event: string, handler: Function): void {
this.eventHandlers.get(event)?.delete(handler);
}
private emit(event: string, data: any): void {
this.eventHandlers.get(event)?.forEach(handler => {
try {
handler(data);
} catch (error) {
console.error(`Event handler error for ${event}:`, error);
}
});
}
public close(): void {
this.isIntentionallyClosed = true;
if (this.eventSource) {
this.eventSource.close();
this.eventSource = null;
}
}
public getClientId(): string | null {
return this.clientId;
}
}
// Usage example
const sseClient = new ChatGPTSSEClient({
url: 'http://localhost:3000/events',
userId: 'user123'
});
sseClient.on('connected', async (data) => {
console.log('Connected to SSE server:', data);
await sseClient.joinRoom('chatgpt-app-session-1');
});
sseClient.on('room_message', (data) => {
console.log('Room message:', data);
});
sseClient.on('user_joined', (data) => {
console.log(`User ${data.userId} joined room ${data.room}`);
});
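// Broadcast message to room (requires separate HTTP request).
// broadcast() throws until the server has assigned a client ID,
// so wait for the 'connected' event before calling it.
sseClient.on('connected', async () => {
await sseClient.broadcast('chatgpt-app-session-1', {
message: 'Hello from SSE client'
});
});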
For architectural context on event-driven patterns, see our Event-Driven Architecture for ChatGPT Apps guide.
Conflict Resolution: Operational Transformation & CRDT
Operational Transformation (OT) Implementation
Operational Transformation resolves conflicts in collaborative editing by transforming concurrent operations to maintain consistency:
// operational-transformation.ts
export enum OperationType {
INSERT = 'INSERT',
DELETE = 'DELETE',
RETAIN = 'RETAIN'
}
export interface Operation {
type: OperationType;
position: number;
content?: string;
length?: number;
userId: string;
timestamp: number;
version: number;
}
export class OperationalTransformationEngine {
private document: string = '';
private version: number = 0;
private operationHistory: Operation[] = [];
constructor(initialContent: string = '') {
this.document = initialContent;
}
/**
* Apply operation to local document
*/
public applyOperation(op: Operation): string {
switch (op.type) {
case OperationType.INSERT:
this.document =
this.document.slice(0, op.position) +
(op.content || '') +
this.document.slice(op.position);
break;
case OperationType.DELETE:
this.document =
this.document.slice(0, op.position) +
this.document.slice(op.position + (op.length || 0));
break;
case OperationType.RETAIN:
// No-op (used for transformation only)
break;
}
this.version++;
this.operationHistory.push(op);
return this.document;
}
/**
* Transform operation against another operation
* Returns transformed operation that can be applied to document
*/
public transform(op1: Operation, op2: Operation): Operation {
// Both operations are inserts
if (op1.type === OperationType.INSERT && op2.type === OperationType.INSERT) {
if (op1.position < op2.position) {
// op2 happens after op1, shift op2 position
return {
...op2,
position: op2.position + (op1.content?.length || 0)
};
} else if (op1.position > op2.position) {
// op2 happens before op1, no transformation needed
return op2;
} else {
// Same position, use userId/timestamp as tiebreaker
if (op1.userId < op2.userId || (op1.userId === op2.userId && op1.timestamp < op2.timestamp)) {
return {
...op2,
position: op2.position + (op1.content?.length || 0)
};
} else {
return op2;
}
}
}
// op1 is insert, op2 is delete
if (op1.type === OperationType.INSERT && op2.type === OperationType.DELETE) {
if (op1.position <= op2.position) {
// Insert happens before or at delete position
return {
...op2,
position: op2.position + (op1.content?.length || 0)
};
} else if (op1.position >= op2.position + (op2.length || 0)) {
// Insert happens after delete range
return op2;
} else {
// Insert is within delete range (complex case)
return {
...op2,
length: (op2.length || 0) + (op1.content?.length || 0)
};
}
}
// op1 is delete, op2 is insert
if (op1.type === OperationType.DELETE && op2.type === OperationType.INSERT) {
if (op2.position <= op1.position) {
// Insert happens before delete
return op2;
} else if (op2.position >= op1.position + (op1.length || 0)) {
// Insert happens after delete range
return {
...op2,
position: op2.position - (op1.length || 0)
};
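} else {
// Insert falls inside the deleted range. The insert/delete branch above
// extends the delete to cover the inserted text, so the insert is turned
// into a no-op here; keeping it would leave the two replicas with
// different documents.
return {
...op2,
position: op1.position,
content: ''
};
}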
}
// Both operations are deletes
if (op1.type === OperationType.DELETE && op2.type === OperationType.DELETE) {
if (op1.position + (op1.length || 0) <= op2.position) {
// op1 delete ends before op2 starts
return {
...op2,
position: op2.position - (op1.length || 0)
};
} else if (op1.position >= op2.position + (op2.length || 0)) {
// op1 delete starts after op2 ends
return op2;
} else {
// Overlapping deletes (complex case)
const op1End = op1.position + (op1.length || 0);
const op2End = op2.position + (op2.length || 0);
if (op2.position < op1.position) {
// op2 starts before op1: keep its position and drop the overlapping characters.
// (The non-overlapping case was already handled above.)
const overlap = Math.min(op1End, op2End) - op1.position;
return {
...op2,
length: Math.max(0, (op2.length || 0) - overlap)
};
} else {
// op2 starts inside op1's range, so once op1 has run it begins where op1 began
const newPosition = op1.position;
const newLength = Math.max(0, (op2.length || 0) - (Math.min(op1End, op2End) - op2.position));
return {
...op2,
position: newPosition,
length: newLength
};
}
}
}
// Default: no transformation
return op2;
}
/**
* Transform operation against history of operations
*/
public transformAgainstHistory(op: Operation, sinceVersion: number): Operation {
let transformed = op;
// Transform against all operations since specified version
for (let i = sinceVersion; i < this.operationHistory.length; i++) {
transformed = this.transform(this.operationHistory[i], transformed);
}
return transformed;
}
public getDocument(): string {
return this.document;
}
public getVersion(): number {
return this.version;
}
public getOperationHistory(): Operation[] {
return this.operationHistory;
}
}
// Usage example
const otEngine = new OperationalTransformationEngine('Hello World');
// User 1 inserts " ChatGPT" at position 5
const op1: Operation = {
type: OperationType.INSERT,
position: 5,
content: ' ChatGPT',
userId: 'user1',
timestamp: Date.now(),
version: 0
};
// User 2 deletes "World" (indices 6-10) concurrently
const op2: Operation = {
type: OperationType.DELETE,
position: 6,
length: 5,
userId: 'user2',
timestamp: Date.now() + 1,
version: 0
};
// Transform op2 against op1
const transformedOp2 = otEngine.transform(op1, op2);
// Apply operations
otEngine.applyOperation(op1);
console.log(otEngine.getDocument()); // "Hello ChatGPT World"
otEngine.applyOperation(transformedOp2);
console.log(otEngine.getDocument()); // "Hello ChatGPT "
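The usage example above transforms in one direction only. The property that makes OT work is that both application orders end in the same document. The sketch below checks that for two inserts at the same position, using the same userId tie-break as the engine. The `InsertOp` type and the three helper functions are hypothetical names introduced for illustration; they are not part of the engine class.

```typescript
// Hypothetical minimal insert-only model, assumed for illustration.
interface InsertOp {
  position: number;
  content: string;
  userId: string;
}

function applyInsert(doc: string, op: InsertOp): string {
  return doc.slice(0, op.position) + op.content + doc.slice(op.position);
}

// Rewrite `op` so it can be applied after `applied` has already run.
function transformInsert(applied: InsertOp, op: InsertOp): InsertOp {
  const appliedGoesFirst =
    applied.position < op.position ||
    (applied.position === op.position && applied.userId < op.userId);
  return appliedGoesFirst
    ? { ...op, position: op.position + applied.content.length }
    : op;
}

// Both replicas start from `doc` but receive the operations in opposite orders.
function converges(doc: string, a: InsertOp, b: InsertOp): boolean {
  const viaA = applyInsert(applyInsert(doc, a), transformInsert(a, b));
  const viaB = applyInsert(applyInsert(doc, b), transformInsert(b, a));
  return viaA === viaB;
}

const insertA: InsertOp = { position: 5, content: ' ChatGPT', userId: 'user1' };
const insertB: InsertOp = { position: 5, content: ' Apps', userId: 'user2' };

console.log(converges('Hello World', insertA, insertB)); // true
console.log(
  applyInsert(applyInsert('Hello World', insertA), transformInsert(insertA, insertB))
); // "Hello ChatGPT Apps World"
```

A check like this is worth running for every pair of operation types, since any branch that fails it will eventually produce diverging documents.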
CRDT (Conflict-free Replicated Data Type) Implementation
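A CRDT provides strong eventual consistency without transforming operations. The sequence CRDT below gives every character a unique ID and a position key, so replicas can apply the same edits in any order: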
// crdt.ts
export interface CRDTChar {
id: string;
value: string;
userId: string;
timestamp: number;
position: number[];
}
export class CRDTDocument {
private chars: Map<string, CRDTChar> = new Map();
private userId: string;
constructor(userId: string, initialContent: string = '') {
this.userId = userId;
// Initialize with content
for (let i = 0; i < initialContent.length; i++) {
const char: CRDTChar = {
id: this.generateId(i),
value: initialContent[i],
userId: 'system',
timestamp: Date.now(),
position: [i]
};
this.chars.set(char.id, char);
}
}
/**
* Insert character at position
*/
public insert(index: number, char: string): CRDTChar {
const prevChar = this.getCharAt(index - 1);
const nextChar = this.getCharAt(index);
const position = this.generatePositionBetween(
prevChar?.position || [0],
nextChar?.position || [Number.MAX_SAFE_INTEGER]
);
const newChar: CRDTChar = {
id: this.generateId(this.chars.size),
value: char,
userId: this.userId,
timestamp: Date.now(),
position
};
this.chars.set(newChar.id, newChar);
return newChar;
}
/**
* Delete character at position
*/
public delete(index: number): string | null {
const char = this.getCharAt(index);
if (!char) return null;
this.chars.delete(char.id);
return char.id;
}
/**
* Apply remote operation
*/
public applyRemoteInsert(char: CRDTChar): void {
this.chars.set(char.id, char);
}
/**
* Apply remote deletion
*/
public applyRemoteDelete(charId: string): void {
this.chars.delete(charId);
}
/**
* Get current document as string
*/
public toString(): string {
return this.getSortedChars()
.map(char => char.value)
.join('');
}
/**
* Get character at index
*/
private getCharAt(index: number): CRDTChar | null {
const sorted = this.getSortedChars();
return sorted[index] || null;
}
/**
* Get all characters sorted by position
*/
private getSortedChars(): CRDTChar[] {
return Array.from(this.chars.values()).sort((a, b) => {
return this.comparePositions(a.position, b.position);
});
}
/**
* Compare two position arrays
*/
private comparePositions(pos1: number[], pos2: number[]): number {
const minLength = Math.min(pos1.length, pos2.length);
for (let i = 0; i < minLength; i++) {
if (pos1[i] !== pos2[i]) {
return pos1[i] - pos2[i];
}
}
return pos1.length - pos2.length;
}
/**
* Generate position between two positions
*/
private generatePositionBetween(prev: number[], next: number[]): number[] {
const position: number[] = [];
for (let i = 0; i < Math.max(prev.length, next.length); i++) {
const prevValue = prev[i] || 0;
const nextValue = next[i] || Number.MAX_SAFE_INTEGER;
if (prevValue + 1 < nextValue) {
// Space between positions
position.push(Math.floor((prevValue + nextValue) / 2));
break;
} else if (prevValue === nextValue) {
// Same value, continue to next level
position.push(prevValue);
} else {
// prevValue + 1 === nextValue, need deeper nesting
position.push(prevValue);
position.push(Math.floor(Number.MAX_SAFE_INTEGER / 2));
break;
}
}
return position;
}
/**
* Generate unique character ID
*/
private generateId(seed: number): string {
return `${this.userId}_${Date.now()}_${seed}_${Math.random().toString(36).slice(2, 11)}`;
}
/**
* Get all characters (for synchronization)
*/
public getChars(): CRDTChar[] {
return Array.from(this.chars.values());
}
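/**
* Merge with remote document.
* Note: deletions are not propagated, because removed characters leave no tombstone;
* merging re-adds any character the remote replica still holds.
*/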
public merge(remoteChars: CRDTChar[]): void {
remoteChars.forEach(char => {
// Add or update character if not present or newer
const existing = this.chars.get(char.id);
if (!existing || char.timestamp > existing.timestamp) {
this.chars.set(char.id, char);
}
});
}
}
// Usage example
const doc1 = new CRDTDocument('user1', 'Hello');
// Seed the second replica from the first so both share the same character IDs
// (the constructor generates fresh IDs for its own initial content)
const doc2 = new CRDTDocument('user2');
doc2.merge(doc1.getChars());
// User 1 appends " World" starting at index 5
const inserted = ' World'.split('').map((ch, i) => doc1.insert(5 + i, ch));
console.log(doc1.toString()); // "Hello World"
// User 2 deletes "Hello" (indices 0-4) concurrently
const deletedIds: string[] = [];
for (let i = 0; i < 5; i++) {
const id = doc2.delete(0);
if (id) deletedIds.push(id);
}
console.log(doc2.toString()); // ""
// Exchange operations (eventual consistency).
// merge() alone cannot carry the deletions, because deleted characters leave no tombstone.
inserted.forEach(char => doc2.applyRemoteInsert(char));
deletedIds.forEach(id => doc1.applyRemoteDelete(id));
console.log(doc1.toString()); // " World" (Hello was deleted)
console.log(doc2.toString()); // " World" (both converge to same state)
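If replicas need to synchronize by exchanging full state rather than individual operations, deletions have to survive the merge. One common approach is to keep deleted characters as tombstones. The following is a minimal sketch under that assumption; `TombstoneDoc`, `Elem`, and the plain numeric `position` key are hypothetical simplifications, not the `CRDTDocument` class above.

```typescript
// Hypothetical element type: deleted characters stay in the map as tombstones.
interface Elem {
  id: string;       // globally unique, assigned once by the inserting replica
  value: string;
  position: number; // simplified ordering key, assumed unique enough for the demo
  deleted: boolean; // tombstone flag; once true it never reverts
}

class TombstoneDoc {
  private elems = new Map<string, Elem>();

  insert(id: string, value: string, position: number): void {
    this.elems.set(id, { id, value, position, deleted: false });
  }

  delete(id: string): void {
    const elem = this.elems.get(id);
    if (elem) elem.deleted = true;
  }

  // State-based merge: a character is deleted if either side has deleted it.
  merge(remote: Elem[]): void {
    for (const r of remote) {
      const local = this.elems.get(r.id);
      const deleted = r.deleted || (local ? local.deleted : false);
      this.elems.set(r.id, { ...r, deleted });
    }
  }

  // Copy of the full state, tombstones included, for sending to another replica.
  state(): Elem[] {
    return Array.from(this.elems.values()).map(e => ({ ...e }));
  }

  toString(): string {
    return Array.from(this.elems.values())
      .filter(e => !e.deleted)
      .sort((x, y) => x.position - y.position || (x.id < y.id ? -1 : 1))
      .map(e => e.value)
      .join('');
  }
}

const replicaA = new TombstoneDoc();
const replicaB = new TombstoneDoc();
'Hello'.split('').forEach((ch, i) => {
  replicaA.insert(`seed_${i}`, ch, i); // shared IDs for the shared initial text
  replicaB.insert(`seed_${i}`, ch, i);
});

' World'.split('').forEach((ch, i) => replicaA.insert(`a_${i}`, ch, 5 + i));
for (let i = 0; i < 5; i++) replicaB.delete(`seed_${i}`);

replicaB.merge(replicaA.state());
replicaA.merge(replicaB.state());
console.log(JSON.stringify(replicaA.toString())); // " World"
console.log(JSON.stringify(replicaB.toString())); // " World"
```

The trade-off is that tombstones accumulate, so long-lived documents usually need some form of garbage collection once all replicas have seen a deletion.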
For more on collaborative features, see our guide on Collaborative ChatGPT Apps: Multi-User Patterns.
Scaling Real-Time Applications with Redis Pub/Sub
When scaling beyond a single server, Redis pub/sub enables message broadcasting across multiple instances:
// redis-pubsub-scaling.ts
import Redis from 'ioredis';
import { WebSocketServer } from 'ws';
export class DistributedWebSocketServer {
private wss: WebSocketServer;
private publisher: Redis;
private subscriber: Redis;
private serverId: string;
private clients: Map<string, any> = new Map();
constructor(port: number, redisUrl: string) {
this.serverId = `server_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
// Redis clients (separate for pub/sub)
this.publisher = new Redis(redisUrl);
this.subscriber = new Redis(redisUrl);
// WebSocket server
this.wss = new WebSocketServer({ port });
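// Subscribe to the global channel. SUBSCRIBE matches exact channel names only
// (patterns such as 'room:*' need PSUBSCRIBE), so room channels are subscribed in joinRoom().
this.subscriber.subscribe('broadcast', (err, count) => {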
if (err) {
console.error('Failed to subscribe:', err);
} else {
console.log(`Subscribed to ${count} channels`);
}
});
// Handle Redis messages
this.subscriber.on('message', (channel, message) => {
this.handleRedisMessage(channel, message);
});
// Handle WebSocket connections
this.wss.on('connection', (ws, req) => {
this.handleConnection(ws, req);
});
console.log(`Distributed WebSocket server ${this.serverId} listening on port ${port}`);
}
private handleConnection(ws: any, req: any): void {
const clientId = this.generateClientId();
const url = new URL(req.url || '', `http://${req.headers.host}`);
const userId = url.searchParams.get('userId') || 'anonymous';
const client = {
ws,
id: clientId,
userId,
rooms: new Set<string>()
};
this.clients.set(clientId, client);
ws.on('message', (data: Buffer) => {
this.handleClientMessage(clientId, data);
});
ws.on('close', () => {
this.handleDisconnection(clientId);
});
this.send(clientId, {
type: 'connected',
serverId: this.serverId,
clientId
});
}
private handleClientMessage(clientId: string, data: Buffer): void {
try {
const message = JSON.parse(data.toString());
switch (message.type) {
case 'join':
this.joinRoom(clientId, message.room);
break;
case 'broadcast':
this.broadcastToRoom(message.room, message.data, clientId);
break;
case 'global_broadcast':
this.globalBroadcast(message.data, clientId);
break;
}
} catch (error) {
console.error('Failed to handle client message:', error);
}
}
private joinRoom(clientId: string, room: string): void {
const client = this.clients.get(clientId);
if (!client) return;
client.rooms.add(room);
// Subscribe to room channel in Redis
this.subscriber.subscribe(`room:${room}`);
// Publish join event
this.publisher.publish(`room:${room}`, JSON.stringify({
type: 'user_joined',
userId: client.userId,
serverId: this.serverId,
timestamp: Date.now()
}));
}
private broadcastToRoom(room: string, data: any, excludeClientId?: string): void {
// Publish to Redis (will be received by all servers)
this.publisher.publish(`room:${room}`, JSON.stringify({
type: 'room_message',
data,
excludeClientId,
serverId: this.serverId,
timestamp: Date.now()
}));
}
private globalBroadcast(data: any, excludeClientId?: string): void {
// Publish to global broadcast channel
this.publisher.publish('broadcast', JSON.stringify({
type: 'global_message',
data,
excludeClientId,
serverId: this.serverId,
timestamp: Date.now()
}));
}
private handleRedisMessage(channel: string, message: string): void {
try {
const data = JSON.parse(message);
// Redis delivers this server's own publishes back to its subscriber as well,
// so local clients are served from here too; excludeClientId filters out the sender.
if (channel === 'broadcast') {
// Global broadcast
this.clients.forEach((client, clientId) => {
if (clientId !== data.excludeClientId) {
this.send(clientId, data);
}
});
} else if (channel.startsWith('room:')) {
// Room-specific broadcast
const room = channel.replace('room:', '');
this.clients.forEach((client, clientId) => {
if (client.rooms.has(room) && clientId !== data.excludeClientId) {
this.send(clientId, data);
}
});
}
} catch (error) {
console.error('Failed to handle Redis message:', error);
}
}
private send(clientId: string, message: any): void {
const client = this.clients.get(clientId);
if (client && client.ws.readyState === 1) {
client.ws.send(JSON.stringify(message));
}
}
private handleDisconnection(clientId: string): void {
const client = this.clients.get(clientId);
if (!client) return;
// Notify rooms of disconnection
client.rooms.forEach(room => {
this.publisher.publish(`room:${room}`, JSON.stringify({
type: 'user_left',
userId: client.userId,
serverId: this.serverId,
timestamp: Date.now()
}));
});
this.clients.delete(clientId);
}
private generateClientId(): string {
return `${this.serverId}_client_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
}
}
// Usage: Start multiple instances behind load balancer
const server1 = new DistributedWebSocketServer(8080, 'redis://localhost:6379');
const server2 = new DistributedWebSocketServer(8081, 'redis://localhost:6379');
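The server above subscribes to a room channel on every join and never unsubscribes, so an instance keeps receiving traffic for rooms that no longer have local clients. One way to handle this is to count local members per room and only touch Redis on the first join and the last leave. The sketch below shows that bookkeeping on its own; `RoomRefCounter` and its callbacks are hypothetical names, and in the server the callbacks would wrap the subscriber's `subscribe` and `unsubscribe` calls.

```typescript
// Hypothetical helper: tracks how many local clients are in each room.
class RoomRefCounter {
  private counts = new Map<string, number>();

  constructor(
    private onFirstJoin: (room: string) => void, // e.g. subscribe to `room:${room}`
    private onLastLeave: (room: string) => void  // e.g. unsubscribe from `room:${room}`
  ) {}

  join(room: string): void {
    const next = (this.counts.get(room) || 0) + 1;
    this.counts.set(room, next);
    if (next === 1) this.onFirstJoin(room);
  }

  leave(room: string): void {
    const current = this.counts.get(room);
    if (current === undefined) return; // nobody was in this room
    if (current <= 1) {
      this.counts.delete(room);
      this.onLastLeave(room);
    } else {
      this.counts.set(room, current - 1);
    }
  }
}

// Demo with logging callbacks instead of a real Redis connection.
const channelLog: string[] = [];
const roomCounter = new RoomRefCounter(
  room => channelLog.push(`subscribe room:${room}`),
  room => channelLog.push(`unsubscribe room:${room}`)
);

roomCounter.join('lobby');
roomCounter.join('lobby');  // second local client: no new subscription
roomCounter.leave('lobby');
roomCounter.leave('lobby'); // last local client left: unsubscribe
console.log(channelLog); // [ 'subscribe room:lobby', 'unsubscribe room:lobby' ]
```

Calling `leave` for each of a client's rooms from the disconnection handler would keep the counts accurate.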
Load Balancer Configuration (Nginx):
upstream websocket_backend {
ip_hash; # Sticky sessions for WebSocket
server localhost:8080;
server localhost:8081;
}
server {
listen 80;
server_name ws.makeaihq.com;
location / {
proxy_pass http://websocket_backend;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_read_timeout 86400;
}
}
For more on scaling architectures, see our ChatGPT Applications Guide.
Production Real-Time Sync Checklist
Protocol Selection:
- Choose WebSocket for bidirectional, high-frequency updates
- Choose SSE for unidirectional server push; over HTTP/2, streams share one connection, which avoids the browser limit of about six HTTP/1.1 connections per domain
- Implement fallback to long polling for legacy environments
Connection Management:
- Implement heartbeat mechanism (ping/pong every 25-30 seconds)
- Automatic reconnection with exponential backoff
- Track last event ID for seamless reconnection (SSE)
- Message queue for offline operation buffering
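As an illustration of the reconnection item, here is a minimal sketch of exponential backoff with full jitter. The function name, the 1-second base, and the 30-second cap are assumptions chosen for the example; the random source is injectable only so the behaviour can be checked deterministically.

```typescript
// Delay before reconnect attempt `attempt` (0-based), in milliseconds.
// The ceiling doubles on each attempt up to maxMs; the actual delay is a random
// value below that ceiling so many clients do not reconnect at the same instant.
function backoffDelay(
  attempt: number,
  baseMs: number = 1000,
  maxMs: number = 30000,
  random: () => number = Math.random
): number {
  const ceiling = Math.min(maxMs, baseMs * Math.pow(2, attempt));
  return Math.floor(random() * ceiling);
}

// With a fixed "random" value of 0.5 the growth and the cap are easy to see.
const half = () => 0.5;
console.log(backoffDelay(0, 1000, 30000, half));  // 500
console.log(backoffDelay(3, 1000, 30000, half));  // 4000
console.log(backoffDelay(10, 1000, 30000, half)); // 15000 (capped at 30000 before jitter)
```

A client would typically reset the attempt counter to zero after a connection has stayed open for some time.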
Conflict Resolution:
- Operational Transformation for text editing use cases
- CRDT for distributed state synchronization
- Versioning system for concurrent operation tracking
- Conflict resolution UI for manual intervention when needed
Scaling:
- Redis pub/sub for multi-server broadcasting
- Load balancer with sticky sessions (ip_hash for WebSocket)
- Horizontal scaling with stateless server design
- Monitoring for connection count and message throughput
Security:
- WSS (WebSocket Secure) with TLS 1.3
- JWT authentication in connection handshake
- Rate limiting for message frequency
- Input validation and sanitization on all messages
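For the rate-limiting item, a per-connection token bucket is one common choice. The sketch below is a minimal version under assumed parameters (the class name, capacity, and refill rate are illustrative); the clock is passed in so the behaviour is deterministic.

```typescript
// Hypothetical per-connection limiter: allows short bursts up to `capacity`
// and a sustained rate of `refillPerSec` messages per second.
class TokenBucket {
  private tokens: number;
  private lastRefill: number;

  constructor(
    private readonly capacity: number,
    private readonly refillPerSec: number,
    now: number = Date.now()
  ) {
    this.tokens = capacity;
    this.lastRefill = now;
  }

  // Returns true if the message may be processed, false if it should be rejected.
  tryConsume(now: number = Date.now()): boolean {
    const elapsedSec = Math.max(0, now - this.lastRefill) / 1000;
    this.tokens = Math.min(this.capacity, this.tokens + elapsedSec * this.refillPerSec);
    this.lastRefill = now;
    if (this.tokens >= 1) {
      this.tokens -= 1;
      return true;
    }
    return false;
  }
}

// Burst of 2, refilling 1 token per second, driven by explicit timestamps.
const bucket = new TokenBucket(2, 1, 0);
console.log(bucket.tryConsume(0));    // true
console.log(bucket.tryConsume(0));    // true
console.log(bucket.tryConsume(0));    // false (burst exhausted)
console.log(bucket.tryConsume(1000)); // true  (one token refilled after 1s)
```

In a WebSocket server, one bucket per client would be consulted at the top of the message handler before any parsing.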
Performance:
- Message batching for high-frequency updates (max 50ms delay)
- Binary protocol for large payloads (WebSocket binary frames)
- Compression for text messages (permessage-deflate for WebSocket)
- Geographically distributed entry points (CDN or edge load balancers) to reduce connection latency
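The batching item can be sketched as a small queue that flushes at most 50 ms after the first queued message. `MessageBatcher` and its `sendBatch` callback are hypothetical names; in practice the callback would serialize the array and send it over the socket.

```typescript
// Hypothetical batcher: collects messages and delivers them as one array.
class MessageBatcher<T> {
  private queue: T[] = [];
  private timer: ReturnType<typeof setTimeout> | null = null;

  constructor(
    private readonly sendBatch: (batch: T[]) => void,
    private readonly maxDelayMs: number = 50
  ) {}

  add(message: T): void {
    this.queue.push(message);
    // Start the timer on the first message only, so no message waits longer than maxDelayMs.
    if (this.timer === null) {
      this.timer = setTimeout(() => this.flush(), this.maxDelayMs);
    }
  }

  // Send whatever is queued right now (also called by the timer).
  flush(): void {
    if (this.timer !== null) {
      clearTimeout(this.timer);
      this.timer = null;
    }
    if (this.queue.length === 0) return;
    const batch = this.queue;
    this.queue = [];
    this.sendBatch(batch);
  }
}

const sentBatches: string[][] = [];
const cursorBatcher = new MessageBatcher<string>(batch => sentBatches.push(batch));
cursorBatcher.add('cursor:10');
cursorBatcher.add('cursor:11');
cursorBatcher.flush(); // explicit flush, e.g. before closing the connection
console.log(sentBatches); // [ [ 'cursor:10', 'cursor:11' ] ]
```

Batching trades a bounded amount of latency for fewer frames, which matters most for high-frequency updates such as cursor positions.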
Monitoring:
- Connection metrics (active connections, reconnection rate)
- Message latency tracking (p50, p95, p99)
- Error rate monitoring (failed deliveries, timeouts)
- Resource usage (memory per connection, CPU usage)
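For the latency item, percentiles can be computed from a window of recorded samples. The sketch below uses the nearest-rank definition, which is one of several conventions; the function name and the sample data are assumptions for illustration.

```typescript
// Nearest-rank percentile: the smallest sample such that at least p% of
// samples are less than or equal to it. `p` is in the range (0, 100].
function percentile(samples: number[], p: number): number {
  if (samples.length === 0) throw new Error('no samples recorded');
  const sorted = samples.slice().sort((a, b) => a - b);
  const rank = Math.ceil((p * sorted.length) / 100);
  return sorted[Math.max(0, rank - 1)];
}

// 100 synthetic latency samples: 1ms, 2ms, ..., 100ms.
const latenciesMs: number[] = [];
for (let ms = 1; ms <= 100; ms++) latenciesMs.push(ms);

console.log(percentile(latenciesMs, 50)); // 50
console.log(percentile(latenciesMs, 95)); // 95
console.log(percentile(latenciesMs, 99)); // 99
```

For long-running servers, a fixed-size sliding window or a histogram keeps memory bounded instead of storing every sample.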
For complete security best practices, see our WebSocket Security Best Practices for ChatGPT Apps guide.
Build Production-Ready Real-Time ChatGPT Apps Today
Real-time synchronization transforms ChatGPT applications from isolated experiences into collaborative, multi-user platforms. Whether you implement WebSocket bidirectional communication, SSE server-push streaming, operational transformation for conflict resolution, or Redis-based horizontal scaling, the code examples in this guide provide production-tested foundations for enterprise real-time systems.
The most successful ChatGPT apps—collaborative editors, multi-user interfaces, live analytics dashboards—all rely on robust real-time sync patterns. Your choice between WebSocket and SSE, OT and CRDT, single-server and distributed architecture determines not just initial performance but long-term scalability and user experience quality.
Ready to build real-time ChatGPT apps without managing WebSocket infrastructure?
MakeAIHQ provides production-ready real-time sync patterns built into every ChatGPT application you create. From collaborative prompt editing to multi-user widget state synchronization, our platform handles WebSocket connections, conflict resolution, and horizontal scaling automatically—so you can focus on building features, not infrastructure.
Start your free trial: Create your first real-time collaborative ChatGPT app in under 5 minutes. No WebSocket server setup. No Redis configuration. No operational transformation debugging. Get started now →
Related Resources:
- ChatGPT Applications Guide - Complete architectural patterns and best practices
- Event-Driven Architecture for ChatGPT Apps - Message queues, pub/sub, event sourcing
- WebSocket Security Best Practices for ChatGPT Apps - Authentication, encryption, rate limiting
- Collaborative ChatGPT Apps: Multi-User Patterns - Landing page for collaboration features
External References:
- WebSocket Protocol Specification (RFC 6455) - Official WebSocket standard
- Server-Sent Events Specification - Now maintained in the WHATWG HTML Living Standard (originally published by the W3C)
- Conflict-free Replicated Data Types (Martin Kleppmann) - Academic overview of CRDTs
Next Steps:
- Choose your real-time protocol based on use case requirements
- Implement production WebSocket/SSE server with heartbeat and reconnection
- Add operational transformation or CRDT for conflict resolution
- Scale with Redis pub/sub for multi-server deployments
- Monitor connection metrics and optimize message latency
Your ChatGPT app's real-time capabilities start here.