Pipelines
Pipelines run multi-phase workflows where each phase produces artifacts that flow into later phases. Phases can run sequentially or in parallel, with automatic retries and session forking for isolation.
Quick Example
typescript
import { createClient, Pipeline } from "swarmlord";
const client = createClient();
const sprint = Pipeline.define({
name: "product-sprint",
phases: [
{
name: "research",
prompt: vars => `Research the market for ${vars.idea}. Produce a competitive analysis.`,
outputs: ["market-research.md"],
},
{
name: "design",
prompt: "Based on market-research.md, design the product. Create a spec.",
outputs: ["product-spec.md"],
},
{
name: "reviews",
parallel: [
{
prompt: "Review product-spec.md from a technical perspective.",
outputs: ["tech-review.md"],
},
{
prompt: "Review product-spec.md from a business perspective.",
outputs: ["biz-review.md"],
},
],
},
],
});
const result = await sprint.run(client.agent("my-agent"), {
vars: { idea: "AI-powered recipe app" },
on: {
phaseStart: p => console.log(`\n▶ ${p.name}`),
phaseComplete: (p, r) => console.log(` ${r.status} (${(r.durationMs / 1000).toFixed(0)}s)`),
text: (_p, delta) => process.stdout.write(delta),
},
});
console.log(`\n\n${result.report()}`);Defining a Pipeline
Pipeline.define(options)
Returns a reusable PipelineDef that can be .run() multiple times.
| Option | Type | Default | Description |
|---|---|---|---|
name | string | — | Pipeline identifier (used in reports and checkpoints). |
preamble | string | — | Text prepended to every phase prompt (shared context). |
retries | number | 2 | Default retry count for all phases. |
onFailure | "stop" | "continue" | "continue" | Whether to halt the pipeline on a phase failure. |
maxConcurrency | number | 5 | Max parallel sessions across the pipeline. |
phases | PhaseSpec[] | — | Ordered list of sequential and parallel phases. |
Sequential Phases
A sequential phase runs one task and produces one or more output files:
typescript
{
name: "research",
prompt: (vars) => `Research ${vars.topic} and write findings to market-research.md`,
outputs: ["market-research.md"], // Files the phase must produce
skills: ["web-research"], // Skills to pre-load (optional)
model: "anthropic/claude-sonnet-4.6", // Model override (optional)
agent: "researcher", // Agent entry to use (optional)
retries: 3, // Override default retries (optional)
persist: true, // Keep session alive for forking (optional)
fork: "research", // Fork from a persisted phase (optional)
}Parallel Phases
A parallel phase runs multiple tasks concurrently. Each task gets its own session:
typescript
{
name: "reviews",
parallel: [
{
prompt: "Technical review of the codebase",
outputs: ["tech-review.md"],
agent: "eng-reviewer",
},
{
prompt: "Security audit of the codebase",
outputs: ["security-audit.md"],
agent: "security",
},
{
prompt: "QA testing — write and run tests",
outputs: ["qa-report.md"],
agent: "qa",
},
],
}Parallel phases also support fork?: string to fork each parallel task from a persisted phase's session, giving every task the same starting workspace state:
typescript
{
name: "reviews",
fork: "design", // Each parallel task forks from the persisted "design" session
parallel: [
{ prompt: "Technical review", outputs: ["tech-review.md"] },
{ prompt: "Security audit", outputs: ["security-audit.md"] },
],
}Running a Pipeline
.run(agent, options)
typescript
const result = await pipeline.run(agent, {
vars: { idea: "Recipe app", audience: "home cooks" },
on: {
/* callbacks */
},
});| Option | Type | Description |
|---|---|---|
vars | Record<string, string> | Variables passed to function prompts via (vars) => string. |
on | PipelineCallbacks | Event callbacks for progress monitoring. |
Variables in Prompts
Variables are only available to function prompts. String prompts are static and do not receive vars. Use a function prompt (vars) => string to interpolate variables:
typescript
phases: [
{
name: "research",
prompt: (vars) => `Research the market for ${vars.idea} targeting ${vars.audience}`,
outputs: ["research.md"],
},
],
// ...
vars: { idea: "Recipe app", audience: "home cooks" },Static string prompts are fine when no variables are needed:
typescript
{
name: "summarize",
prompt: "Summarize all findings into a single report.",
outputs: ["summary.md"],
}Artifact Flow
Artifacts accumulate across phases. Each new session is seeded with all artifacts produced by prior phases:
- Phase 1 produces
market-research.md→ written to workspace. - Phase 2 starts with
market-research.mdalready in its workspace, producesproduct-spec.md. - Phase 3 (parallel) — each parallel task starts with both
market-research.mdandproduct-spec.md.
The pipeline engine handles this by forking sessions (carrying over workspace state) and attaching new files as message attachments when needed.
Callbacks
Monitor pipeline progress with event callbacks:
typescript
const result = await pipeline.run(agent, {
vars: { idea: "Recipe app" },
on: {
phaseStart(phase) {
console.log(`▶ ${phase.name} (${phase.type})`);
},
phaseComplete(phase, result) {
console.log(` ${result.status} — ${result.artifacts.join(", ")}`);
},
text(phase, delta) {
process.stdout.write(delta);
},
toolStart(phase, tool) {
process.stdout.write(`\n [${tool.tool}] `);
},
toolComplete(phase, tool) {
process.stdout.write("done");
},
error(phase, err) {
console.error(`\n Error in ${phase.name}: ${err.message}`);
},
checkpoint(cp) {
// Save checkpoint for resume capability
saveCheckpoint(cp);
},
},
});Callback Types
| Callback | Arguments | When |
|---|---|---|
phaseStart | (phase: PhaseInfo) | Phase begins |
phaseComplete | (phase: PhaseInfo, result: PhaseResult) | Phase finishes |
text | (phase: PhaseInfo, delta: string) | Text token streamed |
toolStart | (phase: PhaseInfo, tool: ToolPart) | Tool call begins |
toolComplete | (phase: PhaseInfo, tool: ToolPart) | Tool call finishes |
toolError | (phase: PhaseInfo, tool: ToolPart, error: string) | Tool call fails |
error | (phase: PhaseInfo, err: Error) | Phase-level error |
checkpoint | (cp: PipelineCheckpoint) | After each phase — save for resume |
disconnect | (phase: PhaseInfo) | SSE stream disconnected (reconnecting) |
Results
typescript
const result = await pipeline.run(agent, { vars });
result.status; // "pass" | "partial" | "fail"
result.phases; // Per-phase results
result.artifacts; // Map<string, string> — all produced files
result.durationMs; // Total wall clock time
result.cost; // { llm, container, total }
result.report(); // Markdown summary reportPhase Result
typescript
interface PhaseResult {
name: string;
status: "pass" | "partial" | "fail" | "skipped";
durationMs: number;
artifacts: string[]; // Files successfully harvested
error?: string; // Error message if failed
attempts: number; // Total attempts including retries
sessionIds: string[]; // Session IDs used
}Resume from Checkpoint
Pipelines emit checkpoints after each phase. Save them to resume after a failure:
typescript
const def = Pipeline.define({
/* ... */
});
// First run — save checkpoints
let lastCheckpoint;
const result = await def.run(agent, {
vars: { idea: "Recipe app" },
on: {
checkpoint: cp => {
lastCheckpoint = cp;
},
},
});
// Resume from where it left off
if (result.status !== "pass" && lastCheckpoint) {
const resumed = await def.resume(agent, lastCheckpoint, {
on: {
phaseStart: p => console.log(`▶ ${p.name}`),
text: (_p, delta) => process.stdout.write(delta),
},
});
console.log(resumed.report());
}Config-Based Pipelines
Pipelines can also be defined declaratively in swarmlord.jsonc:
jsonc
{
"name": "my-agent",
"model": "anthropic/claude-haiku-4.5",
"pipeline": {
"research-sprint": {
"name": "research-sprint",
"phases": [
{
"name": "research",
"prompt": "Research the market",
"outputs": ["market-research.md"],
},
{
"name": "analysis",
"prompt": "Analyze findings",
"outputs": ["analysis.md"],
},
],
},
},
}Config-based pipelines use static prompt strings (no functions) and are useful for declarative, repeatable workflows.