Page:
Distributed Realtime
No results
1
Distributed Realtime
Anton Nesterov edited this page 2026-02-27 11:38:24 +01:00
Distributed Realtime Service
This guide explains how to implement a distributed, globally-scalable realtime subscription service using Redis Cluster. This architecture allows you to have multiple realtime service instances across different regions while maintaining event consistency.
Architecture Overview
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ VSKI Master │────▶│ Redis Cluster │◀────│ Realtime Svc │
│ (Webhooks) │ │ (Event Bus) │ │ (Instance 1) │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │
│ ┌─────────────────┐
└───────────────▶│ Realtime Svc │
│ (Instance N) │
└─────────────────┘
Key Components:
- VSKI Master - Emits webhooks on record changes (create/update/delete)
- Redis Cluster - Acts as distributed event bus with pub/sub
- Realtime Service - Multiple instances that:
- Accept WebSocket connections from clients
- Subscribe to Redis channels
- Distribute events to connected clients
Implementation
1. Project Setup
Create a new Deno project:
mkdir realtime-service
cd realtime-service
deno init
Add dependencies to deno.json:
{
"tasks": {
"dev": "deno run --allow-net --allow-env --watch main.ts",
"start": "deno run --allow-net --allow-env main.ts"
},
"imports": {
"@vski/sdk": "../path/to/vski/client",
"ioredis": "npm:ioredis@^5.4.0"
}
}
2. Create the Realtime Service
// main.ts
import { Redis } from "ioredis";
import { verifyToken } from "./auth" // you'll need to implement it
import type { RealtimeEvent, RecordData } from "@vski/sdk";
// Configuration
// NOTE(review): JWT_SECRET should match the VSKI master's secret so client
// tokens verify on both sides (see "Best Practices" below).
const JWT_SECRET = Deno.env.get("JWT_SECRET") || "your-jwt-secret";
const REDIS_URL = Deno.env.get("REDIS_URL") || "redis://localhost:6379";
const PORT = parseInt(Deno.env.get("PORT") || "8080");
// Connection used for publishing; subscribing uses a dedicated connection
// created in subscribeToRedis().
const redis = new Redis(REDIS_URL);
// Event storage (for replay)
// In-memory buffer of recent events, keyed by collection name. Entries older
// than EVENT_TTL are evicted on each store (see storeEvent).
const eventStore = new Map<string, StoredEvent[]>();
const EVENT_TTL = 60 * 60 * 1000; // 1 hour
// An event as buffered for replay and as shipped over Redis pub/sub.
interface StoredEvent {
  id: string; // from generateEventId(): "<ms-timestamp>-<8 uuid chars>"
  data: RealtimeEvent;
  timestamp: number; // ms epoch; used for TTL eviction
}
// Shape of the webhook body sent by the VSKI master on record changes.
interface WebhookPayload {
  database: string;
  collection: string;
  id: string;
  action: "create" | "update" | "delete";
  timestamp: string;
  record: RecordData;
}
// Per-WebSocket connection state.
interface Client {
  id: string;
  socket: WebSocket;
  // Keyed by collection name; lastId is the replay cursor (undefined = none).
  subscriptions: Map<string, { collection: string; lastId?: string }>;
}
const clients = new Map<WebSocket, Client>();
// Build a unique event id: millisecond timestamp plus a short random suffix.
const generateEventId = () => {
  const suffix = crypto.randomUUID().slice(0, 8);
  return `${Date.now()}-${suffix}`;
};
// Append an event to the collection's replay buffer, evicting entries
// older than EVENT_TTL while we are at it.
function storeEvent(collection: string, event: StoredEvent): void {
  const buffered = eventStore.get(collection) ?? [];
  buffered.push(event);
  const cutoff = Date.now() - EVENT_TTL;
  const fresh = buffered.filter((e) => e.timestamp > cutoff);
  eventStore.set(collection, fresh);
}
// Return buffered events that arrived strictly after the event `lastId`.
// An unknown id yields an empty list (nothing can be replayed from it).
function getEventsSince(collection: string, lastId: string): StoredEvent[] {
  const buffered = eventStore.get(collection) ?? [];
  for (let i = 0; i < buffered.length; i++) {
    if (buffered[i].id === lastId) {
      return buffered.slice(i + 1);
    }
  }
  return [];
}
// Deliver pending replay events for `collection` to one subscribed client,
// then advance its replay cursor (sub.lastId) past everything sent.
function sendEventsToClient(client: Client, collection: string): void {
  const sub = client.subscriptions.get(collection);
  if (!sub) return;
  // No cursor yet -> replay the whole buffer; otherwise only what followed it.
  const events = sub.lastId ? getEventsSince(collection, sub.lastId) : (eventStore.get(collection) || []);
  if (events.length === 0) return;
  // Guard: send() throws on a CONNECTING socket. By bailing out without
  // advancing the cursor, these events are redelivered on the next attempt.
  if (client.socket.readyState !== WebSocket.OPEN) return;
  client.socket.send(JSON.stringify({
    type: "EVENT",
    collection,
    events: events.map((e) => ({ id: e.id, data: e.data })),
  }));
  sub.lastId = events[events.length - 1].id;
}
// Handle a webhook from the VSKI master: buffer the event locally and fan
// it out to every service instance through Redis pub/sub.
// Returns 405 for non-POST, 401 on a bad bearer token, 400 on a bad body.
async function handleWebhook(request: Request): Promise<Response> {
  if (request.method !== "POST") {
    return new Response("Method not allowed", { status: 405 });
  }
  // Optional: Verify webhook JWT (only enforced when a bearer token is sent)
  const authHeader = request.headers.get("Authorization");
  if (authHeader?.startsWith("Bearer ")) {
    const payload = await verifyToken(authHeader.slice(7));
    if (!payload) return new Response("Unauthorized", { status: 401 });
  }
  // A malformed body must not surface as an unhandled rejection.
  let data: WebhookPayload;
  try {
    data = await request.json();
  } catch {
    return new Response("Invalid JSON payload", { status: 400 });
  }
  if (!data || typeof data.collection !== "string") {
    return new Response("Missing collection", { status: 400 });
  }
  const event: StoredEvent = {
    id: generateEventId(),
    data: { action: data.action, record: data.record },
    timestamp: Date.now(),
  };
  // Store for local replay, then publish so every instance sees it.
  storeEvent(data.collection, event);
  await redis.publish(`events:${data.collection}`, JSON.stringify(event));
  return new Response(JSON.stringify({ success: true }), {
    headers: { "Content-Type": "application/json" },
  });
}
// Handle WebSocket connection
function handleWebSocket(request: Request): Response {
const url = new URL(request.url);
const token = url.searchParams.get("auth");
if (!token) {
return new Response("Unauthorized", { status: 401 });
}
const { socket, response } = Deno.upgradeWebSocket(request);
const client: Client = {
id: crypto.randomUUID(),
socket,
subscriptions: new Map(),
};
clients.set(socket, client);
socket.onopen = () => console.log(`Client ${client.id} connected`);
socket.onmessage = (event) => {
const msg = JSON.parse(event.data);
switch (msg.type) {
case "SUBSCRIBE": {
const collection = msg.collection;
const lastId = msg.lastId || msg.lastIdArg;
client.subscriptions.set(collection, {
collection,
lastId: lastId === "$" ? undefined : lastId,
});
sendEventsToClient(client, collection);
socket.send(JSON.stringify({
type: "SUBSCRIBED",
collection,
last_id: client.subscriptions.get(collection)?.lastId || "0",
}));
break;
}
case "UNSUBSCRIBE":
client.subscriptions.delete(msg.collection);
break;
case "ACK": {
const sub = client.subscriptions.get(msg.collection);
if (sub) sub.lastId = msg.id;
break;
}
}
};
socket.onclose = () => {
clients.delete(socket);
console.log(`Client ${client.id} disconnected`);
};
return response;
}
// Subscribe to Redis channels
async function subscribeToRedis(): Promise<void> {
const sub = new Redis(REDIS_URL);
await sub.psubscribe("events:*");
sub.on("pmessage", (_pattern: string, channel: string, message: string) => {
if (!channel.startsWith("events:")) return;
const collection = channel.slice(7);
const event: StoredEvent = JSON.parse(message);
storeEvent(collection, event);
// Broadcast to connected clients
for (const [socket, client] of clients) {
if (client.subscriptions.has(collection)) {
socket.send(JSON.stringify({
type: "EVENT",
collection,
events: [{ id: event.id, data: event.data }],
}));
}
}
});
}
// HTTP handler
// Single entry point: routes webhooks, WebSocket upgrades, and health checks.
Deno.serve({ port: PORT }, (request: Request) => {
  const url = new URL(request.url);
  if (url.pathname === "/webhook" && request.method === "POST") {
    return handleWebhook(request);
  }
  if (url.pathname === "/api/realtime") {
    return handleWebSocket(request);
  }
  if (url.pathname === "/health") {
    // Used by load balancers; reports the live WebSocket client count.
    return new Response(JSON.stringify({ status: "ok", clients: clients.size }), {
      headers: { "Content-Type": "application/json" },
    });
  }
  return new Response("Not found", { status: 404 });
});
// Start
console.log(`Realtime service running on port ${PORT}`);
// Begin consuming the Redis event bus; listener lives as long as the process.
await subscribeToRedis();
3. Configure VSKI Master Webhook
Create a collection with a trigger that sends events to your realtime service:
await adminClient.settings.collections.create({
name: "posts",
type: "base",
fields: [
{ name: "title", type: "text", required: true },
{ name: "content", type: "text" },
],
options: {
triggers: [
{
action: "all",
url: "https://realtime.example.com/webhook",
method: "POST",
jwtEnabled: true,
},
],
},
});
4. Client Usage
import { VskiClient } from "@vski/sdk";
// Connect to realtime service for subscriptions
const realtime = new VskiClient("https://realtime.example.com");
realtime.setToken(userToken);
realtime.collection("posts").subscribe((event) => {
console.log(event.action, event.record);
});
// Use master/replica for data operations
const api = new VskiClient("https://master.example.com");
api.setToken(userToken);
await api.collection("posts").create({ title: "Hello", content: "World" });
Docker Deployment
Dockerfile
FROM denoland/deno:2.0.0
WORKDIR /app
COPY deno.json .
COPY main.ts .
ENV PORT=8080
ENV JWT_SECRET=your-jwt-secret
ENV REDIS_URL=redis://redis:6379
EXPOSE 8080
CMD ["task", "start"]
docker-compose.yml
version: "3.8"
services:
redis:
image: redis:7-alpine
volumes:
- redis_data:/data
command: redis-server --appendonly yes
realtime:
build: .
ports:
- "8080:8080"
environment:
- JWT_SECRET=${JWT_SECRET}
- REDIS_URL=redis://redis:6379
depends_on:
- redis
deploy:
replicas: 3
volumes:
redis_data:
Redis Cluster (Production)
import { Redis } from "ioredis";
const redis = new Redis.Cluster([
{ host: "redis-1.example.com", port: 6379 },
{ host: "redis-2.example.com", port: 6379 },
{ host: "redis-3.example.com", port: 6379 },
]);
Load Balancer (nginx)
upstream realtime {
least_conn;
server realtime-1:8080;
server realtime-2:8080;
server realtime-3:8080;
}
server {
listen 80;
server_name realtime.example.com;
location /api/realtime {
proxy_pass http://realtime;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
}
location /webhook {
proxy_pass http://realtime;
}
}
Security
JWT Verification (Production)
For production, use a proper JWT library:
// Using 'jose' npm package
import { jwtVerify, SignJWT } from "npm:jose@^5.0.0";
const secret = new TextEncoder().encode(JWT_SECRET);
// Verify a JWT against the shared secret; resolves to the token payload,
// or null on any verification failure (expired, bad signature, malformed).
async function verifyToken(token: string) {
  const result = await jwtVerify(token, secret).catch(() => null);
  return result ? result.payload : null;
}
Rate Limiting
// Sliding-window rate limiter: allow at most `limit` requests per `windowMs`
// window, tracked per client IP. Returns false when the caller must back off.
const rateLimiter = new Map<string, number[]>();
function checkRateLimit(ip: string, limit = 100, windowMs = 60000): boolean {
  const now = Date.now();
  const history = rateLimiter.get(ip) ?? [];
  // Keep only timestamps still inside the window.
  const recent: number[] = [];
  for (const t of history) {
    if (now - t < windowMs) recent.push(t);
  }
  if (recent.length >= limit) return false;
  recent.push(now);
  rateLimiter.set(ip, recent);
  return true;
}
Monitoring
if (url.pathname === "/health") {
return new Response(JSON.stringify({
status: "ok",
clients: clients.size,
collections: eventStore.size,
redis: await redis.ping() === "PONG",
}), { headers: { "Content-Type": "application/json" } });
}
Multi-Region Setup
US Region:
- Redis Cluster (US)
- Realtime Service x3
- Load Balancer
EU Region:
- Redis Cluster (EU)
- Realtime Service x3
- Load Balancer
Cross-region sync via Redis replication or Kafka
Best Practices
- Use same JWT secret as VSKI master for token compatibility
- Enable webhook JWT in trigger configuration
- Use Redis Cluster for production high availability
- Deploy close to users for low latency
- Monitor Redis memory for event storage