Skip to main content

Side Chains

Side chains let you run work in the background without blocking the main pipeline. Three primitives cover the common patterns:

  • .work() / .background() — queue a single background task (.background() is an alias that reads better in fan-out contexts)
  • .workIf(condition, block) — conditional variant of .work(), dispatches only when condition is truthy
  • .forEachBackground() — dispatch each element of an array as a background task with concurrency control

Work failures never abort the pipeline. They emit step_error items instead. Use side chains for fire-and-forget side effects: logging, analytics, cache warming, notifications.

Fire-and-forget

import { handler, sequencer } from "@flow-state-dev/core";
import { z } from "zod";

const logAnalytics = handler({
name: "log-analytics",
inputSchema: z.object({ event: z.string(), payload: z.unknown() }),
outputSchema: z.undefined(),
execute: async (input) => {
await sendToAnalytics(input.event, input.payload);
},
});

const pipeline = sequencer({
name: "pipeline",
inputSchema: z.object({ message: z.string() }),
})
.then(mainBlock)
.work((output) => ({ event: "processed", payload: output }), logAnalytics)
.then(nextStep);

nextStep receives the same output as mainBlock produced. The analytics call runs in the background and doesn't block. If logAnalytics throws, the pipeline keeps going. The error becomes a step_error item.

With and without connectors

Without a connector, the work block gets the current pipeline value:

.work(logBlock)

With a connector, you reshape the payload for the work block:

.work(
(output) => ({ event: "summary_complete", text: output.text }),
summarizeAnalytics
)

The connector runs in the main thread. Only the block execution is backgrounded.

Error isolation

Work failures are isolated. The main pipeline does not throw. Instead, the framework emits a step_error item with the work task name and the error. Your client can surface these for debugging, but the user flow continues.

If you need to know whether background work succeeded, use .waitForWork().

waitForWork — convergence points

.waitForWork() waits for all .work() tasks queued so far. By default, it does not throw on work failures:

pipeline
.work(taskA)
.work(taskB)
.waitForWork()
.then(nextStep);

nextStep runs after both tasks finish. If either failed, the pipeline still continues. Set failOnError: true to promote work failures:

.waitForWork({ failOnError: true })

With failOnError: true, if any work task rejects, the sequencer throws and the pipeline aborts. Use this when background work is required for correctness (e.g., persisting a critical record) rather than optional (e.g., analytics).

failOnError option

pipeline
.work(requiredSyncTask)
.work(optionalLogTask)
.waitForWork({ failOnError: true });

If requiredSyncTask fails, the pipeline throws. If only optionalLogTask fails, the pipeline continues. The tradeoff: failOnError applies to all queued work. You can't fail only on specific tasks. If you need per-task behavior, use separate .work() / .waitForWork() segments.

When to use work vs tap

tapwork
Blocks main pipeline?YesNo
Runs in parallel with next step?NoYes
Failure affects pipeline?Yes (throws)No (step_error only)
Use caseSide effect you must complete before continuingFire-and-forget, best-effort

Use tap when the side effect must succeed before the next step. Use work when you want non-blocking, best-effort behavior.

Multiple work items

You can queue several work tasks; they run concurrently:

pipeline
.then(coreLogic)
.work(logUsage)
.work(cacheWarm)
.work(sendNotification)
.then(moreWork);

All three run in parallel. The main chain proceeds to moreWork immediately. Call .waitForWork() when you need to converge:

pipeline
.then(coreLogic)
.work(logUsage)
.work(cacheWarm)
.waitForWork()
.then(moreWork);

Realistic example

const chatPipeline = sequencer({
name: "chat",
inputSchema: z.object({ message: z.string() }),
})
.then(validateInput)
.then(agent)
.work(
(output) => ({
event: "response_generated",
sessionId: "...",
tokenCount: output.usage?.totalTokens ?? 0,
}),
analyticsHandler
)
.work(
(output) => output.suggestedFollowUps ?? [],
warmCacheHandler
)
.tap(logToJournal)
.then(formatResponse);

Analytics and cache warming run in parallel. logToJournal runs inline (tap) because we want it done before formatting. The pipeline only continues after the tap completes.

background — alias for work

.background() is identical to .work(). It exists because "background" reads more naturally when you're thinking about fan-out patterns:

pipeline
.then(mainLogic)
.background(notifySlack)
.background(warmCache)
.then(nextStep);

Use whichever name makes the call site clearer.

workIf — conditional background work

.workIf() is the conditional variant of .work(). It evaluates a condition at execution time and only dispatches the sidechain when the condition is truthy. When falsy, it's a complete no-op — no block execution, no items emitted, no cost incurred.

The canonical use case is feature-flagged background work:

const pipeline = sequencer({
name: "chat",
inputSchema: z.object({ message: z.string() }),
})
.then(agent)
.workIf(
(ctx) => ctx.session.state.features.memory,
memoryObserveBlock
)
.then(formatResponse);

When features.memory is disabled, the pipeline behaves as if the .workIf() call didn't exist. No block is dispatched, no promise is queued, no step_error can be emitted.

Static booleans

The condition also accepts a plain boolean. This is useful for compile-time feature flags:

const ENABLE_ANALYTICS = process.env.ANALYTICS === "true";

pipeline.workIf(ENABLE_ANALYTICS, analyticsBlock);

Static true is equivalent to .work(). Static false is a permanent no-op.

With a connector

Like .work(), you can reshape the input for the background block:

pipeline.workIf(
(ctx) => ctx.session.state.observeEnabled,
(output) => ({ event: "processed", data: output }),
analyticsBlock,
{ name: "conditional-analytics" }
);

When the condition is falsy, the connector is never called.

Condition signature

The condition function receives the BlockContext (not the pipeline value). This is deliberate: workIf is about checking session state, feature flags, or runtime configuration — not examining the pipeline data.

// ✅ workIf condition — receives ctx only
.workIf((ctx) => ctx.session.state.featureEnabled, block)

// ✅ thenIf condition — receives both input and ctx
.thenIf((input, ctx) => input.score > 0.5, block)

Async conditions

The condition can be async. It's evaluated once before dispatching:

pipeline.workIf(
async (ctx) => {
const settings = await loadFeatureFlags(ctx.session.state.userId);
return settings.memoryEnabled;
},
memoryObserveBlock
);

forEachBackground — fan-out over arrays

.forEachBackground() dispatches each element of an array to a block as background work. The parent continues immediately. Each iteration runs independently — one failing doesn't stop the others or abort the pipeline.

const notifySubscriber = handler({
name: "notify-subscriber",
inputSchema: z.object({ userId: z.string(), message: z.string() }),
outputSchema: z.undefined(),
execute: async (input) => {
await sendPush(input.userId, input.message);
},
});

const pipeline = sequencer({
name: "broadcast",
inputSchema: z.object({
subscribers: z.array(z.object({ userId: z.string(), message: z.string() })),
}),
})
.map((input) => input.subscribers)
.forEachBackground(notifySubscriber, { concurrency: 8 });

The pipeline's output is the original array, not the block results. This is a fundamental difference from .forEach(), which blocks and returns an array of outputs.

With a connector:

pipeline.forEachBackground(
(input) => input.channels.map((ch) => ({ channel: ch, payload: input.data })),
broadcastBlock,
{ concurrency: 4 }
);

forEach vs forEachBackground

forEachforEachBackground
TimingBlocks until all iterations completeDispatches and continues immediately
Return typeT[] (array of block outputs)Pass-through (original input)
FailureAny iteration aborts the parentIsolated per iteration
Use caseTransform a collectionBroadcast, fan-out, cache warming

Concurrency

The concurrency option (default: 16) limits how many iterations run simultaneously. This prevents overwhelming downstream services when fanning out over large arrays:

.forEachBackground(notifyBlock, { concurrency: 4 })

Cancellation

Parent flow cancellation propagates to in-flight background iterations via the abort signal. The worker loop checks the signal before starting each new iteration.