Pipelines

Pipelines run multi-phase workflows where each phase produces artifacts that flow into later phases. Phases can run sequentially or in parallel, with automatic retries and session forking for isolation.

Quick Example

typescript
import { createClient, Pipeline } from "swarmlord";

const client = createClient();

const sprint = Pipeline.define({
    name: "product-sprint",
    phases: [
        {
            name: "research",
            prompt: vars => `Research the market for ${vars.idea}. Produce a competitive analysis.`,
            outputs: ["market-research.md"],
        },
        {
            name: "design",
            prompt: "Based on market-research.md, design the product. Create a spec.",
            outputs: ["product-spec.md"],
        },
        {
            name: "reviews",
            parallel: [
                {
                    prompt: "Review product-spec.md from a technical perspective.",
                    outputs: ["tech-review.md"],
                },
                {
                    prompt: "Review product-spec.md from a business perspective.",
                    outputs: ["biz-review.md"],
                },
            ],
        },
    ],
});

const result = await sprint.run(client.agent("my-agent"), {
    vars: { idea: "AI-powered recipe app" },
    on: {
        phaseStart: p => console.log(`\n▶ ${p.name}`),
        phaseComplete: (p, r) => console.log(`  ${r.status} (${(r.durationMs / 1000).toFixed(0)}s)`),
        text: (_p, delta) => process.stdout.write(delta),
    },
});

console.log(`\n\n${result.report()}`);

Defining a Pipeline

Pipeline.define(options)

Returns a reusable PipelineDef whose .run() method can be called multiple times.

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| name | string | | Pipeline identifier (used in reports and checkpoints). |
| preamble | string | | Text prepended to every phase prompt (shared context). |
| retries | number | 2 | Default retry count for all phases. |
| onFailure | "stop" \| "continue" | "continue" | Whether to halt the pipeline on a phase failure. |
| maxConcurrency | number | 5 | Max parallel sessions across the pipeline. |
| phases | PhaseSpec[] | | Ordered list of sequential and parallel phases. |
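
One way to read the defaults in the table is as a merge step applied to whatever options you pass in. The sketch below is only an illustration of that reading: PipelineOptions, ResolvedOptions, and resolveOptions are hypothetical names that are not part of the swarmlord API, and phases are reduced to a name for brevity.

```typescript
// Hypothetical local types mirroring the options table; not the library's own types.
interface PipelineOptions {
    name: string;
    preamble?: string;
    retries?: number;
    onFailure?: "stop" | "continue";
    maxConcurrency?: number;
    phases: { name: string }[];
}

interface ResolvedOptions {
    name: string;
    preamble?: string;
    retries: number;
    onFailure: "stop" | "continue";
    maxConcurrency: number;
    phases: { name: string }[];
}

// Fill in the documented defaults for any option the caller left out.
// `??` keeps an explicit 0, which `||` would silently replace.
function resolveOptions(options: PipelineOptions): ResolvedOptions {
    return {
        name: options.name,
        preamble: options.preamble,
        retries: options.retries ?? 2,
        onFailure: options.onFailure ?? "continue",
        maxConcurrency: options.maxConcurrency ?? 5,
        phases: options.phases,
    };
}

const resolved = resolveOptions({ name: "product-sprint", retries: 0, phases: [{ name: "research" }] });
console.log(resolved.retries, resolved.onFailure, resolved.maxConcurrency); // 0 continue 5
```

Note that the documented default for onFailure is "continue", so a pipeline keeps going after a failed phase unless you set "stop" explicitly.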

Sequential Phases

A sequential phase runs one task and produces one or more output files:

typescript
{
    name: "research",
    prompt: (vars) => `Research ${vars.topic} and write findings to market-research.md`,
    outputs: ["market-research.md"],     // Files the phase must produce
    skills: ["web-research"],            // Skills to pre-load (optional)
    model: "anthropic/claude-sonnet-4.6", // Model override (optional)
    agent: "researcher",                 // Agent entry to use (optional)
    retries: 3,                          // Override default retries (optional)
    persist: true,                       // Keep session alive for forking (optional)
    fork: "research",                    // Fork from a persisted phase (optional)
}
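
As a rough mental model of retries, assume the value counts extra attempts after the first one. That reading is consistent with the phase result's attempts field, which is described as the total including retries, but it is an assumption rather than documented behavior. The sketch below is synchronous and is not the engine's implementation; runWithRetries and its attempt callback are hypothetical.

```typescript
// Hypothetical helper: call `attempt` until it succeeds or the retry budget is spent.
// Assumes `retries` means "extra attempts after the first".
function runWithRetries(
    attempt: (attemptNumber: number) => boolean,
    retries: number,
): { ok: boolean; attempts: number } {
    let attempts = 0;
    while (attempts <= retries) {
        attempts += 1;
        if (attempt(attempts)) {
            return { ok: true, attempts };
        }
    }
    return { ok: false, attempts };
}

// A task that fails twice, then succeeds on the third attempt.
const flaky = runWithRetries(n => n >= 3, 3);
console.log(flaky.ok, flaky.attempts); // true 3

// A task that never succeeds uses the first attempt plus all 2 retries.
const broken = runWithRetries(() => false, 2);
console.log(broken.ok, broken.attempts); // false 3
```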

Parallel Phases

A parallel phase runs multiple tasks concurrently. Each task gets its own session:

typescript
{
    name: "reviews",
    parallel: [
        {
            prompt: "Technical review of the codebase",
            outputs: ["tech-review.md"],
            agent: "eng-reviewer",
        },
        {
            prompt: "Security audit of the codebase",
            outputs: ["security-audit.md"],
            agent: "security",
        },
        {
            prompt: "QA testing — write and run tests",
            outputs: ["qa-report.md"],
            agent: "qa",
        },
    ],
}

Parallel phases also accept an optional fork option (fork?: string). Each parallel task is then forked from the named persisted phase's session, so every task starts from the same workspace state:

typescript
{
    name: "reviews",
    fork: "design",   // Each parallel task forks from the persisted "design" session
    parallel: [
        { prompt: "Technical review", outputs: ["tech-review.md"] },
        { prompt: "Security audit", outputs: ["security-audit.md"] },
    ],
}
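
Because a fork depends on a session that was kept alive, it can help to sanity-check a phase list before running it. The check below assumes that a fork target must be an earlier phase declared with persist: true; the text above implies this, but whether the library validates it for you is not stated. PhaseLike and findForkProblems are hypothetical names.

```typescript
// Hypothetical, simplified phase shape: only the fields this check needs.
interface PhaseLike {
    name: string;
    persist?: boolean;
    fork?: string;
}

// Return one message per phase whose `fork` target is not an earlier phase with `persist: true`.
function findForkProblems(phases: PhaseLike[]): string[] {
    const persisted = new Set<string>();
    const problems: string[] = [];
    for (const phase of phases) {
        if (phase.fork !== undefined && !persisted.has(phase.fork)) {
            problems.push(`${phase.name}: fork target "${phase.fork}" is not an earlier persisted phase`);
        }
        if (phase.persist) {
            persisted.add(phase.name);
        }
    }
    return problems;
}

const forkProblems = findForkProblems([
    { name: "design", persist: true },
    { name: "reviews", fork: "design" },    // fine: "design" was persisted earlier
    { name: "launch", fork: "marketing" },  // flagged: no such persisted phase
]);
console.log(forkProblems.length); // 1
```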

Running a Pipeline

.run(agent, options)

typescript
const result = await pipeline.run(agent, {
    vars: { idea: "Recipe app", audience: "home cooks" },
    on: {
        /* callbacks */
    },
});

| Option | Type | Description |
| --- | --- | --- |
| vars | Record<string, string> | Variables passed to function prompts via (vars) => string. |
| on | PipelineCallbacks | Event callbacks for progress monitoring. |

Variables in Prompts

Only function prompts receive vars. String prompts are static, so to interpolate a variable, write the prompt as a function of the form (vars) => string:

typescript
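const def = Pipeline.define({
    name: "market-research", // illustrative name
    phases: [
        {
            name: "research",
            prompt: (vars) => `Research the market for ${vars.idea} targeting ${vars.audience}`,
            outputs: ["research.md"],
        },
    ],
});

// vars are supplied at run time and passed to each function prompt.
// agent is obtained as in the Quick Example, e.g. client.agent("my-agent").
const result = await def.run(agent, {
    vars: { idea: "Recipe app", audience: "home cooks" },
});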

Static string prompts are fine when no variables are needed:

typescript
{
    name: "summarize",
    prompt: "Summarize all findings into a single report.",
    outputs: ["summary.md"],
}
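
The difference between the two prompt forms can be sketched as a small resolution step. This is an illustration under assumptions, not the library's code: resolvePrompt is a hypothetical helper, and the way the preamble is joined to the prompt (a blank line here) is a guess.

```typescript
type Vars = Record<string, string>;
type Prompt = string | ((vars: Vars) => string);

// Hypothetical helper: turn a phase prompt into the final text.
// Function prompts receive vars; string prompts are used verbatim.
function resolvePrompt(prompt: Prompt, vars: Vars, preamble?: string): string {
    const body = typeof prompt === "function" ? prompt(vars) : prompt;
    // Assumption: the preamble is separated from the prompt by a blank line.
    return preamble ? `${preamble}\n\n${body}` : body;
}

const promptVars: Vars = { idea: "Recipe app", audience: "home cooks" };

console.log(resolvePrompt(v => `Research the market for ${v.idea} targeting ${v.audience}`, promptVars));
// Research the market for Recipe app targeting home cooks

console.log(resolvePrompt("Summarize all findings into a single report.", promptVars));
// Summarize all findings into a single report.
```

A practical consequence: a static string that contains a placeholder such as ${vars.idea} inside ordinary quotes is passed through unchanged, so it reaches the agent with the placeholder still in it.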

Artifact Flow

Artifacts accumulate across phases. Each new session is seeded with all artifacts produced by prior phases:

  1. Phase 1 produces market-research.md, which is written to the workspace.
  2. Phase 2 starts with market-research.md already in its workspace, produces product-spec.md.
  3. Phase 3 (parallel) — each parallel task starts with both market-research.md and product-spec.md.

The pipeline engine handles this by forking sessions (carrying over workspace state) and attaching new files as message attachments when needed.
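
The accumulation rule can be modelled in a few lines. The sketch below only tracks file names, assumes every phase succeeds, and says nothing about how the engine actually forks sessions or attaches files; PhaseOutputs and artifactsAtStart are hypothetical names.

```typescript
// Hypothetical, simplified shapes: a sequential phase lists outputs directly,
// a parallel phase lists them per task.
interface PhaseOutputs {
    name: string;
    outputs?: string[];
    parallel?: { outputs: string[] }[];
}

// For each phase, list the artifacts already in its workspace when it starts.
function artifactsAtStart(phases: PhaseOutputs[]): Map<string, string[]> {
    const available: string[] = [];
    const seeded = new Map<string, string[]>();
    for (const phase of phases) {
        // Snapshot what earlier phases produced before this phase adds its own files.
        seeded.set(phase.name, [...available]);
        const produced = phase.parallel
            ? phase.parallel.flatMap(task => task.outputs)
            : (phase.outputs ?? []);
        available.push(...produced);
    }
    return seeded;
}

const seededArtifacts = artifactsAtStart([
    { name: "research", outputs: ["market-research.md"] },
    { name: "design", outputs: ["product-spec.md"] },
    { name: "reviews", parallel: [{ outputs: ["tech-review.md"] }, { outputs: ["biz-review.md"] }] },
]);
console.log(seededArtifacts.get("reviews")?.join(", ")); // market-research.md, product-spec.md
```

Tasks inside the same parallel phase do not see each other's outputs in this model, since all of them start from the same snapshot.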

Callbacks

Monitor pipeline progress with event callbacks:

typescript
const result = await pipeline.run(agent, {
    vars: { idea: "Recipe app" },
    on: {
        phaseStart(phase) {
            console.log(`▶ ${phase.name} (${phase.type})`);
        },
        phaseComplete(phase, result) {
            console.log(`  ${result.status} — ${result.artifacts.join(", ")}`);
        },
        text(phase, delta) {
            process.stdout.write(delta);
        },
        toolStart(phase, tool) {
            process.stdout.write(`\n  [${tool.tool}] `);
        },
        toolComplete(phase, tool) {
            process.stdout.write("done");
        },
        error(phase, err) {
            console.error(`\n  Error in ${phase.name}: ${err.message}`);
        },
        checkpoint(cp) {
            // Save the checkpoint so the run can be resumed later.
            // saveCheckpoint is a placeholder for your own persistence code.
            saveCheckpoint(cp);
        },
    },
});

Callback Types

| Callback | Arguments | When |
| --- | --- | --- |
| phaseStart | (phase: PhaseInfo) | Phase begins |
| phaseComplete | (phase: PhaseInfo, result: PhaseResult) | Phase finishes |
| text | (phase: PhaseInfo, delta: string) | Text token streamed |
| toolStart | (phase: PhaseInfo, tool: ToolPart) | Tool call begins |
| toolComplete | (phase: PhaseInfo, tool: ToolPart) | Tool call finishes |
| toolError | (phase: PhaseInfo, tool: ToolPart, error: string) | Tool call fails |
| error | (phase: PhaseInfo, err: Error) | Phase-level error |
| checkpoint | (cp: PipelineCheckpoint) | After each phase (save for resume) |
| disconnect | (phase: PhaseInfo) | SSE stream disconnected (reconnecting) |
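
The earlier example does not use toolError or disconnect, so here is a small sketch of a callbacks object that records a timeline instead of printing, covering those two as well. The argument lists follow the table; the PhaseInfo and result shapes are hypothetical minimal stand-ins for the library's types, and createRecorder is an illustrative helper, not part of swarmlord.

```typescript
// Hypothetical minimal shapes; the real PhaseInfo and PhaseResult carry more fields.
interface PhaseInfo {
    name: string;
}
interface PhaseResultLike {
    status: string;
    durationMs: number;
}

// Build a callbacks object that appends one line per event to `events`.
function createRecorder() {
    const events: string[] = [];
    const on = {
        phaseStart(phase: PhaseInfo) {
            events.push(`start:${phase.name}`);
        },
        phaseComplete(phase: PhaseInfo, result: PhaseResultLike) {
            events.push(`complete:${phase.name}:${result.status}`);
        },
        toolError(phase: PhaseInfo, _tool: unknown, error: string) {
            events.push(`tool-error:${phase.name}:${error}`);
        },
        disconnect(phase: PhaseInfo) {
            events.push(`disconnect:${phase.name}`);
        },
    };
    return { on, events };
}

// Simulate a few events by hand to show what gets recorded.
const recorder = createRecorder();
recorder.on.phaseStart({ name: "research" });
recorder.on.disconnect({ name: "research" });
recorder.on.phaseComplete({ name: "research" }, { status: "pass", durationMs: 1200 });
console.log(recorder.events.join(" | "));
// start:research | disconnect:research | complete:research:pass
```

In a real run you would pass recorder.on as the on option and inspect recorder.events afterwards.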

Results

typescript
const result = await pipeline.run(agent, { vars });

result.status; // "pass" | "partial" | "fail"
result.phases; // Per-phase results
result.artifacts; // Map<string, string> — all produced files
result.durationMs; // Total wall clock time
result.cost; // { llm, container, total }
result.report(); // Markdown summary report

Phase Result

typescript
interface PhaseResult {
    name: string;
    status: "pass" | "partial" | "fail" | "skipped";
    durationMs: number;
    artifacts: string[]; // Files successfully harvested
    error?: string; // Error message if failed
    attempts: number; // Total attempts including retries
    sessionIds: string[]; // Session IDs used
}
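
A typical use of these fields is a short failure summary after a run. The sketch below repeats the interface so it stands alone; summarizeFailures is a hypothetical helper and the sample values are made up for illustration.

```typescript
// Repeated from the interface above so the sketch is self-contained.
interface PhaseResult {
    name: string;
    status: "pass" | "partial" | "fail" | "skipped";
    durationMs: number;
    artifacts: string[];
    error?: string;
    attempts: number;
    sessionIds: string[];
}

// Hypothetical helper: one line per phase that did not fully pass.
function summarizeFailures(phases: PhaseResult[]): string[] {
    return phases
        .filter(phase => phase.status !== "pass")
        .map(phase => {
            const detail = phase.error ? ` (${phase.error})` : "";
            return `${phase.name}: ${phase.status} after ${phase.attempts} attempt(s)${detail}`;
        });
}

// Made-up sample data, not output from a real run.
const failureLines = summarizeFailures([
    { name: "research", status: "pass", durationMs: 42000, artifacts: ["market-research.md"], attempts: 1, sessionIds: ["s1"] },
    { name: "design", status: "fail", durationMs: 90000, artifacts: [], error: "missing product-spec.md", attempts: 3, sessionIds: ["s2", "s3", "s4"] },
]);
console.log(failureLines[0]); // design: fail after 3 attempt(s) (missing product-spec.md)
```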

Resume from Checkpoint

Pipelines emit checkpoints after each phase. Save them to resume after a failure:

typescript
const def = Pipeline.define({
    /* ... */
});

// First run — save checkpoints
let lastCheckpoint;
const result = await def.run(agent, {
    vars: { idea: "Recipe app" },
    on: {
        checkpoint: cp => {
            lastCheckpoint = cp;
        },
    },
});

// Resume from where it left off
if (result.status !== "pass" && lastCheckpoint) {
    const resumed = await def.resume(agent, lastCheckpoint, {
        on: {
            phaseStart: p => console.log(`▶ ${p.name}`),
            text: (_p, delta) => process.stdout.write(delta),
        },
    });
    console.log(resumed.report());
}
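
The example above keeps the checkpoint in memory, which does not survive a process crash. A minimal sketch of writing it to disk follows, assuming a checkpoint is plain data that survives a JSON round trip; that is an assumption about PipelineCheckpoint, not something the text above confirms. writeCheckpoint and readCheckpoint are hypothetical helpers, and the payload is a placeholder rather than the real checkpoint shape.

```typescript
import { existsSync, readFileSync, writeFileSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";

// Assumption: checkpoints are JSON-serializable.
// `unknown` stands in for the library's PipelineCheckpoint type.
function writeCheckpoint(path: string, checkpoint: unknown): void {
    writeFileSync(path, JSON.stringify(checkpoint, null, 2), "utf8");
}

// Returns undefined when no checkpoint has been saved yet.
function readCheckpoint(path: string): unknown {
    if (!existsSync(path)) {
        return undefined;
    }
    return JSON.parse(readFileSync(path, "utf8"));
}

// Round trip with a placeholder payload (not the real checkpoint shape).
const checkpointFile = join(tmpdir(), "pipeline-checkpoint-example.json");
writeCheckpoint(checkpointFile, { completed: ["research"] });
console.log(JSON.stringify(readCheckpoint(checkpointFile))); // {"completed":["research"]}
```

With helpers like these, the checkpoint callback would call writeCheckpoint(checkpointFile, cp), and a later process would pass the value from readCheckpoint(checkpointFile) to def.resume.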

Config-Based Pipelines

Pipelines can also be defined declaratively in swarmlord.jsonc:

jsonc
{
    "name": "my-agent",
    "model": "anthropic/claude-haiku-4.5",

    "pipeline": {
        "research-sprint": {
            "name": "research-sprint",
            "phases": [
                {
                    "name": "research",
                    "prompt": "Research the market",
                    "outputs": ["market-research.md"],
                },
                {
                    "name": "analysis",
                    "prompt": "Analyze findings",
                    "outputs": ["analysis.md"],
                },
            ],
        },
    },
}

Config-based pipelines use static prompt strings (no functions) and are useful for declarative, repeatable workflows.