Data Replication & Synchronization for ChatGPT Apps

Data replication is the foundation of high-availability (HA) ChatGPT applications that serve millions of users globally. When your ChatGPT app processes customer conversations, booking requests, or real-time analytics, a single database failure can cascade into complete service outages, lost revenue, and damaged brand reputation.

Replication creates multiple synchronized copies of your data across different servers, regions, or cloud providers. This redundancy ensures that if your primary database fails, your ChatGPT app fails over to a replica with minimal disruption (true zero data loss requires synchronous replication). Beyond disaster recovery (DR), replication enables geographic distribution—placing data closer to users in Tokyo, London, and New York can cut latency from roughly 500ms to 50ms, dramatically improving ChatGPT response times.

Modern replication systems balance three critical properties: Consistency (all replicas see the same data), Availability (every request receives a response), and Partition tolerance (the system continues operating despite network failures). The CAP theorem shows that during a network partition you must sacrifice either consistency or availability; you cannot have all three at once. Production ChatGPT apps typically choose eventual consistency (AP systems) for global scale or strong consistency (CP systems) for financial transactions.
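This tradeoff is often tuned with quorum replication: with N replicas, requiring W acknowledgments per write and contacting R replicas per read guarantees that reads see the latest write whenever R + W > N (the read and write quorums must overlap). A minimal sketch of that rule—the helper name is illustrative:

```typescript
// quorum-check.ts - Dynamo-style quorum consistency rule (illustrative)
interface QuorumConfig {
  n: number; // total replicas
  r: number; // replicas contacted per read
  w: number; // acknowledgments required per write
}

// Strong consistency holds exactly when read and write quorums must overlap.
function isStronglyConsistent({ n, r, w }: QuorumConfig): boolean {
  return r + w > n;
}

console.log(isStronglyConsistent({ n: 3, r: 2, w: 2 })); // true: quorums overlap
console.log(isStronglyConsistent({ n: 3, r: 1, w: 1 })); // false: a read may miss the latest write
```

Lowering R or W trades consistency for latency and availability, which is exactly the AP-versus-CP choice described above.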

This guide demonstrates production-ready replication architectures using PostgreSQL, Redis, and MongoDB—databases commonly powering ChatGPT MCP servers. You'll implement master-slave replication for read scalability, multi-master synchronization for write distribution, conflict resolution strategies using CRDTs and vector clocks, and comprehensive monitoring with lag detection and automatic failover. Whether you're building a fitness studio ChatGPT app or an enterprise data integration platform, these patterns ensure your data remains available, consistent, and synchronized across your infrastructure.

For the complete architectural context, see our Complete Guide to Building ChatGPT Applications.

Master-Slave Replication Architecture

Master-slave (primary-replica) replication is the most common pattern for ChatGPT applications requiring read scalability. All write operations (INSERT, UPDATE, DELETE) go to the master database, which asynchronously streams changes to one or more read replicas. Your ChatGPT app routes the bulk of its queries—user profile lookups, conversation history retrieval, analytics aggregations—to replicas, reserving the master for writes and transactional consistency.

Implementation benefits:

  • Read scalability: Add replicas to handle 10x, 100x, or 1000x read traffic without impacting write performance
  • Geographic distribution: Place replicas in eu-west-1, ap-southeast-1, and us-east-1 to reduce latency for global users
  • Zero-downtime upgrades: Upgrade replicas one-by-one, fail over to upgraded replica, upgrade former master
  • Backup isolation: Run backup operations on replicas without impacting production master load

Replication lag is the critical metric—the time delay between a write on the master and its appearance on replicas. Asynchronous replication typically incurs 100-500ms of lag; synchronous replication ensures no committed write is lost on failover, but slows every commit by at least one network round-trip to a replica, often cutting write throughput by 30-70%. Most ChatGPT apps tolerate eventual consistency—a user updating their profile might see stale data for 200ms, which is acceptable for non-critical features.
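A common mitigation for stale reads is read-your-writes routing: after a user writes, pin that user's reads to the master for longer than the expected lag window. A minimal sketch—the `ReadYourWritesRouter` class and its default window are illustrative, not part of the manager below:

```typescript
// sticky-reads.ts - read-your-writes routing sketch (illustrative)
class ReadYourWritesRouter {
  private lastWriteAt = new Map<string, number>();

  constructor(private stickinessMs: number = 1000) {}

  recordWrite(userId: string): void {
    this.lastWriteAt.set(userId, Date.now());
  }

  // Route to master while the user's own write may still be replicating.
  target(userId: string): 'master' | 'replica' {
    const last = this.lastWriteAt.get(userId);
    if (last !== undefined && Date.now() - last < this.stickinessMs) {
      return 'master';
    }
    return 'replica';
  }
}

const router = new ReadYourWritesRouter(1000);
console.log(router.target('u1')); // 'replica' - no recent write
router.recordWrite('u1');
console.log(router.target('u1')); // 'master' - within the stickiness window
```

Set the window comfortably above your observed p99 replication lag; other users' reads continue to hit replicas.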

Here's a production master-slave replication manager for PostgreSQL:

// replication-manager.ts - PostgreSQL Master-Slave Replication
import { Pool, PoolConfig } from 'pg';
import { EventEmitter } from 'events';

interface ReplicaConfig extends PoolConfig {
  name: string;
  region: string;
  weight: number; // Load balancing weight (1-100)
}

interface ReplicationStatus {
  replica: string;
  lagBytes: number;
  lagSeconds: number;
  state: 'streaming' | 'catchup' | 'disconnected';
  lastUpdate: Date;
}

export class MasterSlaveReplicationManager extends EventEmitter {
  private master: Pool;
  private replicas: Map<string, Pool>;
  private replicaConfigs: ReplicaConfig[];
  private healthCheckInterval: NodeJS.Timeout | null = null;
  private readonly LAG_THRESHOLD_SECONDS = 5;
  private readonly LAG_THRESHOLD_BYTES = 10 * 1024 * 1024; // 10MB

  constructor(
    masterConfig: PoolConfig,
    replicaConfigs: ReplicaConfig[]
  ) {
    super();

    this.master = new Pool({
      ...masterConfig,
      max: 20,
      idleTimeoutMillis: 30000,
      connectionTimeoutMillis: 5000,
    });

    this.replicaConfigs = replicaConfigs;
    this.replicas = new Map();

    // Initialize replica pools
    replicaConfigs.forEach(config => {
      this.replicas.set(config.name, new Pool({
        ...config,
        max: 50, // More connections for read-heavy workloads
        idleTimeoutMillis: 30000,
      }));
    });
  }

  async startHealthChecks(intervalMs: number = 10000): Promise<void> {
    this.healthCheckInterval = setInterval(async () => {
      const statuses = await this.checkReplicationStatus();

      statuses.forEach(status => {
        if (status.lagSeconds > this.LAG_THRESHOLD_SECONDS) {
          this.emit('lagWarning', status);
        }

        if (status.state === 'disconnected') {
          this.emit('replicaDisconnected', status);
        }
      });
    }, intervalMs);

    console.log(`[ReplicationManager] Health checks started (interval: ${intervalMs}ms)`);
  }

  async checkReplicationStatus(): Promise<ReplicationStatus[]> {
    const statuses: ReplicationStatus[] = [];

    for (const [name, replica] of this.replicas) {
      try {
        const result = await replica.query(`
          SELECT
            pg_last_wal_receive_lsn() AS receive_lsn,
            pg_last_wal_replay_lsn() AS replay_lsn,
            EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp())) AS lag_seconds,
            pg_wal_lsn_diff(pg_last_wal_receive_lsn(), pg_last_wal_replay_lsn()) AS lag_bytes,
            CASE
              WHEN pg_is_in_recovery() THEN 'streaming'
              ELSE 'disconnected' -- not in recovery: node was promoted or misconfigured
            END AS state
        `);

        const row = result.rows[0];
        statuses.push({
          replica: name,
          lagBytes: parseInt(row.lag_bytes) || 0,
          lagSeconds: parseFloat(row.lag_seconds) || 0,
          state: row.state,
          lastUpdate: new Date(),
        });
      } catch (error) {
        console.error(`[ReplicationManager] Failed to check replica ${name}:`, error);
        statuses.push({
          replica: name,
          lagBytes: 0,
          lagSeconds: 0,
          state: 'disconnected',
          lastUpdate: new Date(),
        });
      }
    }

    return statuses;
  }

  async executeWrite(query: string, params?: any[]): Promise<any> {
    // All writes go to master
    const client = await this.master.connect();

    try {
      const result = await client.query(query, params);
      console.log(`[Write] Master executed: ${query.substring(0, 100)}...`);
      return result;
    } finally {
      client.release();
    }
  }

  async executeRead(query: string, params?: any[], preferredRegion?: string): Promise<any> {
    // Select best replica using weighted round-robin + region affinity
    const replica = this.selectReplica(preferredRegion);

    if (!replica) {
      console.warn('[Read] No healthy replicas, falling back to master');
      return this.master.query(query, params);
    }

    try {
      const result = await replica.query(query, params);
      return result;
    } catch (error) {
      console.error('[Read] Replica query failed, falling back to master:', error);
      return this.master.query(query, params);
    }
  }

  private selectReplica(preferredRegion?: string): Pool | null {
    const healthyReplicas = Array.from(this.replicas.entries())
      .filter(([name, _]) => {
        // Filter out unhealthy replicas (would need health check cache)
        return true; // Simplified for example
      });

    if (healthyReplicas.length === 0) return null;

    // Prefer replicas in same region
    if (preferredRegion) {
      const regionalReplica = healthyReplicas.find(([name, _]) => {
        const config = this.replicaConfigs.find(c => c.name === name);
        return config?.region === preferredRegion;
      });

      if (regionalReplica) return regionalReplica[1];
    }

    // Weighted random selection
    const totalWeight = this.replicaConfigs.reduce((sum, c) => sum + c.weight, 0);
    let random = Math.random() * totalWeight;

    for (const [name, pool] of healthyReplicas) {
      const config = this.replicaConfigs.find(c => c.name === name);
      if (!config) continue;

      random -= config.weight;
      if (random <= 0) return pool;
    }

    return healthyReplicas[0][1];
  }

  async promoteReplica(replicaName: string): Promise<void> {
    // Promote replica to master (disaster recovery scenario)
    const replica = this.replicas.get(replicaName);
    if (!replica) {
      throw new Error(`Replica ${replicaName} not found`);
    }

    console.log(`[Failover] Promoting replica ${replicaName} to master...`);

    await replica.query('SELECT pg_promote()');

    // Swap master and replica references. In production, fence the old master
    // and reconfigure it as a standby before it serves traffic again.
    const oldMaster = this.master;
    this.master = replica;
    this.replicas.delete(replicaName);
    this.replicas.set('old-master', oldMaster);

    this.emit('failover', { newMaster: replicaName });
    console.log(`[Failover] Replica ${replicaName} promoted successfully`);
  }

  async shutdown(): Promise<void> {
    if (this.healthCheckInterval) {
      clearInterval(this.healthCheckInterval);
    }

    await this.master.end();
    for (const [name, replica] of this.replicas) {
      await replica.end();
    }

    console.log('[ReplicationManager] Shutdown complete');
  }
}

// Usage in ChatGPT MCP server
const replicationManager = new MasterSlaveReplicationManager(
  {
    host: 'master.db.internal',
    database: 'chatgpt_app',
    user: 'admin',
    password: process.env.DB_PASSWORD
  },
  [
    { name: 'replica-us-east', host: 'replica1.db.internal', database: 'chatgpt_app',
      user: 'admin', password: process.env.DB_PASSWORD, region: 'us-east-1', weight: 50 },
    { name: 'replica-eu-west', host: 'replica2.db.internal', database: 'chatgpt_app',
      user: 'admin', password: process.env.DB_PASSWORD, region: 'eu-west-1', weight: 30 },
    { name: 'replica-ap-southeast', host: 'replica3.db.internal', database: 'chatgpt_app',
      user: 'admin', password: process.env.DB_PASSWORD, region: 'ap-southeast-1', weight: 20 },
  ]
);

await replicationManager.startHealthChecks(5000);

// Route ChatGPT queries
replicationManager.on('lagWarning', (status) => {
  console.warn(`⚠️ Replica ${status.replica} lag: ${status.lagSeconds}s`);
});

// Write: Create user booking
await replicationManager.executeWrite(
  'INSERT INTO bookings (user_id, class_id, timestamp) VALUES ($1, $2, $3)',
  [userId, classId, new Date()]
);

// Read: Fetch user conversation history (routed to nearest replica)
const result = await replicationManager.executeRead(
  'SELECT * FROM conversations WHERE user_id = $1 ORDER BY created_at DESC LIMIT 50',
  [userId],
  'us-east-1' // Prefer US East replica
);

This manager provides automatic failover (promote replica on master failure), region-aware routing (minimize latency), and lag monitoring (alert on replication delays). For multi-region deployment patterns, see Multi-Region Deployment for ChatGPT Apps.

Multi-Master Replication for Global Writes

Multi-master (active-active) replication allows writes to any replica, bidirectionally synchronizing changes across all nodes. This architecture eliminates write bottlenecks and enables true global distribution—users in Singapore write to the Singapore master, users in Frankfurt write to the Frankfurt master, and all changes propagate within seconds.

Critical challenges:

  • Conflict detection: Two users simultaneously updating the same record in different regions creates a conflict
  • Conflict resolution: The system must deterministically choose which write "wins" (last-write-wins, vector clocks, application-defined)
  • Complexity: Multi-master adds significant operational overhead—synchronization loops, split-brain scenarios, tombstone accumulation

When to use multi-master:

  • Global ChatGPT apps with write-heavy workloads distributed across continents
  • Collaborative features (shared conversation threads, multi-user document editing)
  • Applications requiring 99.99%+ write availability (can't tolerate master failures)

When to avoid:

  • Financial transactions requiring strict serializability (use CP systems like Spanner or CockroachDB)
  • Small-scale apps with <10k writes/day (master-slave is simpler)
  • Teams without deep distributed systems expertise

Here's a production multi-master synchronization engine with conflict detection:

// multi-master-sync.ts - Bidirectional Synchronization Engine
import { Pool } from 'pg';
import { EventEmitter } from 'events';

interface ChangeEvent {
  id: string;
  table: string;
  operation: 'INSERT' | 'UPDATE' | 'DELETE';
  data: Record<string, any>;
  timestamp: Date;
  sourceNode: string;
  vectorClock: Record<string, number>;
}

interface ConflictEvent {
  local: ChangeEvent;
  remote: ChangeEvent;
  resolution: 'local_wins' | 'remote_wins' | 'merge';
}

export class MultiMasterSyncEngine extends EventEmitter {
  private nodes: Map<string, Pool>;
  private nodeId: string;
  private vectorClock: Map<string, number>;
  private syncInterval: NodeJS.Timeout | null = null;

  constructor(
    nodeId: string,
    localConfig: any,
    remoteConfigs: Array<{ id: string; config: any }>
  ) {
    super();

    this.nodeId = nodeId;
    this.nodes = new Map();
    this.vectorClock = new Map();

    // Local node
    this.nodes.set(nodeId, new Pool(localConfig));
    this.vectorClock.set(nodeId, 0);

    // Remote nodes
    remoteConfigs.forEach(({ id, config }) => {
      this.nodes.set(id, new Pool(config));
      this.vectorClock.set(id, 0);
    });
  }

  async startSync(intervalMs: number = 1000): Promise<void> {
    await this.initializeChangeFeed();

    this.syncInterval = setInterval(async () => {
      await this.syncWithRemotes();
    }, intervalMs);

    console.log(`[MultiMasterSync] Synchronization started (node: ${this.nodeId}, interval: ${intervalMs}ms)`);
  }

  private async initializeChangeFeed(): Promise<void> {
    const local = this.nodes.get(this.nodeId)!;

    // Create change log table if not exists
    await local.query(`
      CREATE TABLE IF NOT EXISTS _replication_log (
        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
        table_name TEXT NOT NULL,
        operation TEXT NOT NULL,
        data JSONB NOT NULL,
        timestamp TIMESTAMPTZ DEFAULT now(),
        source_node TEXT NOT NULL,
        vector_clock JSONB NOT NULL,
        applied BOOLEAN DEFAULT FALSE
      )
    `);

    // Create the change-capture trigger function. Note: the vector clock is
    // interpolated at creation time for simplicity; a production system would
    // read it at write time (e.g. from a metadata table).
    await local.query(`
      CREATE OR REPLACE FUNCTION capture_changes()
      RETURNS TRIGGER AS $$
      BEGIN
        INSERT INTO _replication_log (table_name, operation, data, source_node, vector_clock)
        VALUES (
          TG_TABLE_NAME,
          TG_OP,
          CASE
            WHEN TG_OP = 'DELETE' THEN row_to_json(OLD)
            ELSE row_to_json(NEW)
          END,
          '${this.nodeId}',
          '${JSON.stringify(Object.fromEntries(this.vectorClock))}'::jsonb
        );
        -- NEW is unassigned for DELETE rows
        IF TG_OP = 'DELETE' THEN
          RETURN OLD;
        END IF;
        RETURN NEW;
      END;
      $$ LANGUAGE plpgsql;
    `);

    // The function must still be attached to each replicated table, e.g.:
    // CREATE TRIGGER capture_bookings AFTER INSERT OR UPDATE OR DELETE
    //   ON bookings FOR EACH ROW EXECUTE FUNCTION capture_changes();

    console.log('[MultiMasterSync] Change feed initialized');
  }

  private async syncWithRemotes(): Promise<void> {
    const local = this.nodes.get(this.nodeId)!;

    for (const [remoteId, remotePool] of this.nodes) {
      if (remoteId === this.nodeId) continue;

      try {
        // Fetch unapplied changes from remote
        const result = await remotePool.query(`
          SELECT * FROM _replication_log
          WHERE source_node != $1 AND applied = FALSE
          ORDER BY timestamp ASC
          LIMIT 1000
        `, [this.nodeId]);

        const changes: ChangeEvent[] = result.rows.map(row => ({
          id: row.id,
          table: row.table_name,
          operation: row.operation,
          data: row.data,
          timestamp: new Date(row.timestamp),
          sourceNode: row.source_node,
          vectorClock: row.vector_clock,
        }));

        // Apply changes to local database
        for (const change of changes) {
          await this.applyChange(change, remoteId);
        }

        // Mark changes as applied on remote
        if (changes.length > 0) {
          const ids = changes.map(c => c.id);
          await remotePool.query(`
            UPDATE _replication_log
            SET applied = TRUE
            WHERE id = ANY($1::uuid[])
          `, [ids]);

          console.log(`[MultiMasterSync] Applied ${changes.length} changes from ${remoteId}`);
        }
      } catch (error) {
        console.error(`[MultiMasterSync] Sync failed with ${remoteId}:`, error);
        this.emit('syncError', { remoteId, error });
      }
    }
  }

  private async applyChange(change: ChangeEvent, sourceNode: string): Promise<void> {
    const local = this.nodes.get(this.nodeId)!;

    // Check for conflicts
    const conflict = await this.detectConflict(change);

    if (conflict) {
      const resolution = this.resolveConflict(conflict);
      this.emit('conflict', { ...conflict, resolution });

      if (resolution === 'local_wins') {
        console.log(`[Conflict] Local wins for ${change.table}:${change.data.id}`);
        return; // Don't apply remote change
      }
    }

    // Apply change (assumes replicated tables share id, updated_at, and data
    // columns; adapt the statements to your schema)
    try {
      switch (change.operation) {
        case 'INSERT':
          await local.query(
            `INSERT INTO ${change.table} SELECT * FROM jsonb_populate_record(NULL::${change.table}, $1)
             ON CONFLICT (id) DO UPDATE SET
             updated_at = EXCLUDED.updated_at,
             data = EXCLUDED.data`,
            [change.data]
          );
          break;

        case 'UPDATE':
          await local.query(
            `UPDATE ${change.table} SET
             updated_at = $2,
             data = $3
             WHERE id = $1`,
            [change.data.id, change.timestamp, change.data]
          );
          break;

        case 'DELETE':
          await local.query(
            `DELETE FROM ${change.table} WHERE id = $1`,
            [change.data.id]
          );
          break;
      }

      // Update vector clock
      const remoteClock = change.vectorClock[sourceNode] || 0;
      this.vectorClock.set(sourceNode, Math.max(this.vectorClock.get(sourceNode) || 0, remoteClock));

    } catch (error) {
      console.error(`[MultiMasterSync] Failed to apply change:`, error);
      throw error;
    }
  }

  private async detectConflict(remoteChange: ChangeEvent): Promise<ConflictEvent | null> {
    const local = this.nodes.get(this.nodeId)!;

    // Check if local has conflicting change for same record
    const result = await local.query(`
      SELECT * FROM _replication_log
      WHERE table_name = $1
      AND (data->>'id') = $2
      AND timestamp > $3 - INTERVAL '10 seconds'
      AND source_node = $4
      ORDER BY timestamp DESC
      LIMIT 1
    `, [remoteChange.table, remoteChange.data.id, remoteChange.timestamp, this.nodeId]);

    if (result.rows.length === 0) return null;

    const localChange: ChangeEvent = {
      id: result.rows[0].id,
      table: result.rows[0].table_name,
      operation: result.rows[0].operation,
      data: result.rows[0].data,
      timestamp: new Date(result.rows[0].timestamp),
      sourceNode: result.rows[0].source_node,
      vectorClock: result.rows[0].vector_clock,
    };

    return {
      local: localChange,
      remote: remoteChange,
      resolution: 'local_wins', // Default, will be overridden
    };
  }

  private resolveConflict(conflict: ConflictEvent): 'local_wins' | 'remote_wins' | 'merge' {
    // Last-write-wins (LWW) using timestamp
    if (conflict.local.timestamp > conflict.remote.timestamp) {
      return 'local_wins';
    } else if (conflict.remote.timestamp > conflict.local.timestamp) {
      return 'remote_wins';
    }

    // Tie-breaker: higher node ID wins
    return conflict.local.sourceNode > conflict.remote.sourceNode ? 'local_wins' : 'remote_wins';
  }

  async shutdown(): Promise<void> {
    if (this.syncInterval) {
      clearInterval(this.syncInterval);
    }

    for (const [id, pool] of this.nodes) {
      await pool.end();
    }

    console.log('[MultiMasterSync] Shutdown complete');
  }
}

// Usage across 3 global regions
const usEastSync = new MultiMasterSyncEngine(
  'us-east-1',
  { host: 'db-us-east.internal', database: 'chatgpt_app', user: 'admin', password: process.env.DB_PASSWORD },
  [
    { id: 'eu-west-1', config: { host: 'db-eu-west.internal', database: 'chatgpt_app', user: 'admin', password: process.env.DB_PASSWORD } },
    { id: 'ap-southeast-1', config: { host: 'db-ap-southeast.internal', database: 'chatgpt_app', user: 'admin', password: process.env.DB_PASSWORD } },
  ]
);

usEastSync.on('conflict', (conflict) => {
  console.log(`⚠️ Conflict detected: ${conflict.resolution}`);
  // Log to monitoring system
});

await usEastSync.startSync(500); // Sync every 500ms

This engine captures all database changes using PostgreSQL triggers, propagates them to remote nodes, detects conflicts using timestamp comparison, and resolves conflicts with last-write-wins (LWW). For more advanced patterns, see Consistency Models for ChatGPT Apps.

Conflict Resolution Strategies

Multi-master replication inevitably creates conflicts—two users simultaneously editing the same booking, profile, or conversation thread in different data centers. Your conflict resolution strategy determines whether data is lost, merged intelligently, or preserved with user intervention.

Last-Write-Wins (LWW)

The simplest strategy: use wall-clock timestamps to determine which write occurred "last" and discard the earlier write. LWW is fast, deterministic, and works well for eventually-consistent systems where occasional data loss is acceptable (user profile updates, preference settings).

Limitations:

  • Clock skew: If clocks drift by 500ms between nodes, writes can be ordered incorrectly
  • Data loss: The "losing" write is permanently discarded without user notification
  • Record-level granularity: LWW applied per record overwrites whole rows, so concurrent updates to different fields (user changes email in US, password in EU) mean one user's change is silently lost
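The mechanism (and the tie-breaking it requires) fits in a few lines. This `LWWRegister`-style sketch uses caller-supplied timestamps with a node-ID tie-breaker so all replicas converge deterministically; the names are illustrative:

```typescript
// lww-register.ts - last-write-wins register merge (illustrative)
interface LWWEntry<T> {
  value: T;
  timestamp: number; // wall-clock millis - subject to skew between nodes
  nodeId: string;    // deterministic tie-breaker for equal timestamps
}

function lwwMerge<T>(a: LWWEntry<T>, b: LWWEntry<T>): LWWEntry<T> {
  if (a.timestamp !== b.timestamp) {
    return a.timestamp > b.timestamp ? a : b;
  }
  // Equal timestamps: break ties by node ID so every replica picks the same winner.
  return a.nodeId > b.nodeId ? a : b;
}

const us = { value: 'alice@new.com', timestamp: 1700000000500, nodeId: 'us-east-1' };
const eu = { value: 'alice@old.com', timestamp: 1700000000100, nodeId: 'eu-west-1' };
console.log(lwwMerge(us, eu).value); // 'alice@new.com' - the EU write is silently discarded
```

Note that if the EU node's clock ran 500ms fast, the merge would pick the EU value even if the US write actually happened later—this is the clock-skew hazard in code.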

Vector Clocks

Vector clocks track causality by maintaining a version counter per node. Each write increments the local counter and includes the full vector clock. When comparing two versions, you can detect:

  • Ancestor relationship: Version A happened-before version B (no conflict)
  • Concurrent updates: Neither version happened-before the other (conflict requires resolution)

Vector clocks enable sibling detection—the system preserves both versions and lets the application or user choose which to keep.

// vector-clock-resolver.ts - Vector Clock Conflict Resolution
interface VectorClock {
  [nodeId: string]: number;
}

interface VersionedDocument {
  id: string;
  data: Record<string, any>;
  vectorClock: VectorClock;
  timestamp: Date;
}

export class VectorClockResolver {
  compare(a: VectorClock, b: VectorClock): 'ancestor' | 'descendant' | 'concurrent' {
    const nodesA = new Set(Object.keys(a));
    const nodesB = new Set(Object.keys(b));
    const allNodes = new Set([...nodesA, ...nodesB]);

    let aGreaterEqual = true;
    let bGreaterEqual = true;

    for (const node of allNodes) {
      const versionA = a[node] || 0;
      const versionB = b[node] || 0;

      if (versionA < versionB) aGreaterEqual = false;
      if (versionB < versionA) bGreaterEqual = false;
    }

    if (aGreaterEqual && bGreaterEqual) return 'ancestor'; // Equal (same version)
    if (aGreaterEqual) return 'descendant'; // A is newer (A happened-after B)
    if (bGreaterEqual) return 'ancestor'; // B is newer (B happened-after A)

    return 'concurrent'; // Neither is ancestor (conflict!)
  }

  merge(a: VersionedDocument, b: VersionedDocument): VersionedDocument {
    const relationship = this.compare(a.vectorClock, b.vectorClock);

    switch (relationship) {
      case 'descendant':
        return a; // A is newer, keep A

      case 'ancestor':
        return b; // B is newer, keep B

      case 'concurrent':
        // Concurrent conflict - merge field-by-field
        console.warn(`⚠️ Concurrent conflict for document ${a.id}`);

        // Field-level merge: newer timestamp wins per field.
        // Copy data as well so the input documents are not mutated.
        const merged = { ...a, data: { ...a.data } };

        for (const [key, valueB] of Object.entries(b.data)) {
          const valueA = a.data[key];

          // If field differs, use most recent version
          if (JSON.stringify(valueA) !== JSON.stringify(valueB)) {
            merged.data[key] = b.timestamp > a.timestamp ? valueB : valueA;
          }
        }

        // Merge vector clocks (take maximum version per node)
        const mergedClock: VectorClock = {};
        const allNodes = new Set([
          ...Object.keys(a.vectorClock),
          ...Object.keys(b.vectorClock),
        ]);

        for (const node of allNodes) {
          mergedClock[node] = Math.max(
            a.vectorClock[node] || 0,
            b.vectorClock[node] || 0
          );
        }

        merged.vectorClock = mergedClock;
        merged.timestamp = new Date();

        return merged;
    }
  }

  increment(clock: VectorClock, nodeId: string): VectorClock {
    return {
      ...clock,
      [nodeId]: (clock[nodeId] || 0) + 1,
    };
  }
}

// Usage in ChatGPT conversation sync
const resolver = new VectorClockResolver();

const conversationUS: VersionedDocument = {
  id: 'conv-123',
  data: { title: 'Booking Help', messages: 10, status: 'active' },
  vectorClock: { 'us-east-1': 5, 'eu-west-1': 3 },
  timestamp: new Date('2026-12-25T10:30:00Z'),
};

const conversationEU: VersionedDocument = {
  id: 'conv-123',
  data: { title: 'Booking Assistance', messages: 10, status: 'resolved' },
  vectorClock: { 'us-east-1': 4, 'eu-west-1': 6 },
  timestamp: new Date('2026-12-25T10:30:05Z'),
};

const merged = resolver.merge(conversationUS, conversationEU);
console.log('Merged conversation:', merged);
// Result: { title: 'Booking Assistance', status: 'resolved', ... }

CRDTs (Conflict-Free Replicated Data Types)

CRDTs are data structures designed to be merged without conflicts. Common CRDT types:

  • G-Counter: Increment-only counter (sum all node counters)
  • PN-Counter: Increment/decrement counter (separate P and N counters)
  • LWW-Register: Last-write-wins register with timestamp
  • OR-Set: Add/remove set preserving all additions

CRDTs are ideal for collaborative ChatGPT features—shared conversation threads, multi-user document editing, real-time analytics dashboards.

// crdt-counter.ts - PN-Counter CRDT Implementation
export class PNCounter {
  private increments: Map<string, number>;
  private decrements: Map<string, number>;
  private nodeId: string;

  constructor(nodeId: string) {
    this.nodeId = nodeId;
    this.increments = new Map();
    this.decrements = new Map();
  }

  increment(delta: number = 1): void {
    const current = this.increments.get(this.nodeId) || 0;
    this.increments.set(this.nodeId, current + delta);
  }

  decrement(delta: number = 1): void {
    const current = this.decrements.get(this.nodeId) || 0;
    this.decrements.set(this.nodeId, current + delta);
  }

  value(): number {
    let total = 0;

    for (const count of this.increments.values()) {
      total += count;
    }

    for (const count of this.decrements.values()) {
      total -= count;
    }

    return total;
  }

  merge(other: PNCounter): void {
    // Merge increments (take max per node)
    for (const [node, count] of other.increments) {
      this.increments.set(
        node,
        Math.max(this.increments.get(node) || 0, count)
      );
    }

    // Merge decrements (take max per node)
    for (const [node, count] of other.decrements) {
      this.decrements.set(
        node,
        Math.max(this.decrements.get(node) || 0, count)
      );
    }
  }

  toJSON(): any {
    return {
      nodeId: this.nodeId,
      increments: Object.fromEntries(this.increments),
      decrements: Object.fromEntries(this.decrements),
    };
  }

  static fromJSON(data: any): PNCounter {
    const counter = new PNCounter(data.nodeId);
    counter.increments = new Map(Object.entries(data.increments));
    counter.decrements = new Map(Object.entries(data.decrements));
    return counter;
  }
}

// Usage: Track active ChatGPT sessions across regions
const sessionCounterUS = new PNCounter('us-east-1');
const sessionCounterEU = new PNCounter('eu-west-1');

// User starts session in US
sessionCounterUS.increment();
console.log('US sessions:', sessionCounterUS.value()); // 1

// User starts session in EU
sessionCounterEU.increment();
console.log('EU sessions:', sessionCounterEU.value()); // 1

// Replicate counters
sessionCounterUS.merge(sessionCounterEU);
sessionCounterEU.merge(sessionCounterUS);

console.log('Global sessions (US):', sessionCounterUS.value()); // 2
console.log('Global sessions (EU):', sessionCounterEU.value()); // 2

// User ends session in US
sessionCounterUS.decrement();
sessionCounterUS.merge(sessionCounterEU);

console.log('Global sessions after disconnect:', sessionCounterUS.value()); // 1

For disaster recovery strategies incorporating conflict resolution, see Disaster Recovery Planning for ChatGPT Apps.

Synchronization Protocols

Modern replication systems use three primary synchronization protocols to propagate changes from master to replicas or between multi-master nodes.

Change Data Capture (CDC)

CDC monitors database transaction logs (PostgreSQL WAL, MySQL binlog, MongoDB oplog) and streams change events to downstream consumers. Benefits:

  • Zero application impact: No triggers or application-level code required
  • Complete fidelity: Captures all changes (DDL, DML, schema evolution)
  • Low latency: Sub-second replication lag typical

Popular CDC tools: Debezium, Maxwell, AWS DMS.

// cdc-pipeline.ts - Change Data Capture Pipeline
import { EventEmitter } from 'events';
import pg from 'pg';

interface CDCEvent {
  lsn: string;
  table: string;
  operation: 'INSERT' | 'UPDATE' | 'DELETE';
  before: Record<string, any> | null;
  after: Record<string, any> | null;
  timestamp: Date;
}

export class PostgresCDCPipeline extends EventEmitter {
  private sourcePool: pg.Pool;
  private replicationClient: pg.Client | null = null;
  private slotName: string;
  private publicationName: string;

  constructor(
    config: pg.PoolConfig,
    slotName: string = 'chatgpt_app_cdc_slot',
    publicationName: string = 'chatgpt_app_publication'
  ) {
    super();

    this.sourcePool = new pg.Pool(config);
    this.slotName = slotName;
    this.publicationName = publicationName;
  }

  async initialize(): Promise<void> {
    // Create publication (if not exists)
    await this.sourcePool.query(`
      DO $$
      BEGIN
        IF NOT EXISTS (SELECT 1 FROM pg_publication WHERE pubname = '${this.publicationName}') THEN
          CREATE PUBLICATION ${this.publicationName} FOR ALL TABLES;
        END IF;
      END
      $$;
    `);

    // Create replication slot (if not exists)
    await this.sourcePool.query(`
      SELECT pg_create_logical_replication_slot('${this.slotName}', 'pgoutput')
      WHERE NOT EXISTS (
        SELECT 1 FROM pg_replication_slots WHERE slot_name = '${this.slotName}'
      )
    `);

    console.log(`[CDC] Initialized publication: ${this.publicationName}, slot: ${this.slotName}`);
  }

  async start(): Promise<void> {
    // Connect with the replication protocol. Note: stock node-postgres does
    // not speak the streaming replication protocol; this sketch assumes a
    // replication-capable client (e.g. the pg-logical-replication package,
    // or an external tool such as Debezium).
    this.replicationClient = new pg.Client({
      ...this.sourcePool.options,
      replication: 'database',
    });

    await this.replicationClient.connect();

    // Start the logical replication stream (runs until the client disconnects)
    void this.replicationClient.query(`
      START_REPLICATION SLOT ${this.slotName} LOGICAL 0/0
      (proto_version '1', publication_names '${this.publicationName}')
    `);

    // Handle replication messages ('replicationMessage' is assumed to be
    // emitted by the replication-capable client, not by stock pg.Client)
    this.replicationClient.on('replicationMessage', (msg: any) => {
      if (msg.tag === 'insert' || msg.tag === 'update' || msg.tag === 'delete') {
        const event = this.parseWALMessage(msg);
        this.emit('change', event);
      }
    });

    console.log('[CDC] Replication stream started');
  }

  private parseWALMessage(msg: any): CDCEvent {
    // Simplified WAL parsing (production would use pgoutput decoder)
    return {
      lsn: msg.lsn,
      table: msg.relation?.name || 'unknown',
      operation: msg.tag.toUpperCase() as any,
      before: msg.tag === 'update' || msg.tag === 'delete' ? msg.old : null,
      after: msg.tag === 'insert' || msg.tag === 'update' ? msg.new : null,
      timestamp: new Date(),
    };
  }

  async shutdown(): Promise<void> {
    if (this.replicationClient) {
      await this.replicationClient.end();
    }

    await this.sourcePool.end();
    console.log('[CDC] Pipeline shutdown');
  }
}

// Usage: Replicate to secondary database
const cdc = new PostgresCDCPipeline({
  host: 'master.db.internal',
  database: 'chatgpt_app',
  user: 'replication_user',
  password: process.env.REPLICATION_PASSWORD,
});

await cdc.initialize();
await cdc.start();

cdc.on('change', async (event: CDCEvent) => {
  console.log(`[CDC] Change detected: ${event.operation} on ${event.table}`);

  // Apply to replica (or Kafka, Elasticsearch, cache, etc.)
  // await replicaDb.applyChange(event);
});

Event Sourcing

Event sourcing stores all state changes as an immutable event log. The current state is derived by replaying events from the beginning. Benefits:

  • Perfect audit trail: Every change is preserved with full context
  • Time-travel debugging: Replay events to recreate any historical state
  • Event-driven architecture: Events flow to analytics, caching, search indexing

Drawbacks: Increased storage, complex event schema evolution.
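A minimal sketch of the idea in TypeScript — the `AccountEvent` type, `append`, and `replay` names are illustrative, not from a real ChatGPT app schema:

```typescript
// event-sourcing-sketch.ts - derive state by replaying an immutable event log
interface AccountEvent {
  seq: number;
  type: 'Deposited' | 'Withdrawn';
  amount: number;
  at: string; // ISO timestamp, preserved for the audit trail
}

// Append-only log: events are never updated or deleted
const log: AccountEvent[] = [];

function append(event: Omit<AccountEvent, 'seq'>): void {
  log.push({ seq: log.length + 1, ...event });
}

// Current state is a pure fold over the log; replaying a prefix
// (seq <= upToSeq) reconstructs any historical state ("time travel")
function replay(upToSeq: number = Infinity): number {
  return log
    .filter(e => e.seq <= upToSeq)
    .reduce((balance, e) =>
      e.type === 'Deposited' ? balance + e.amount : balance - e.amount, 0);
}

append({ type: 'Deposited', amount: 100, at: '2025-01-01T00:00:00Z' });
append({ type: 'Withdrawn', amount: 30, at: '2025-01-02T00:00:00Z' });
append({ type: 'Deposited', amount: 50, at: '2025-01-03T00:00:00Z' });

console.log(replay());  // 120 — current balance
console.log(replay(2)); // 70 — state as of event #2
```

The same replay mechanism is what feeds downstream projections: analytics, caches, and search indexes each fold the event stream into their own read model.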

Log Shipping

Binary log shipping copies raw database WAL files to replicas, which replay the logs to reconstruct state. This is PostgreSQL's physical replication mechanism. Benefits:

  • Byte-for-byte identical replicas: No schema drift or data inconsistencies
  • Minimal overhead: No parsing or transformation logic

Drawbacks: Replicas must be same PostgreSQL version, can't filter or transform data.
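As a sketch of what enabling physical replication involves — the hostname, data directory, and `replication_user` role are placeholders, and exact settings vary by PostgreSQL version:

```ini
# postgresql.conf on the primary — ship WAL to standbys
wal_level = replica          # emit enough WAL for physical replicas
max_wal_senders = 5          # concurrent streaming connections
wal_keep_size = 1GB          # retain WAL for temporarily disconnected standbys

# One-time standby seeding (run on the standby host):
#   pg_basebackup -h master.db.internal -U replication_user \
#     -D /var/lib/postgresql/data -R -X stream
# -R writes standby.signal and primary_conninfo so the standby
# starts streaming from the primary on boot.
```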

Monitoring Replication Health

Production replication systems require continuous monitoring to detect lag spikes, replication breaks, and split-brain scenarios before they impact users.

Key Metrics

// replication-monitor.ts - Real-Time Replication Monitoring
import { Pool } from 'pg';
import { EventEmitter } from 'events';

interface ReplicationMetrics {
  lagBytes: number;
  lagSeconds: number;
  throughputBytesPerSec: number;
  replicationState: 'streaming' | 'catchup' | 'disconnected' | 'error';
  lastFlushLSN: string;
  appliedLSN: string;
  timestamp: Date;
}

export class ReplicationMonitor extends EventEmitter {
  private master: Pool;
  private replicas: Map<string, Pool>;
  private metrics: Map<string, ReplicationMetrics[]> = new Map();
  private readonly METRIC_RETENTION_SECONDS = 300;

  constructor(masterConfig: any, replicaConfigs: Array<{ name: string; config: any }>) {
    super();

    this.master = new Pool(masterConfig);
    this.replicas = new Map(
      replicaConfigs.map(({ name, config }) => [name, new Pool(config)])
    );
  }

  async collectMetrics(): Promise<Map<string, ReplicationMetrics>> {
    const current = new Map<string, ReplicationMetrics>();

    for (const [name, replica] of this.replicas) {
      try {
        const result = await replica.query(`
          SELECT
            pg_last_wal_receive_lsn() AS received_lsn,
            pg_last_wal_replay_lsn() AS applied_lsn,
            pg_last_xact_replay_timestamp() AS last_replay_time,
            EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp())) AS lag_seconds,
            pg_wal_lsn_diff(pg_last_wal_receive_lsn(), pg_last_wal_replay_lsn()) AS lag_bytes,
            CASE
              WHEN pg_is_in_recovery() AND pg_last_wal_receive_lsn() = pg_last_wal_replay_lsn() THEN 'streaming'
              WHEN pg_is_in_recovery() THEN 'catchup'
              ELSE 'error'
            END AS state
        `);

        const row = result.rows[0];
        const metrics: ReplicationMetrics = {
          lagBytes: parseInt(row.lag_bytes) || 0,
          lagSeconds: parseFloat(row.lag_seconds) || 0,
          throughputBytesPerSec: 0, // Calculated from historical data
          replicationState: row.state,
          lastFlushLSN: row.received_lsn,
          appliedLSN: row.applied_lsn,
          timestamp: new Date(),
        };

        // Approximate throughput from the change in lag between samples.
        // This measures catch-up/fall-behind rate, not absolute WAL volume.
        const history = this.metrics.get(name) || [];
        if (history.length > 0) {
          const previous = history[history.length - 1];
          const timeDelta = (metrics.timestamp.getTime() - previous.timestamp.getTime()) / 1000;
          const bytesDelta = metrics.lagBytes - previous.lagBytes;
          metrics.throughputBytesPerSec = timeDelta > 0 ? Math.abs(bytesDelta / timeDelta) : 0;
        }

        current.set(name, metrics);

        // Store metrics
        if (!this.metrics.has(name)) {
          this.metrics.set(name, []);
        }
        this.metrics.get(name)!.push(metrics);

        // Trim old metrics
        const cutoff = new Date(Date.now() - this.METRIC_RETENTION_SECONDS * 1000);
        this.metrics.set(
          name,
          this.metrics.get(name)!.filter(m => m.timestamp > cutoff)
        );

      } catch (error) {
        console.error(`[Monitor] Failed to collect metrics for ${name}:`, error);
        current.set(name, {
          lagBytes: 0,
          lagSeconds: 0,
          throughputBytesPerSec: 0,
          replicationState: 'disconnected',
          lastFlushLSN: '0/0',
          appliedLSN: '0/0',
          timestamp: new Date(),
        });
      }
    }

    return current;
  }

  async startMonitoring(intervalMs: number = 5000): Promise<void> {
    // In production, store this interval handle and clear it in shutdown()
    setInterval(async () => {
      const metrics = await this.collectMetrics();

      for (const [name, metric] of metrics) {
        // Alert on high lag
        if (metric.lagSeconds > 10) {
          this.emit('highLag', { replica: name, lagSeconds: metric.lagSeconds });
        }

        // Alert on disconnection
        if (metric.replicationState === 'disconnected') {
          this.emit('disconnected', { replica: name });
        }

        // Alert on low throughput only while the replica is actually behind;
        // a caught-up replica legitimately shows ~0 bytes/sec
        if (metric.lagBytes > 0 && metric.throughputBytesPerSec < 1000) {
          this.emit('lowThroughput', { replica: name, throughput: metric.throughputBytesPerSec });
        }
      }
    }, intervalMs);

    console.log(`[Monitor] Monitoring started (interval: ${intervalMs}ms)`);
  }

  getMetricsSummary(replicaName: string): any {
    const history = this.metrics.get(replicaName) || [];

    if (history.length === 0) {
      return { error: 'No metrics available' };
    }

    const lagSeconds = history.map(m => m.lagSeconds);
    const lagBytes = history.map(m => m.lagBytes);

    return {
      replica: replicaName,
      avgLagSeconds: lagSeconds.reduce((a, b) => a + b, 0) / lagSeconds.length,
      maxLagSeconds: Math.max(...lagSeconds),
      avgLagBytes: lagBytes.reduce((a, b) => a + b, 0) / lagBytes.length,
      maxLagBytes: Math.max(...lagBytes),
      currentState: history[history.length - 1].replicationState,
      sampleCount: history.length,
    };
  }

  async shutdown(): Promise<void> {
    await this.master.end();

    for (const [name, replica] of this.replicas) {
      await replica.end();
    }

    console.log('[Monitor] Shutdown complete');
  }
}

// Usage: Monitor replication health
const monitor = new ReplicationMonitor(
  { host: 'master.db.internal', database: 'chatgpt_app', user: 'admin', password: process.env.DB_PASSWORD },
  [
    { name: 'replica-us', config: { host: 'replica-us.db.internal', database: 'chatgpt_app', user: 'admin', password: process.env.DB_PASSWORD } },
    { name: 'replica-eu', config: { host: 'replica-eu.db.internal', database: 'chatgpt_app', user: 'admin', password: process.env.DB_PASSWORD } },
  ]
);

monitor.on('highLag', ({ replica, lagSeconds }) => {
  console.error(`🚨 High replication lag on ${replica}: ${lagSeconds}s`);
  // Send PagerDuty alert
});

monitor.on('disconnected', ({ replica }) => {
  console.error(`🚨 Replica ${replica} disconnected!`);
  // Trigger failover automation
});

await monitor.startMonitoring(3000);

// Periodic summary
setInterval(() => {
  console.log('[Summary] Replica US:', monitor.getMetricsSummary('replica-us'));
  console.log('[Summary] Replica EU:', monitor.getMetricsSummary('replica-eu'));
}, 60000);

Alerting thresholds:

  • Warning: Lag > 5 seconds
  • Critical: Lag > 30 seconds or replication disconnected
  • Action: Automatic failover if lag > 60 seconds and master is healthy
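These thresholds can be encoded as a small, testable policy function — a sketch in which the `masterHealthy` signal is assumed to come from your own health checks:

```typescript
// failover-policy.ts - map replication health to an alerting/failover action
type Action = 'ok' | 'warn' | 'critical' | 'failover';

function decideAction(
  lagSeconds: number,
  connected: boolean,
  masterHealthy: boolean
): Action {
  if (!connected) return 'critical';                        // replication broken: page on-call
  if (lagSeconds > 60 && masterHealthy) return 'failover';  // per the Action threshold above
  if (lagSeconds > 30) return 'critical';
  if (lagSeconds > 5) return 'warn';
  return 'ok';
}

console.log(decideAction(2, true, true));   // "ok"
console.log(decideAction(12, true, true));  // "warn"
console.log(decideAction(45, true, true));  // "critical"
console.log(decideAction(90, true, true));  // "failover"
```

Keeping the decision in one pure function makes the escalation path unit-testable, separate from the alert transport (PagerDuty, failover automation) it drives.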

Build Production-Ready Replication with MakeAIHQ

Data replication transforms ChatGPT apps from fragile single-server systems into globally distributed, fault-tolerant platforms. Master-slave replication provides read scalability for conversation-heavy workloads. Multi-master replication enables global writes for collaborative features. CRDTs and vector clocks resolve conflicts deterministically. CDC pipelines stream changes to analytics, caching, and search systems. Comprehensive monitoring detects issues before they cascade into outages.

These patterns form the foundation of enterprise-grade ChatGPT applications serving millions of users across continents with 99.99% uptime SLAs.

Ready to deploy a ChatGPT app with production replication? MakeAIHQ generates ChatGPT MCP servers with built-in replication support, conflict resolution, and monitoring—no distributed systems PhD required. From zero to globally replicated ChatGPT app in 48 hours.

Start your free trial →

Related guides: