1 Distributed Realtime
Anton Nesterov edited this page 2026-02-27 11:38:24 +01:00

Distributed Realtime Service

This guide explains how to implement a distributed, globally-scalable realtime subscription service using Redis Cluster. This architecture allows you to have multiple realtime service instances across different regions while maintaining event consistency.

Architecture Overview

┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│   VSKI Master   │────▶│  Redis Cluster  │◀────│  Realtime Svc   │
│   (Webhooks)    │     │   (Event Bus)   │     │  (Instance 1)   │
└─────────────────┘     └─────────────────┘     └─────────────────┘
                               │                        │
                               │                ┌─────────────────┐
                               └───────────────▶│  Realtime Svc   │
                                                │  (Instance N)   │
                                                └─────────────────┘

Key Components:

  1. VSKI Master - Emits webhooks on record changes (create/update/delete)
  2. Redis Cluster - Acts as distributed event bus with pub/sub
  3. Realtime Service - Multiple instances that:
    • Accept WebSocket connections from clients
    • Subscribe to Redis channels
    • Distribute events to connected clients

Implementation

1. Project Setup

Create a new Deno project:

# Create the project directory and switch into it
mkdir realtime-service
cd realtime-service
# Scaffold a Deno project in the current directory
deno init

Add dependencies to deno.json:

{
  "tasks": {
    "dev": "deno run --allow-net --allow-env --watch main.ts",
    "start": "deno run --allow-net --allow-env main.ts"
  },
  "imports": {
    "@vski/sdk": "../path/to/vski/client",
    "ioredis": "npm:ioredis@^5.4.0"
  }
}

2. Create the Realtime Service

// main.ts
import { Redis } from "ioredis";
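import { verifyToken } from "./auth" // you'll need to implement it (see "JWT Verification" below); note that Deno expects the file extension in relative specifiers, e.g. "./auth.ts"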
import type { RealtimeEvent, RecordData } from "@vski/sdk";

// Configuration
// Every setting is read from the environment and falls back to a
// local-development default when the variable is unset or empty.
// NOTE: "your-jwt-secret" is only a placeholder; always set JWT_SECRET
// explicitly outside of local development.
const JWT_SECRET = Deno.env.get("JWT_SECRET") || "your-jwt-secret";
const REDIS_URL = Deno.env.get("REDIS_URL") || "redis://localhost:6379";
// Explicit radix so PORT is always interpreted as a decimal number.
const PORT = parseInt(Deno.env.get("PORT") || "8080", 10);

// Redis connection used for publishing events and health checks.
const redis = new Redis(REDIS_URL);

// Event storage (for replay): collection name -> events kept in this
// process's memory. Entries older than EVENT_TTL are dropped on write.
const eventStore = new Map<string, StoredEvent[]>();
const EVENT_TTL = 60 * 60 * 1000; // 1 hour, in milliseconds

/** An event kept in the in-memory replay buffer and exchanged over Redis. */
type StoredEvent = {
  /** Event identifier; clients hand it back as `lastId` or in an ACK. */
  id: string;
  /** Payload forwarded to subscribed clients. */
  data: RealtimeEvent;
  /** Creation time in epoch milliseconds; compared against EVENT_TTL. */
  timestamp: number;
};

/** Kind of record change reported by a webhook. */
type WebhookAction = "create" | "update" | "delete";

/** JSON body the webhook endpoint expects from the VSKI master. */
interface WebhookPayload {
  database: string;
  /** Selects the replay bucket and the Redis channel (`events:<collection>`). */
  collection: string;
  id: string;
  /** Copied into the event delivered to subscribers. */
  action: WebhookAction;
  timestamp: string;
  /** Record data copied into the event delivered to subscribers. */
  record: RecordData;
}

/** Per-collection subscription state tracked for one client. */
interface ClientSubscription {
  collection: string;
  /** Id of the last event sent to or acknowledged by the client, if any. */
  lastId?: string;
}

/** A connected WebSocket client and the collections it is subscribed to. */
interface Client {
  /** Random UUID assigned when the connection is accepted; used in logs. */
  id: string;
  socket: WebSocket;
  /** Subscriptions keyed by collection name. */
  subscriptions: Map<string, ClientSubscription>;
}

// Registry of connected clients, keyed by their WebSocket. Entries are added
// when a connection is accepted and removed when the socket closes.
const clients = new Map<WebSocket, Client>();

// Utility: build an event id of the form "<epoch millis>-<8 chars of a UUID>"
const generateEventId = (): string => {
  const millis = Date.now();
  const suffix = crypto.randomUUID().slice(0, 8);
  return `${millis}-${suffix}`;
};


// Store event for replay
/**
 * Adds `event` to the replay buffer of `collection` and drops entries older
 * than EVENT_TTL.
 *
 * The instance that receives a webhook stores the event directly and then
 * receives the very same event again through its own Redis subscription, so
 * an event whose id is already buffered is not added a second time.
 *
 * @param collection Collection whose buffer is updated.
 * @param event Event to retain.
 */
function storeEvent(collection: string, event: StoredEvent): void {
  const buffered = eventStore.get(collection) ?? [];
  const alreadyBuffered = buffered.some((stored) => stored.id === event.id);
  const candidates = alreadyBuffered ? buffered : [...buffered, event];

  // Cleanup old events (the incoming event is subject to the same TTL check)
  const now = Date.now();
  eventStore.set(
    collection,
    candidates.filter((stored) => now - stored.timestamp < EVENT_TTL),
  );
}

// Get events since lastId
/**
 * Returns the buffered events of `collection` that follow the event whose id
 * equals `lastId`. Yields an empty list when the collection has no buffer or
 * when `lastId` is not present in it.
 */
function getEventsSince(collection: string, lastId: string): StoredEvent[] {
  const buffered = eventStore.get(collection);
  if (!buffered) return [];

  for (let position = 0; position < buffered.length; position++) {
    if (buffered[position].id === lastId) {
      return buffered.slice(position + 1);
    }
  }
  return [];
}

// Send events to client
/**
 * Replays buffered events of `collection` to `client` in a single EVENT
 * frame and advances the subscription's `lastId` to the last event sent.
 * Does nothing when the client is not subscribed or nothing is pending.
 */
function sendEventsToClient(client: Client, collection: string): void {
  const subscription = client.subscriptions.get(collection);
  if (!subscription) return;

  // With a known position replay only what follows it; otherwise replay
  // everything currently buffered for the collection.
  let pending: StoredEvent[];
  if (subscription.lastId) {
    pending = getEventsSince(collection, subscription.lastId);
  } else {
    pending = eventStore.get(collection) || [];
  }
  if (pending.length === 0) return;

  const batch = pending.map(({ id, data }) => ({ id, data }));
  const frame = { type: "EVENT", collection, events: batch };
  client.socket.send(JSON.stringify(frame));

  subscription.lastId = pending[pending.length - 1].id;
}

// Handle webhook from VSKI master
/**
 * Accepts a record-change webhook, turns it into a stored event and publishes
 * it on the Redis channel `events:<collection>`.
 *
 * Responses:
 *  - 405 when the method is not POST
 *  - 401 when a Bearer token is supplied but fails verification
 *  - 400 when the body is not JSON or lacks a usable `collection`
 *  - 200 `{ "success": true }` once the event has been published
 *
 * NOTE(review): verification only runs when an `Authorization: Bearer ...`
 * header is present, so requests without the header are accepted. Restrict
 * access to this route (or make the header mandatory) in production.
 */
async function handleWebhook(request: Request): Promise<Response> {
  if (request.method !== "POST") {
    return new Response("Method not allowed", { status: 405 });
  }

  // Optional: Verify webhook JWT
  const authHeader = request.headers.get("Authorization");
  if (authHeader?.startsWith("Bearer ")) {
    const payload = await verifyToken(authHeader.slice(7));
    if (!payload) return new Response("Unauthorized", { status: 401 });
  }

  // The body is untrusted input: a parse failure must become a 400 instead
  // of rejecting the handler.
  let data: WebhookPayload;
  try {
    data = await request.json();
  } catch {
    return new Response("Bad request", { status: 400 });
  }

  // `collection` names both the replay bucket and the Redis channel, so it
  // has to be a non-empty string.
  if (
    typeof data !== "object" || data === null ||
    typeof data.collection !== "string" || data.collection === ""
  ) {
    return new Response("Bad request", { status: 400 });
  }

  const event: StoredEvent = {
    id: generateEventId(),
    data: { action: data.action, record: data.record },
    timestamp: Date.now(),
  };

  storeEvent(data.collection, event);
  await redis.publish(`events:${data.collection}`, JSON.stringify(event));

  return new Response(JSON.stringify({ success: true }), {
    headers: { "Content-Type": "application/json" },
  });
}

// Handle WebSocket connection
/**
 * Upgrades a request to a WebSocket and wires up the subscription protocol.
 *
 * The token is taken from the `auth` query parameter. Because this function
 * returns synchronously while `verifyToken` is asynchronous, verification is
 * started right after the upgrade; the socket is closed with code 1008 when
 * the token is rejected, and no message is processed before verification has
 * succeeded.
 *
 * Client messages (JSON text frames):
 *  - `SUBSCRIBE { collection, lastId? | lastIdArg? }` replays buffered events
 *    and answers with `SUBSCRIBED`
 *  - `UNSUBSCRIBE { collection }`
 *  - `ACK { collection, id }` records the last event the client processed
 * Malformed or unknown messages are ignored.
 *
 * @returns 401 without a token, 426 for a non-upgrade request, otherwise the
 *          upgrade response.
 */
function handleWebSocket(request: Request): Response {
  const url = new URL(request.url);
  const token = url.searchParams.get("auth");

  if (!token) {
    return new Response("Unauthorized", { status: 401 });
  }

  // Deno.upgradeWebSocket throws for plain HTTP requests; answer them cleanly.
  if (request.headers.get("upgrade")?.toLowerCase() !== "websocket") {
    return new Response("Upgrade required", { status: 426 });
  }

  const { socket, response } = Deno.upgradeWebSocket(request);
  const client: Client = {
    id: crypto.randomUUID(),
    socket,
    subscriptions: new Map(),
  };

  // Resolves to true only when the token verifies; a verifier that throws or
  // rejects counts as "not authorized". This promise never rejects.
  const authorized: Promise<boolean> = Promise.resolve()
    .then(() => verifyToken(token))
    .then((payload) => Boolean(payload))
    .catch(() => false);

  // Processes one raw frame from an authorized client.
  const handleMessage = (raw: unknown): void => {
    if (typeof raw !== "string") return;

    let parsed: unknown;
    try {
      parsed = JSON.parse(raw);
    } catch {
      return; // not JSON: ignore instead of throwing inside the event handler
    }
    if (typeof parsed !== "object" || parsed === null) return;

    const msg = parsed as Record<string, unknown>;
    const collection = msg.collection;
    if (typeof collection !== "string" || collection === "") return;

    switch (msg.type) {
      case "SUBSCRIBE": {
        const lastId = [msg.lastId, msg.lastIdArg].find(
          (value): value is string => typeof value === "string" && value !== "",
        );

        // NOTE(review): "$" is mapped to "no position", which makes
        // sendEventsToClient replay everything buffered for the collection.
        // Confirm this is the intended meaning of "$" in the VSKI protocol.
        client.subscriptions.set(collection, {
          collection,
          lastId: lastId === "$" ? undefined : lastId,
        });

        sendEventsToClient(client, collection);

        socket.send(JSON.stringify({
          type: "SUBSCRIBED",
          collection,
          last_id: client.subscriptions.get(collection)?.lastId || "0",
        }));
        break;
      }

      case "UNSUBSCRIBE":
        client.subscriptions.delete(collection);
        break;

      case "ACK": {
        const sub = client.subscriptions.get(collection);
        if (sub && typeof msg.id === "string") sub.lastId = msg.id;
        break;
      }
    }
  };

  socket.onopen = () => {
    void authorized.then((ok) => {
      if (!ok) {
        socket.close(1008, "Unauthorized");
        return;
      }
      // The peer may have gone away while the token was being verified.
      if (socket.readyState !== WebSocket.OPEN) return;

      // Register for broadcasts only once the socket is open and authorized.
      clients.set(socket, client);
      console.log(`Client ${client.id} connected`);
    });
  };

  socket.onmessage = (event) => {
    authorized
      .then((ok) => {
        if (ok) handleMessage(event.data);
      })
      .catch((error) => {
        console.error(`Client ${client.id} message handling failed`, error);
      });
  };

  socket.onclose = () => {
    clients.delete(socket);
    console.log(`Client ${client.id} disconnected`);
  };

  return response;
}

// Subscribe to Redis channels
/**
 * Opens a dedicated Redis connection, pattern-subscribes to `events:*` and,
 * for every message, stores the event for replay and forwards it to each
 * connected client subscribed to that collection.
 *
 * Resolves once the pattern subscription has been established.
 */
async function subscribeToRedis(): Promise<void> {
  const sub = new Redis(REDIS_URL);
  const prefix = "events:";

  // Attach the listener before subscribing so that no message can arrive
  // while nobody is listening.
  sub.on("pmessage", (_pattern: string, channel: string, message: string) => {
    if (!channel.startsWith(prefix)) return;

    const collection = channel.slice(prefix.length);

    let event: StoredEvent;
    try {
      event = JSON.parse(message);
    } catch (error) {
      console.error(`Ignoring malformed message on ${channel}`, error);
      return;
    }
    if (
      typeof event !== "object" || event === null ||
      typeof event.id !== "string" || typeof event.timestamp !== "number"
    ) {
      console.error(`Ignoring unexpected message shape on ${channel}`);
      return;
    }

    storeEvent(collection, event);

    // Serialize once; every subscriber receives the same frame.
    const frame = JSON.stringify({
      type: "EVENT",
      collection,
      events: [{ id: event.id, data: event.data }],
    });

    // Broadcast to connected clients
    for (const [socket, client] of clients) {
      if (!client.subscriptions.has(collection)) continue;
      // send() throws on a socket that is still connecting; skip anything
      // that is not open.
      if (socket.readyState !== WebSocket.OPEN) continue;
      try {
        socket.send(frame);
      } catch (error) {
        // One failing socket must not stop delivery to the others.
        console.error(`Failed to send to client ${client.id}`, error);
      }
    }
  });

  await sub.psubscribe(`${prefix}*`);
}

// HTTP handler
// Routes:
//   /webhook       -> handleWebhook (every method is forwarded so that the
//                     handler's own 405 answer for non-POST requests is used)
//   /api/realtime  -> handleWebSocket
//   /health        -> JSON status with the number of connected clients
//   anything else  -> 404
Deno.serve({ port: PORT }, (request: Request) => {
  const { pathname } = new URL(request.url);

  switch (pathname) {
    case "/webhook":
      return handleWebhook(request);

    case "/api/realtime":
      return handleWebSocket(request);

    case "/health":
      return new Response(
        JSON.stringify({ status: "ok", clients: clients.size }),
        { headers: { "Content-Type": "application/json" } },
      );

    default:
      return new Response("Not found", { status: 404 });
  }
});

// Start
console.log(`Realtime service running on port ${PORT}`);
// Begin receiving events from Redis; a rejection here aborts startup.
await subscribeToRedis();

3. Configure VSKI Master Webhook

Create a collection with a trigger that sends events to your realtime service:

// Create the "posts" collection on the VSKI master and attach a webhook
// trigger that targets the realtime service.
await adminClient.settings.collections.create({
  name: "posts",
  type: "base",
  fields: [
    { name: "title", type: "text", required: true },
    { name: "content", type: "text" },
  ],
  options: {
    triggers: [
      {
        // presumably fires on create, update and delete — confirm against
        // the VSKI trigger documentation
        action: "all",
        // Must resolve to the realtime service's /webhook route, which only
        // accepts POST.
        url: "https://realtime.example.com/webhook",
        method: "POST",
        // presumably makes the master send a JWT with the webhook; the
        // realtime service verifies a Bearer token when one is present —
        // TODO confirm the header format
        jwtEnabled: true,
      },
    ],
  },
});

4. Client Usage

import { VskiClient } from "@vski/sdk";

// Connect to realtime service for subscriptions
// (this client is pointed at the realtime service, not at the master)
const realtime = new VskiClient("https://realtime.example.com");
realtime.setToken(userToken);

// The callback runs for every event received for "posts"; each event carries
// the action and the affected record.
realtime.collection("posts").subscribe((event) => {
  console.log(event.action, event.record);
});

// Use master/replica for data operations
// (reads and writes go to the VSKI API; the realtime service only delivers
// change events)
const api = new VskiClient("https://master.example.com");
api.setToken(userToken);

await api.collection("posts").create({ title: "Hello", content: "World" });

Docker Deployment

Dockerfile

FROM denoland/deno:2.0.0

WORKDIR /app
COPY deno.json .
COPY main.ts .
# main.ts imports verifyToken from ./auth, so that module must be in the
# image as well (adjust the file name to match your implementation).
COPY auth.ts .

ENV PORT=8080
# Placeholder only: override JWT_SECRET at runtime, never ship a real secret
# inside the image.
ENV JWT_SECRET=your-jwt-secret
ENV REDIS_URL=redis://redis:6379

EXPOSE 8080
# Runs the "start" task defined in deno.json.
CMD ["task", "start"]

docker-compose.yml

version: "3.8"

services:
  # Redis used as the event bus between realtime instances.
  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
    # Append-only file persistence, written to the redis_data volume.
    command: redis-server --appendonly yes

  # Realtime service built from the Dockerfile in this directory.
  realtime:
    build: .
    # NOTE(review): a fixed host port combined with `replicas: 3` below may
    # conflict when all replicas run on one host — confirm for your
    # orchestrator, or publish the port on a load balancer instead.
    ports:
      - "8080:8080"
    environment:
      # Taken from the shell / .env file of whoever runs compose.
      - JWT_SECRET=${JWT_SECRET}
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis
    deploy:
      replicas: 3

volumes:
  redis_data:

Redis Cluster (Production)

import { Redis } from "ioredis";

// Production replacement for `new Redis(REDIS_URL)`: connect through a list
// of cluster nodes instead of a single URL.
// NOTE: the subscriber connection created inside subscribeToRedis() also uses
// `new Redis(REDIS_URL)` and needs the same change.
const redis = new Redis.Cluster([
  { host: "redis-1.example.com", port: 6379 },
  { host: "redis-2.example.com", port: 6379 },
  { host: "redis-3.example.com", port: 6379 },
]);

Load Balancer (nginx)

# Pool of realtime service instances; least_conn spreads long-lived
# WebSocket connections by current connection count.
upstream realtime {
    least_conn;
    server realtime-1:8080;
    server realtime-2:8080;
    server realtime-3:8080;
}

server {
    listen 80;
    server_name realtime.example.com;

    # WebSocket endpoint
    location /api/realtime {
        proxy_pass http://realtime;
        # HTTP/1.1 plus the Upgrade/Connection headers are required for the
        # WebSocket handshake to pass through the proxy.
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        # nginx closes a proxied connection after 60s without traffic by
        # default; raise the timeouts so idle subscriptions stay connected.
        proxy_read_timeout 3600s;
        proxy_send_timeout 3600s;
    }

    # Webhook endpoint called by the VSKI master
    location /webhook {
        proxy_pass http://realtime;
    }
}

Security

JWT Verification (Production)

For production, use a proper JWT library:

// Using 'jose' npm package
import { jwtVerify, SignJWT } from "npm:jose@^5.0.0";

// JWT_SECRET encoded as bytes, the key form handed to jwtVerify below.
const secret = new TextEncoder().encode(JWT_SECRET);

/**
 * Verifies `token` against the shared secret.
 *
 * @returns the token's payload when verification succeeds, or `null` when
 *          jwtVerify throws for any reason.
 */
async function verifyToken(token: string) {
  let verified;
  try {
    verified = await jwtVerify(token, secret);
  } catch {
    return null;
  }
  return verified.payload;
}

Rate Limiting

// Request timestamps (epoch milliseconds) recorded per client IP.
const rateLimiter = new Map<string, number[]>();

/**
 * Sliding-window rate limiter.
 *
 * @param ip Key identifying the caller.
 * @param limit Maximum number of requests allowed inside the window.
 * @param windowMs Window length in milliseconds.
 * @returns `true` when the request is allowed (and has been recorded),
 *          `false` when the caller already reached `limit` within the window.
 */
function checkRateLimit(ip: string, limit = 100, windowMs = 60000): boolean {
  const now = Date.now();

  // Keep only the timestamps that still fall inside the window.
  const recent: number[] = [];
  for (const stamp of rateLimiter.get(ip) ?? []) {
    if (now - stamp < windowMs) {
      recent.push(stamp);
    }
  }

  if (recent.length >= limit) {
    return false;
  }

  recent.push(now);
  rateLimiter.set(ip, recent);
  return true;
}

Monitoring

// Extended health route (place inside an async request handler).
if (url.pathname === "/health") {
  // A failing PING should be reported as `redis: false` instead of making
  // the whole health request fail.
  let redisOk = false;
  try {
    redisOk = (await redis.ping()) === "PONG";
  } catch {
    redisOk = false;
  }

  return new Response(JSON.stringify({
    status: "ok",
    clients: clients.size,
    collections: eventStore.size,
    redis: redisOk,
  }), { headers: { "Content-Type": "application/json" } });
}

Multi-Region Setup

US Region:
  - Redis Cluster (US)
  - Realtime Service x3
  - Load Balancer

EU Region:
  - Redis Cluster (EU)  
  - Realtime Service x3
  - Load Balancer

Cross-region sync via Redis replication or Kafka

Best Practices

  1. Use same JWT secret as VSKI master for token compatibility
  2. Enable webhook JWT in trigger configuration
  3. Use Redis Cluster for production high availability
  4. Deploy close to users for low latency
  5. Monitor Redis memory for event storage