parallelTasks
parallelTasks is a single-pass fan-out/fan-in pattern. It decomposes a goal into sub-tasks, runs a worker block for each concurrently (via taskBoard), and synthesizes the results. No review loop, no replanning — one pass.
Use it when:
- You have a goal that decomposes into parallel, independent sub-tasks
- You trust the worker to produce usable results without review
- Speed matters and you can tolerate skipping failed sub-tasks
If you need results reviewed and revised before merging, use Supervisor instead.
coordinator() still works but emits a deprecation warning. Replace it with parallelTasks(), which takes the same config shape.
See Migration from coordinator below.
Block composition
goal
  → planner (decompose into sub-tasks)
  → seedTasks (seed taskBoard collection)
  → board.block (drain: run worker for each task concurrently)
  → collectResults (gather completed task outputs)
  → synthesizer (merge/combine)
The planner is utility.decomposer by default. The synthesizer is utility.combiner. Both are swappable via config.
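The pipeline above can be modeled in plain TypeScript to make the fan-out/fan-in shape and the concurrency cap concrete. This is a minimal sketch, not the library's implementation: runWithLimit and fanOutFanIn are hypothetical names, and the sketch ignores taskBoard, seeding, and error handling entirely.

```typescript
// Illustrative model only; every name here is hypothetical, not part of
// @flow-state-dev/patterns.
type SubTask = { goal: string };

// Run `worker` over `tasks`, keeping at most `limit` calls in flight.
// Results come back in task order, not completion order.
async function runWithLimit<T>(
  tasks: SubTask[],
  worker: (task: SubTask) => Promise<T>,
  limit: number,
): Promise<T[]> {
  const results: T[] = new Array(tasks.length);
  let next = 0;

  // Each lane claims the next unclaimed task until none remain.
  async function lane(): Promise<void> {
    while (next < tasks.length) {
      const index = next++;
      results[index] = await worker(tasks[index]);
    }
  }

  const laneCount = Math.max(1, Math.min(limit, tasks.length));
  await Promise.all(Array.from({ length: laneCount }, () => lane()));
  return results;
}

// planner -> fan-out -> collect -> synthesizer, in a single pass.
async function fanOutFanIn<T, R>(
  goal: string,
  plan: (goal: string) => Promise<SubTask[]>,
  worker: (task: SubTask) => Promise<T>,
  synthesize: (results: T[]) => R,
  maxConcurrency = 3, // mirrors the documented default
): Promise<R> {
  const tasks = await plan(goal);
  const results = await runWithLimit(tasks, worker, maxConcurrency);
  return synthesize(results);
}
```

Because each lane only claims a new task after its previous one settles, the number of in-flight workers never exceeds the limit, which is the behavior maxConcurrency describes.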
Basic usage
import { parallelTasks } from "@flow-state-dev/patterns";
import { handler } from "@flow-state-dev/core";
import { z } from "zod";
const researchWorker = handler({
  name: "research-task",
  inputSchema: z.any(), // receives TaskWorkerInput: { taskId, goal, input, ... }
  outputSchema: z.object({ summary: z.string() }),
  execute: async (input) => {
    // input.goal is the sub-task goal string
    return { summary: `Findings for: ${input.goal}` };
  },
});
const researchBlock = parallelTasks({
  name: "research",
  worker: researchWorker,
  maxConcurrency: 5,
});
parallelTasks returns a sequencer. Use it in a flow like any other block:
import { defineFlow } from "@flow-state-dev/core";
import { z } from "zod";
const flow = defineFlow({
  kind: "research",
  requireUser: true,
  actions: {
    research: {
      inputSchema: z.object({ goal: z.string() }),
      block: researchBlock,
      userMessage: (input) => input.goal,
    },
  },
  session: {
    stateSchema: z.object({}),
  },
});
Config reference
parallelTasks({
  name: string;
  // The worker block that processes each sub-task.
  // Receives TaskWorkerInput: { taskId, goal, input, attempts, feedback, metadata }.
  worker: BlockDefinition;
  // Max concurrent sub-tasks. Default: 3.
  maxConcurrency?: number;
  // Override the planning step.
  // Must accept { goal: string } and output { tasks: Array<{ goal: string }> }.
  // Default: utility.decomposer()
  planner?: BlockDefinition;
  // Override the synthesis step. Receives unknown[] of completed task outputs.
  // Default: utility.combiner()
  synthesizer?: BlockDefinition;
  // Deprecated alias for synthesizer. Kept for backward compatibility.
  merger?: BlockDefinition;
  // How to handle individual sub-task failures:
  //   "skip": exclude failed sub-tasks from synthesis (default)
  //   "fail": abort the entire coordination on any failure
  //   "retry": treated as "skip" with a construction-time warning
  onSubTaskError?: "skip" | "fail" | "retry";
  // Output schema for the synthesized result.
  // Passed to the default combiner when no custom synthesizer is provided.
  outputSchema?: ZodSchema;
});
Input schema
parallelTasks expects:
{ goal: string }
Exported as parallelTasksInputSchema:
import { parallelTasksInputSchema } from "@flow-state-dev/patterns";
Exported API
import {
parallelTasks,
parallelTasksInputSchema,
} from "@flow-state-dev/patterns";
import type {
ParallelTasksConfig,
SubTaskErrorStrategy, // "skip" | "fail" | "retry"
} from "@flow-state-dev/patterns";
Custom planner
The default planner is utility.decomposer. Swap it for a domain-specific one:
import { parallelTasks } from "@flow-state-dev/patterns";
import { generator } from "@flow-state-dev/core";
import { z } from "zod";
const domainPlanner = generator({
  name: "domain-planner",
  // Must match the planner contract: { tasks: Array<{ goal: string }> }
  outputSchema: z.object({
    tasks: z.array(z.object({ goal: z.string() })),
  }),
  prompt: "You are a planner specialized in software architecture reviews.",
  user: (input) => input.goal,
});
// reviewWorker is a worker block defined elsewhere (see Basic usage for the shape).
const architectureBlock = parallelTasks({
  name: "arch-review",
  worker: reviewWorker,
  planner: domainPlanner,
});
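When testing a custom planner, it can help to check its output against the documented contract ({ tasks: Array<{ goal: string }> }) before wiring it into parallelTasks. The guard below is a minimal sketch in plain TypeScript; isPlannerOutput and PlannerOutput are hypothetical names and not part of the library, which may validate planner output differently.

```typescript
// Hypothetical helper: checks a value against the planner output shape
// described in the config reference.
type PlannerOutput = { tasks: Array<{ goal: string }> };

function isPlannerOutput(value: unknown): value is PlannerOutput {
  if (typeof value !== "object" || value === null) return false;
  const tasks = (value as { tasks?: unknown }).tasks;
  return (
    Array.isArray(tasks) &&
    tasks.every(
      (task) =>
        typeof task === "object" &&
        task !== null &&
        typeof (task as { goal?: unknown }).goal === "string",
    )
  );
}
```

An empty tasks array passes this check; whether an empty plan is acceptable for a given flow is a separate decision.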
Custom synthesizer
By default, utility.combiner merges worker results deterministically (no LLM call). To synthesize results with an LLM, swap in utility.synthesizer:
import { parallelTasks } from "@flow-state-dev/patterns";
import { utility } from "@flow-state-dev/core";
import { z } from "zod";
// sectionWorker is a worker block defined elsewhere (see Basic usage for the shape).
const reportBlock = parallelTasks({
  name: "report",
  worker: sectionWorker,
  synthesizer: utility.synthesizer({
    name: "report-synthesizer",
    outputSchema: z.object({ report: z.string() }),
  }),
});
Error handling
By default (onSubTaskError: "skip"), failed sub-tasks are excluded from the synthesis step. The block completes with whatever results succeeded. If all sub-tasks fail, the synthesizer receives an empty array.
With onSubTaskError: "fail", any sub-task failure throws and aborts the entire coordination.
onSubTaskError: "retry" is not supported and behaves as "skip" with a one-time construction warning.
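The difference between "skip" and "fail" can be illustrated with a small stand-alone model. This is a sketch under assumptions, not the library's code: collectCompleted is a hypothetical name, and the sketch waits for every sub-task to settle before throwing, whereas this page does not say whether the library cancels in-flight work on failure.

```typescript
// Hypothetical model of the two supported strategies.
type ErrorStrategy = "skip" | "fail";
type Outcome<T> = { ok: true; value: T } | { ok: false; error: unknown };

async function collectCompleted<T>(
  pending: Promise<T>[],
  onSubTaskError: ErrorStrategy = "skip",
): Promise<T[]> {
  // Convert each promise into an Outcome so one rejection cannot
  // short-circuit the others.
  const outcomes: Outcome<T>[] = await Promise.all(
    pending.map((task) =>
      task.then(
        (value): Outcome<T> => ({ ok: true, value }),
        (error: unknown): Outcome<T> => ({ ok: false, error }),
      ),
    ),
  );

  const completed: T[] = [];
  for (const outcome of outcomes) {
    if (outcome.ok) {
      completed.push(outcome.value);
    } else if (onSubTaskError === "fail") {
      // "fail": the first failure aborts the whole run.
      throw outcome.error;
    }
    // "skip": the failed sub-task is simply left out.
  }
  return completed;
}
```

With "skip", a run in which every sub-task fails yields an empty array, so a custom synthesizer should handle empty input.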
Composability
parallelTasks returns a sequencer, so it composes with other sequencer steps:
// sequencer, inputSchema, preprocess, postprocess, and the worker blocks
// are assumed to be defined or imported elsewhere.
// Chain sequentially
const pipeline = sequencer({ name: "full-pipeline", inputSchema })
  .then(parallelTasks({ name: "research", worker: researchWorker }))
  .then(parallelTasks({ name: "synthesis", worker: synthesisWorker }));
// Use as a step inside another sequencer
const outer = sequencer({ name: "outer", inputSchema })
  .then(preprocess)
  .then(parallelTasks({ name: "parallel-work", worker: taskWorker }))
  .then(postprocess);
Migration from coordinator
coordinator() still works as a deprecation-warned alias. Replace it with parallelTasks() — same config shape.
// Before
import { coordinator } from "@flow-state-dev/patterns";
const block = coordinator({ name: "research", worker: researchWorker });
// After
import { parallelTasks } from "@flow-state-dev/patterns";
const block = parallelTasks({ name: "research", worker: researchWorker });
One behavioral difference: the worker input changed. The old coordinator passed each worker a plain string (the task goal). parallelTasks passes TaskWorkerInput from taskBoard: { taskId, goal, input, attempts, feedback, metadata }. Workers that declared their input schema as z.string() must switch to z.any() or taskWorkerInputSchema, and read the goal from input.goal.
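During a gradual migration, a worker's logic may need to accept both input shapes. The helper below is a minimal sketch: goalOf and TaskWorkerInputLike are hypothetical names, the field names come from the list above, and the field types are assumptions because this page does not specify them.

```typescript
// Field names follow the TaskWorkerInput description above; the types
// (and which fields are optional) are assumptions for illustration.
interface TaskWorkerInputLike {
  taskId: string;
  goal: string;
  input?: unknown;
  attempts?: number;
  feedback?: unknown;
  metadata?: unknown;
}

// Hypothetical helper: returns the goal whether the worker was handed the
// old plain-string input or the newer object input.
function goalOf(input: string | TaskWorkerInputLike): string {
  return typeof input === "string" ? input : input.goal;
}
```

Inside a worker's execute function, calling goalOf(input) instead of using input directly keeps the body unchanged while the input schema is updated.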
See also
- Supervisor — same fan-out model, adds a quality review loop
- Plan and Execute — sequential dependency-ordered execution
- Patterns Overview — when to use which pattern