Anton Nesterov edited this page 2026-02-22 22:54:39 +01:00

Workflows

Orchestrate complex business logic with distributed workflows.

Understanding Workflows

Workflows are sequences of steps that execute business logic. They run asynchronously, can recover from failures, and support parent-child relationships for complex orchestration.
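To make the recovery claim concrete, here is a minimal sketch of one way named steps can make a workflow resumable: each completed step's result is stored under its name, and a re-run returns the stored result instead of executing the step again. This is an illustration under stated assumptions, not the SDK's implementation; `StepStore`, `runStep`, and `demo` are hypothetical names.

```typescript
// Hypothetical illustration of step replay; not the @vski/sdk implementation.
type StepStore = Map<string, unknown>;

// Run `fn` once per step name; on later runs, return the stored result instead.
async function runStep<T>(
  store: StepStore,
  name: string,
  fn: () => Promise<T>,
): Promise<T> {
  if (store.has(name)) {
    return store.get(name) as T; // Already completed in an earlier attempt
  }
  const result = await fn();
  store.set(name, result); // Stored only after the step succeeds
  return result;
}

// Demo: the second step fails on the first attempt, then the workflow is re-run.
async function demo(): Promise<{ calls: number; total: number }> {
  const store: StepStore = new Map();
  let calls = 0; // Counts how often the "fetch" body really executes
  let failOnce = true;

  const attempt = async (): Promise<number> => {
    const a = await runStep(store, "fetch", async () => {
      calls += 1;
      return 20;
    });
    const b = await runStep(store, "enrich", async () => {
      if (failOnce) {
        failOnce = false;
        throw new Error("transient failure");
      }
      return 22;
    });
    return a + b;
  };

  try {
    await attempt(); // Fails in "enrich"; the "fetch" result is already stored
  } catch {
    // A real engine would schedule another attempt here
  }
  const total = await attempt(); // Replays "fetch", re-executes "enrich"
  return { calls, total };
}
```

In this model a step body that has side effects can still run more than once if it fails partway through, which is why step bodies should be idempotent.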

Creating Workflows

Basic Workflow

import { step, workflow } from "@vski/sdk/functional";

const sendWelcomeEmail = workflow("send-welcome-email").run(async (ctx, input) => {
  const userId = input[0].userId;

  // Step 1: Fetch user data
  const user = await step("fetch-user", async () => {
    return await client.collection("_users").getOne(userId);
  })();

  // Step 2: Generate welcome email
  const email = await step("generate-email", async () => {
    return generateWelcomeEmail(user);
  })();

  // Step 3: Send email
  await step("send-email", async () => {
    return await emailService.send(email);
  })();

  return { success: true, userId };
});

// Register workflow
registerWorkflow(sendWelcomeEmail);

Starting a Worker

import { WorkflowWorker } from "@vski/sdk/worker";

const worker = new WorkflowWorker(client);

// Start worker for specific workflow
worker.start("send-welcome-email");

// Start worker for all registered workflows
worker.startAll();

Triggering Workflows

// Trigger workflow with input
const run = await client.workflow.trigger("send-welcome-email", [
  { userId: "user-123" },
]);

console.log(run.runId); // Workflow run ID
console.log(run.status); // "pending", "running", "completed", etc.
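The run returned by the trigger call is a snapshot, so a caller that needs the final outcome has to check the status again later. A minimal polling sketch follows. `waitForRun` and `fakeGetRun` are hypothetical helpers, not SDK functions; the helper accepts any function shaped like `client.workflow.getRun`, and the status names are the ones listed under "Get Workflow Status".

```typescript
// Hypothetical helper; status names follow the "Get Workflow Status" section.
type RunStatus = "pending" | "running" | "completed" | "failed" | "cancelled";

interface RunSnapshot {
  runId: string;
  status: RunStatus;
}

const TERMINAL: ReadonlySet<RunStatus> = new Set<RunStatus>([
  "completed",
  "failed",
  "cancelled",
]);

// Poll until the run reaches a terminal status or the poll budget is used up.
async function waitForRun(
  getRun: (runId: string) => Promise<RunSnapshot>,
  runId: string,
  intervalMs = 1000,
  maxPolls = 60,
): Promise<RunSnapshot> {
  for (let i = 0; i < maxPolls; i++) {
    const run = await getRun(runId);
    if (TERMINAL.has(run.status)) return run;
    await new Promise<void>((resolve) => setTimeout(resolve, intervalMs));
  }
  throw new Error(`Run ${runId} did not finish after ${maxPolls} polls`);
}

// Stand-in for client.workflow.getRun that walks through a list of statuses.
function fakeGetRun(statuses: RunStatus[]) {
  let i = 0;
  return async (runId: string): Promise<RunSnapshot> => ({
    runId,
    status: statuses[Math.min(i++, statuses.length - 1)],
  });
}
```

With a real client, the call would look like `await waitForRun((id) => client.workflow.getRun(id), run.runId)`, assuming `getRun` returns an object with a `status` field as shown in the monitoring section.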

Workflow Steps

Basic Step

const result = await step("step-name", async () => {
  // Your logic here
  return { data: "result" };
})();

Steps with Parameters

const result = await step("process-data", async (data) => {
  // Process data
  return { processed: true };
}, { input: data })();

Step Error Handling

try {
  const result = await step("risky-operation", async () => {
    // Might throw error
    return await riskyOperation();
  })();
} catch (error) {
  console.error("Step failed:", error);
  // Rethrow so the workflow run is marked as failed
  throw error;
}

Workflow Configuration

Circuit Breaker

Prevent runaway workflows:

workflow("data-processing", {
  maxEvents: 100, // Stop the run after 100 events
  executionTimeout: 60000, // Stop the run after 60 seconds (value in milliseconds)
}).run(async (ctx, input) => {
  // Workflow code
});
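As a rough model of what these two limits do, the sketch below counts events and checks elapsed time, and throws once either limit is exceeded. `RunGuard` is a hypothetical class written for illustration; how the SDK actually counts events or enforces the timeout is not described here, so treat the details as assumptions.

```typescript
// Hypothetical guard illustrating maxEvents and executionTimeout; not SDK code.
class RunGuard {
  private events = 0;
  private readonly startedAt: number;
  private readonly maxEvents: number;
  private readonly executionTimeoutMs: number;
  private readonly now: () => number;

  constructor(
    maxEvents: number,
    executionTimeoutMs: number,
    now: () => number = () => Date.now(), // Injectable clock for testing
  ) {
    this.maxEvents = maxEvents;
    this.executionTimeoutMs = executionTimeoutMs;
    this.now = now;
    this.startedAt = now();
  }

  // Call once per recorded event (assumed here: each step start or completion).
  recordEvent(): void {
    this.events += 1;
    if (this.events > this.maxEvents) {
      throw new Error(`Circuit breaker: more than ${this.maxEvents} events`);
    }
    if (this.now() - this.startedAt > this.executionTimeoutMs) {
      throw new Error(`Circuit breaker: exceeded ${this.executionTimeoutMs} ms`);
    }
  }
}
```

A workflow that loops unexpectedly would hit the event limit first; one that hangs on a slow dependency would hit the timeout on its next recorded event.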

Custom Run ID

const run = await client.workflow.trigger("my-workflow", [input], {
  runId: "custom-run-id-123",
});

Idempotency

Prevent duplicate runs:

const run = await client.workflow.trigger("my-workflow", [input], {
  idempotencyKey: "unique-key-123",
});
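An idempotency key only prevents duplicates if the same logical request always produces the same key. One option is to derive the key from the workflow name and its input, as in the sketch below. `stableStringify`, `fnv1a`, and `idempotencyKeyFor` are hypothetical helpers, and the hash is a small non-cryptographic FNV-1a variant over UTF-16 code units, chosen only to keep the example free of imports.

```typescript
// Serialize with object keys sorted, so key order does not change the output.
// Note: undefined values are written as "null" in this sketch.
function stableStringify(value: unknown): string {
  if (value === null || typeof value !== "object") {
    return JSON.stringify(value) ?? "null";
  }
  if (Array.isArray(value)) {
    return `[${value.map(stableStringify).join(",")}]`;
  }
  const obj = value as Record<string, unknown>;
  const entries = Object.keys(obj)
    .sort()
    .map((key) => `${JSON.stringify(key)}:${stableStringify(obj[key])}`);
  return `{${entries.join(",")}}`;
}

// 32-bit FNV-1a style hash, rendered as 8 hex characters. Not cryptographic.
function fnv1a(text: string): string {
  let hash = 0x811c9dc5;
  for (let i = 0; i < text.length; i++) {
    hash ^= text.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return hash.toString(16).padStart(8, "0");
}

// Same workflow name and same input (in any key order) give the same key.
function idempotencyKeyFor(workflowName: string, input: unknown): string {
  return `${workflowName}:${fnv1a(stableStringify(input))}`;
}
```

The result could then be passed as `idempotencyKey` when triggering. For keys that must resist deliberate collisions, a cryptographic hash would be the safer choice.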

Parent-Child Workflows

Creating Child Workflows

workflow("parent-workflow").run(async (ctx, input) => {
  // Trigger child workflow
  const childRun = await client.workflow.trigger("child-workflow", input, {
    parentRunId: ctx.runId,
  });

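  // Wait for the child to reach a terminal status
  while (true) {
    const childStatus = await client.workflow.getRun(childRun.runId);
    if (childStatus.status === "completed") {
      break;
    }
    if (childStatus.status === "failed" || childStatus.status === "cancelled") {
      // Stop polling instead of looping forever on a child that will never complete
      throw new Error(`Child workflow ${childRun.runId} ${childStatus.status}`);
    }
    await ctx.sleep("1s"); // Poll once per second
  }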

  return { childRunId: childRun.runId };
});

Accessing Parent Context

workflow("child-workflow").run(async (ctx, input) => {
  // Access parent run ID
  console.log("Parent run ID:", ctx.parentRunId);

  // Child workflow logic
});

Parallel Execution

Fan-Out Pattern

Execute multiple steps in parallel using ctx.parallel():

workflow("process-items").run(async (ctx, input) => {
  const items = input[0].items;

  // Process items in parallel
  const results = await ctx.parallel(
    items.map((item) =>
      step(`process-${item.id}`, async () => {
        return processItem(item);
      })()
    ),
  );

  return { results };
});
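Whether a single failing item rejects the whole parallel call is not stated here. If the successful results must be kept when some items fail, each task can be wrapped so that it reports its outcome instead of throwing. The sketch below shows the idea with plain promises; `Settled` and `settleAll` are hypothetical names, not part of the SDK.

```typescript
type Settled<T> = { ok: true; value: T } | { ok: false; error: unknown };

// Run all tasks concurrently and report each outcome,
// instead of rejecting as soon as the first task fails.
async function settleAll<T>(
  tasks: Array<() => Promise<T>>,
): Promise<Settled<T>[]> {
  return Promise.all(
    tasks.map((task) =>
      Promise.resolve()
        .then(task) // Also captures errors thrown synchronously by the task
        .then(
          (value): Settled<T> => ({ ok: true, value }),
          (error: unknown): Settled<T> => ({ ok: false, error }),
        ),
    ),
  );
}
```

The results keep the order of the input tasks, so each outcome can be matched back to its item by index.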

Fan-In Pattern

Process in parallel and then aggregate results:

workflow("aggregate-results").run(async (ctx, input) => {
  const items = input[0].items;

  // Process in parallel
  const results = await ctx.parallel(
    items.map((item) =>
      step(`process-${item.id}`, async () => {
        return processItem(item);
      })()
    ),
  );

  // Aggregate results
  const summary = await step("aggregate", async () => {
    return aggregateResults(results);
  })();

  return summary;
});

Signals

Signals allow workflows to wait for external input, enabling approval workflows, human-in-the-loop patterns, and dynamic branching.
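As a mental model, a signal can be thought of as a named mailbox: the workflow waits on a name, and an external caller delivers a payload to that name. The in-memory sketch below illustrates this. `SignalBox` is a hypothetical class, and buffering a signal that arrives before anyone waits for it is an assumption of the sketch, not documented SDK behavior.

```typescript
// Hypothetical in-memory model of signal delivery; not the SDK's implementation.
class SignalBox {
  private waiting = new Map<string, (payload: unknown) => void>();
  private buffered = new Map<string, unknown>();

  // Resolve once a signal with this name is sent (or was sent earlier).
  waitForSignal<T>(name: string): Promise<T> {
    if (this.buffered.has(name)) {
      const payload = this.buffered.get(name) as T;
      this.buffered.delete(name);
      return Promise.resolve(payload);
    }
    return new Promise<T>((resolve) => {
      this.waiting.set(name, (payload) => resolve(payload as T));
    });
  }

  // Deliver a signal: wake the waiter if there is one, otherwise buffer it.
  sendSignal(name: string, payload: unknown): void {
    const resolve = this.waiting.get(name);
    if (resolve) {
      this.waiting.delete(name);
      resolve(payload);
    } else {
      this.buffered.set(name, payload);
    }
  }
}
```

A durable engine would persist the waiting state and the payload rather than keep them in memory, so that the workflow can resume after a restart.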

Waiting for Signals

Pause workflow execution until a signal is received:

workflow("approval-workflow").run(async (ctx, input) => {
  const amount = input[0].amount;

  // Request approval
  await step("request-expense", async () => {
    return { status: "pending", amount };
  })();

  // Wait for approval signal
  const approval = await ctx.waitForSignal<
    { approved: boolean; reason?: string }
  >("manager-approval");

  if (approval.approved) {
    await step("process-payment", async () => {
      return { paid: true };
    })();
    return { status: "approved" };
  } else {
    await step("notify-rejection", async () => {
      return { notified: true };
    })();
    return { status: "rejected", reason: approval.reason };
  }
});

Sending Signals

External systems can send signals to resume workflows:

// From external service
await client.workflow.sendSignal(runId, "manager-approval", {
  approved: true,
  reason: "Approved for processing", // Matches the optional `reason` field the workflow waits for
});

Sleeping and Delays

Use ctx.sleep() to pause workflow execution for a specified duration:

workflow("rate-limited-workflow").run(async (ctx, input) => {
  const items = input[0].items;

  for (const item of items) {
    await step("process-item", async () => {
      return processItem(item);
    })();

    // Wait 1 second between items to respect rate limits
    await ctx.sleep("1s");
  }

  return { processed: items.length };
});

Duration formats:

  • Milliseconds: 5000 or "5000ms"
  • Seconds: "5s" or 10
  • Minutes: "10m"
  • Hours: "1h"
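For reference, a duration string in these formats can be normalized to milliseconds as sketched below. `parseDuration` is a hypothetical helper, not the SDK's parser. The list shows bare numbers under both milliseconds and seconds, so the sketch has to pick one: it treats a bare number as milliseconds, which is an assumption.

```typescript
const UNIT_MS: Record<string, number> = {
  ms: 1,
  s: 1000,
  m: 60000,
  h: 3600000,
};

// Convert a duration such as "5000ms", "5s", "10m", or "1h" to milliseconds.
// Assumption: a bare number is already in milliseconds.
function parseDuration(value: number | string): number {
  if (typeof value === "number") {
    if (!Number.isFinite(value) || value < 0) {
      throw new Error(`Invalid duration: ${value}`);
    }
    return value;
  }
  const match = /^(\d+(?:\.\d+)?)(ms|s|m|h)$/.exec(value.trim());
  if (!match) {
    throw new Error(`Invalid duration: ${value}`);
  }
  return Number(match[1]) * UNIT_MS[match[2]];
}
```

Rejecting unknown units early keeps a typo such as "10min" from silently turning into a zero-length or very long sleep.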

Workflow Hooks

Creating Approval Hooks

workflow("document-approval").run(async (ctx, input) => {
  const documentId = input[0].documentId;

  // Create approval hook
  const hook = await client.workflow.createHook(
    ctx.runId,
    "document-approval",
    "approve",
    {
      documentId,
    },
  );

  // Wait for approval
  const approval = await waitForApproval(hook.token);

  if (!approval.approved) {
    throw new Error("Document rejected");
  }

  // Continue workflow
  await step("publish-document", async () => {
    return publishDocument(documentId);
  })();

  return { published: true };
});

Executing Hooks

// External endpoint receives hook execution
await client.workflow.executeHook("hook-token", {
  approved: true,
  comment: "Approved for publication",
});

Workflow Events

Emitting Events

workflow("track-events").run(async (ctx, input) => {
  await client.workflow.emitEvent(ctx.runId, "step-started", {
    step: "process-data",
    timestamp: Date.now(),
  });

  await step("process-data", async () => {
    // Process data
  })();

  await client.workflow.emitEvent(ctx.runId, "step-completed", {
    step: "process-data",
    timestamp: Date.now(),
  });

  return { success: true };
});

Querying Events

const events = await client.workflow.getEvents("run-id");

events.forEach((event) => {
  console.log(`${event.eventType}: ${JSON.stringify(event.payload)}`);
});

Monitoring Workflows

Get Workflow Status

const run = await client.workflow.getRun("run-id");

console.log(run.status); // pending, running, completed, failed, cancelled
console.log(run.startedAt);
console.log(run.completedAt);
console.log(run.error); // Error message if failed

List Workflow Runs

const runs = await client.workflow.getRuns({
  workflowName: "send-welcome-email",
  status: "completed",
  limit: 20,
});

console.log(runs);

Get Child Workflows

const children = await client.workflow.getChildRuns("parent-run-id");

Get Parent Workflow

const parent = await client.workflow.getParentRun("child-run-id");

Canceling Workflows

await client.workflow.cancel("run-id");

Practical Examples

Data Processing Pipeline

workflow("process-user-data").run(async (ctx, input) => {
  const userId = input[0].userId;

  // Step 1: Fetch user
  const user = await step("fetch-user", async () => {
    return await client.collection("_users").getOne(userId);
  })();

  // Step 2: Enrich data
  const enriched = await step("enrich-data", async () => {
    return await externalService.enrich(user);
  })();

  // Step 3: Save enriched data
  await step("save-data", async () => {
    return await client.collection("user_profiles").update(userId, enriched);
  })();

  // Step 4: Send notification
  await step("send-notification", async () => {
    return await notificationService.send(user.email, "Profile updated");
  })();

  return { success: true };
});

Order Processing

workflow("process-order").run(async (ctx, input) => {
  const orderId = input[0].orderId;

  // Step 1: Validate order
  const order = await step("validate-order", async () => {
    return await validateOrder(orderId);
  })();

  // Step 2: Process payment
  const payment = await step("process-payment", async () => {
    return await paymentService.charge(order);
  })();

  // Step 3: Update inventory
  await step("update-inventory", async () => {
    return await inventoryService.reserve(order.items);
  })();

  // Step 4: Ship order
  await step("ship-order", async () => {
    return await shippingService.create(order);
  })();

  // Step 5: Send confirmation
  await step("send-confirmation", async () => {
    return await emailService.send(order.email, "Order confirmed");
  })();

  return { shipped: true };
});

Batch Processing

workflow("batch-process").run(async (ctx, input) => {
  const batchId = input[0].batchId;

  // Step 1: Get batch items
  const items = await step("get-batch", async () => {
    return await client.collection("batch_items").getList(1, 1000, {
      filter: `batchId = '${batchId}' && status = 'pending'`,
    });
  })();

  // Step 2: Process in parallel
  const results = await ctx.parallel(
    items.items.map((item) =>
      step(`process-${item.id}`, async () => {
        const result = await processItem(item);
        await client.collection("batch_items").update(item.id, {
          status: "completed",
          result,
        });
        return result;
      })()
    ),
  );

  // Step 3: Aggregate results
  const summary = await step("aggregate", async () => {
    return {
      total: items.items.length,
      succeeded: results.filter((r) => r.success).length,
      failed: results.filter((r) => !r.success).length,
    };
  })();

  // Step 4: Update batch
  await step("update-batch", async () => {
    return await client.collection("batches").update(batchId, {
      status: "completed",
      summary,
    });
  })();

  return summary;
});

Scheduled Reports

workflow("generate-report").run(async (ctx, input) => {
  const reportType = input[0].type;

  // Step 1: Fetch data
  const data = await step("fetch-data", async () => {
    return await fetchReportData(reportType);
  })();

  // Step 2: Generate report
  const report = await step("generate", async () => {
    return await reportService.generate(data, reportType);
  })();

  // Step 3: Save report
  const saved = await step("save", async () => {
    return await client.collection("reports").create({
      type: reportType,
      url: report.url,
      generatedAt: new Date().toISOString(),
    });
  })();

  // Step 4: Notify recipients
  await step("notify", async () => {
    return await notificationService.notify(saved.id);
  })();

  return { reportId: saved.id };
});

Error Handling

Retry Failed Steps

const result = await step("retryable-step", async () => {
  return await potentiallyFailingOperation();
}, {
  retries: 3, // Retry up to 3 times
  retryDelay: 1000, // Wait 1 second between retries
})();
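The sketch below shows what retry options like these typically mean in practice. `withRetry` is a hypothetical helper, not SDK code. It assumes that `retries` counts the attempts made after the first one and that the delay between attempts is fixed; the SDK may count attempts or space them differently.

```typescript
// Hypothetical helper illustrating retries/retryDelay semantics; not SDK code.
async function withRetry<T>(
  fn: () => Promise<T>,
  retries: number,
  retryDelayMs: number,
): Promise<T> {
  let lastError: unknown;
  // One initial attempt plus up to `retries` further attempts.
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error;
      if (attempt < retries) {
        // Wait before the next attempt; no wait after the final failure
        await new Promise<void>((resolve) => setTimeout(resolve, retryDelayMs));
      }
    }
  }
  throw lastError;
}
```

Because the operation can run several times, it should be idempotent: a retried payment or email step must not charge or send twice.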

Step Error Callbacks

await step("process-data", async () => {
  return await processData();
}, {
  onError: async (error) => {
    // Log error
    console.error("Step failed:", error);
    // Send alert
    await alertService.send(error);
  },
})();

Workflow-Level Error Handling

workflow("error-handling").run(async (ctx, input) => {
  try {
    await step("risky-operation", async () => {
      return await riskyOperation();
    })();
  } catch (error) {
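    // Record the failure as a workflow event
    await client.workflow.emitEvent(ctx.runId, "error", {
      error: error instanceof Error ? error.message : String(error),
      timestamp: Date.now(),
    });

    // Rethrow so the run is marked as failed; remove this line to continue instead
    throw error;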
  }
});

Best Practices

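  1. Idempotent steps - Make each step safe to run more than once, because retries can re-execute it
  2. Descriptive step names - Name steps after what they do so failed runs are easy to trace
  3. Proper timeouts - Set executionTimeout and maxEvents to stop runaway workflows
  4. Error handling - Catch the errors you can recover from and rethrow the rest so the run is marked as failed
  5. Logging - Emit events at key points so runs can be inspected later
  6. Testing - Test workflows end to end, including failure and retry paths
  7. Monitoring - Track run status and errors for running and finished workflows
  8. Clean up - Remove old workflow runs you no longer need
  9. Use children - Split complex processes into child workflows
  10. Parallelize - Run independent steps in parallel with ctx.parallel()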

API Endpoints

Method  Endpoint                          Description
POST    /api/workflows/runs               Trigger workflow
GET     /api/workflows/runs/:id           Get workflow run
GET     /api/workflows/runs               List workflow runs
PATCH   /api/workflows/runs/:id           Update workflow run
DELETE  /api/workflows/runs/:id           Cancel workflow
GET     /api/workflows/runs/:id/children  Get child workflows
GET     /api/workflows/runs/:id/parent    Get parent workflow
POST    /api/workflows/events             Emit event
GET     /api/workflows/runs/:id/events    List events
POST    /api/workflows/hooks              Create hook
POST    /api/workflows/hooks/:token       Execute hook