No results
2
Workflows
Anton Nesterov edited this page 2026-02-22 22:54:39 +01:00
Table of Contents
Workflows
Orchestrate complex business logic with distributed workflows.
Understanding Workflows
Workflows are sequences of steps that execute business logic. They run asynchronously, can recover from failures, and support parent-child relationships for complex orchestration.
Creating Workflows
Basic Workflow
import { step, workflow } from "@vski/sdk/functional";
workflow("send-welcome-email").run(async (ctx, input) => {
const userId = input[0].userId;
// Step 1: Fetch user data
const user = await step("fetch-user", async () => {
return await client.collection("_users").getOne(userId);
})();
// Step 2: Generate welcome email
const email = await step("generate-email", async () => {
return generateWelcomeEmail(user);
})();
// Step 3: Send email
await step("send-email", async () => {
return await emailService.send(email);
})();
return { success: true, userId };
});
// Register workflow
registerWorkflow(sendWelcomeEmail);
Starting a Worker
import { WorkflowWorker } from "@vski/sdk/worker";
const worker = new WorkflowWorker(client);
// Start worker for specific workflow
worker.start("send-welcome-email");
// Start worker for all registered workflows
worker.startAll();
Triggering Workflows
// Trigger workflow with input
const run = await client.workflow.trigger("send-welcome-email", [
{ userId: "user-123" },
]);
console.log(run.runId); // Workflow run ID
console.log(run.status); // "pending", "running", "completed", etc.
Workflow Steps
Basic Step
const result = await step("step-name", async () => {
// Your logic here
return { data: "result" };
})();
Steps with Parameters
const result = await step("process-data", async (data) => {
// Process data
return { processed: true };
}, { input: data })();
Step Error Handling
try {
const result = await step("risky-operation", async () => {
// Might throw error
return await riskyOperation();
})();
} catch (error) {
console.error("Step failed:", error);
// Workflow will be marked as failed
}
Workflow Configuration
Circuit Breaker
Prevent runaway workflows:
workflow("data-processing", {
maxEvents: 100, // Maximum events
executionTimeout: 60000, // 60 second timeout
}).run(async (ctx, input) => {
// Workflow code
});
Custom Run ID
const run = await client.workflow.trigger("my-workflow", [input], {
runId: "custom-run-id-123",
});
Idempotency
Prevent duplicate runs:
const run = await client.workflow.trigger("my-workflow", [input], {
idempotencyKey: "unique-key-123",
});
Parent-Child Workflows
Creating Child Workflows
workflow("parent-workflow").run(async (ctx, input) => {
// Trigger child workflow
const childRun = await client.workflow.trigger("child-workflow", input, {
parentRunId: ctx.runId,
});
// Wait for child to complete
while (true) {
const childStatus = await client.workflow.getRun(childRun.runId);
if (childStatus.status === "completed") {
break;
}
await delay(1000);
}
return { childRunId: childRun.runId };
});
Accessing Parent Context
workflow("child-workflow").run(async (ctx, input) => {
// Access parent run ID
console.log("Parent run ID:", ctx.parentRunId);
// Child workflow logic
});
Parallel Execution
Fan-Out Pattern
Execute multiple steps in parallel using ctx.parallel():
workflow("process-items").run(async (ctx, input) => {
const items = input[0].items;
// Process items in parallel
const results = await ctx.parallel(
items.map((item) =>
step(`process-${item.id}`, async () => {
return processItem(item);
})()
),
);
return { results };
});
Fan-In Pattern
Process in parallel and then aggregate results:
workflow("aggregate-results").run(async (ctx, input) => {
const items = input[0].items;
// Process in parallel
const results = await ctx.parallel(
items.map((item) =>
step(`process-${item.id}`, async () => {
return processItem(item);
})()
),
);
// Aggregate results
const summary = await step("aggregate", async () => {
return aggregateResults(results);
})();
return summary;
});
Signals
Signals allow workflows to wait for external input, enabling approval workflows, human-in-the-loop patterns, and dynamic branching.
Waiting for Signals
Pause workflow execution until a signal is received:
workflow("approval-workflow").run(async (ctx, input) => {
const amount = input[0].amount;
// Request approval
await step("request-expense", async () => {
return { status: "pending", amount };
})();
// Wait for approval signal
const approval = await ctx.waitForSignal<
{ approved: boolean; reason?: string }
>("manager-approval");
if (approval.approved) {
await step("process-payment", async () => {
return { paid: true };
})();
return { status: "approved" };
} else {
await step("notify-rejection", async () => {
return { notified: true };
})();
return { status: "rejected", reason: approval.reason };
}
});
Sending Signals
External systems can send signals to resume workflows:
// From external service
await client.workflow.sendSignal(runId, "manager-approval", {
approved: true,
comment: "Approved for processing",
});
Sleeping and Delays
Use ctx.sleep() to pause workflow execution for a specified duration:
workflow("rate-limited-workflow").run(async (ctx, input) => {
const items = input[0].items;
for (const item of items) {
await step("process-item", async () => {
return processItem(item);
})();
// Wait 1 second between items to respect rate limits
await ctx.sleep("1s");
}
return { processed: items.length };
});
Duration formats:
- Milliseconds:
5000or"5000ms" - Seconds:
"5s"or10 - Minutes:
"10m" - Hours:
"1h"
Workflow Hooks
Creating Approval Hooks
workflow("document-approval").run(async (ctx, input) => {
const documentId = input[0].documentId;
// Create approval hook
const hook = await client.workflow.createHook(
ctx.runId,
"document-approval",
"approve",
{
documentId,
},
);
// Wait for approval
const approval = await waitForApproval(hook.token);
if (!approval.approved) {
throw new Error("Document rejected");
}
// Continue workflow
await step("publish-document", async () => {
return publishDocument(documentId);
})();
return { published: true };
});
Executing Hooks
// External endpoint receives hook execution
await client.workflow.executeHook("hook-token", {
approved: true,
comment: "Approved for publication",
});
Workflow Events
Emitting Events
workflow("track-events").run(async (ctx, input) => {
await client.workflow.emitEvent(ctx.runId, "step-started", {
step: "process-data",
timestamp: Date.now(),
});
await step("process-data", async () => {
// Process data
})();
await client.workflow.emitEvent(ctx.runId, "step-completed", {
step: "process-data",
timestamp: Date.now(),
});
return { success: true };
});
Querying Events
const events = await client.workflow.getEvents("run-id");
events.forEach((event) => {
console.log(`${event.eventType}: ${JSON.stringify(event.payload)}`);
});
Monitoring Workflows
Get Workflow Status
const run = await client.workflow.getRun("run-id");
console.log(run.status); // pending, running, completed, failed, cancelled
console.log(run.startedAt);
console.log(run.completedAt);
console.log(run.error); // Error message if failed
List Workflow Runs
const runs = await client.workflow.getRuns({
workflowName: "send-welcome-email",
status: "completed",
limit: 20,
});
console.log(runs);
Get Child Workflows
const children = await client.workflow.getChildRuns("parent-run-id");
Get Parent Workflow
const parent = await client.workflow.getParentRun("child-run-id");
Canceling Workflows
await client.workflow.cancel("run-id");
Practical Examples
Data Processing Pipeline
workflow("process-user-data").run(async (ctx, input) => {
const userId = input[0].userId;
// Step 1: Fetch user
const user = await step("fetch-user", async () => {
return await client.collection("_users").getOne(userId);
})();
// Step 2: Enrich data
const enriched = await step("enrich-data", async () => {
return await externalService.enrich(user);
})();
// Step 3: Save enriched data
await step("save-data", async () => {
return await client.collection("user_profiles").update(userId, enriched);
})();
// Step 4: Send notification
await step("send-notification", async () => {
return await notificationService.send(user.email, "Profile updated");
})();
return { success: true };
});
Order Processing
workflow("process-order").run(async (ctx, input) => {
const orderId = input[0].orderId;
// Step 1: Validate order
const order = await step("validate-order", async () => {
return await validateOrder(orderId);
})();
// Step 2: Process payment
const payment = await step("process-payment", async () => {
return await paymentService.charge(order);
})();
// Step 3: Update inventory
await step("update-inventory", async () => {
return await inventoryService.reserve(order.items);
})();
// Step 4: Ship order
await step("ship-order", async () => {
return await shippingService.create(order);
})();
// Step 5: Send confirmation
await step("send-confirmation", async () => {
return await emailService.send(order.email, "Order confirmed");
})();
return { shipped: true };
});
Batch Processing
workflow("batch-process").run(async (ctx, input) => {
const batchId = input[0].batchId;
// Step 1: Get batch items
const items = await step("get-batch", async () => {
return await client.collection("batch_items").getList(1, 1000, {
filter: `batchId = '${batchId}' && status = 'pending'`,
});
})();
// Step 2: Process in parallel
const results = await Promise.all(
items.items.map((item) =>
step(`process-${item.id}`, async () => {
const result = await processItem(item);
await client.collection("batch_items").update(item.id, {
status: "completed",
result,
});
return result;
})()
),
);
// Step 3: Aggregate results
const summary = await step("aggregate", async () => {
return {
total: items.items.length,
succeeded: results.filter((r) => r.success).length,
failed: results.filter((r) => !r.success).length,
};
})();
// Step 4: Update batch
await step("update-batch", async () => {
return await client.collection("batches").update(batchId, {
status: "completed",
summary,
});
})();
return summary;
});
Scheduled Reports
workflow("generate-report").run(async (ctx, input) => {
const reportType = input[0].type;
// Step 1: Fetch data
const data = await step("fetch-data", async () => {
return await fetchReportData(reportType);
})();
// Step 2: Generate report
const report = await step("generate", async () => {
return await reportService.generate(data, reportType);
})();
// Step 3: Save report
const saved = await step("save", async () => {
return await client.collection("reports").create({
type: reportType,
url: report.url,
generatedAt: new Date().toISOString(),
});
})();
// Step 4: Notify recipients
await step("notify", async () => {
return await notificationService.notify(saved.id);
})();
return { reportId: saved.id };
});
Error Handling
Retry Failed Steps
const result = await step("retryable-step", async () => {
return await potentiallyFailingOperation();
}, {
retries: 3, // Retry 3 times
retryDelay: 1000, // Wait 1 second between retries
})();
Step Error Callbacks
await step("process-data", async () => {
return await processData();
}, {
onError: async (error) => {
// Log error
console.error("Step failed:", error);
// Send alert
await alertService.send(error);
},
})();
Workflow-Level Error Handling
workflow("error-handling").run(async (ctx, input) => {
try {
await step("risky-operation", async () => {
return await riskyOperation();
})();
} catch (error) {
// Log error
await client.workflow.emitEvent(ctx.runId, "error", {
error: error.message,
timestamp: Date.now(),
});
// Continue or rethrow
throw error;
}
});
Best Practices
- Idempotent steps - Make steps idempotent to handle retries
- Descriptive step names - Use clear names for debugging
- Proper timeouts - Set reasonable execution timeouts
- Error handling - Handle errors gracefully
- Logging - Emit events for tracking
- Testing - Test workflows thoroughly
- Monitoring - Monitor workflow execution
- Clean up - Clean up old workflow runs
- Use children - Use child workflows for complex processes
- Parallelize - Use parallel execution when possible
API Endpoints
| Method | Endpoint | Description |
|---|---|---|
POST |
/api/workflows/runs |
Trigger workflow |
GET |
/api/workflows/runs/:id |
Get workflow run |
GET |
/api/workflows/runs |
List workflow runs |
PATCH |
/api/workflows/runs/:id |
Update workflow run |
DELETE |
/api/workflows/runs/:id |
Cancel workflow |
GET |
/api/workflows/runs/:id/children |
Get child workflows |
GET |
/api/workflows/runs/:id/parent |
Get parent workflow |
POST |
/api/workflows/events |
Emit event |
GET |
/api/workflows/runs/:id/events |
List events |
POST |
/api/workflows/hooks |
Create hook |
POST |
/api/workflows/hooks/:token |
Execute hook |