Skip to main content

parallelTasks

parallelTasks is a single-pass fan-out/fan-in pattern. It decomposes a goal into sub-tasks, runs a worker block for each concurrently (via taskBoard), and synthesizes the results. No review loop, no replanning — one pass.

Use it when:

  • You have a goal that decomposes into parallel, independent sub-tasks
  • You trust the worker to produce usable results without review
  • Speed matters and you can tolerate skipping failed sub-tasks

If you need results reviewed and revised before merging, use Supervisor instead.

Migration from coordinator

coordinator() still works but emits a deprecation warning. Replace it with parallelTasks() — same config shape. See Migration from coordinator below.

Block composition

goal
→ planner (decompose into sub-tasks)
→ seedTasks (seed taskBoard collection)
→ board.block (drain — run worker for each task concurrently)
→ collectResults (gather completed task outputs)
→ synthesizer (merge/combine)

The planner is utility.decomposer by default. The synthesizer is utility.combiner. Both are swappable via config.

Basic usage

import { parallelTasks } from "@flow-state-dev/patterns";
import { handler } from "@flow-state-dev/core";
import { z } from "zod";

const researchWorker = handler({
name: "research-task",
inputSchema: z.any(), // receives TaskWorkerInput: { taskId, goal, input, ... }
outputSchema: z.object({ summary: z.string() }),
execute: async (input) => {
// input.goal is the sub-task goal string
return { summary: `Findings for: ${input.goal}` };
},
});

const researchBlock = parallelTasks({
name: "research",
worker: researchWorker,
maxConcurrency: 5,
});

parallelTasks returns a sequencer. Use it in a flow like any other block:

import { defineFlow } from "@flow-state-dev/core";
import { z } from "zod";

const flow = defineFlow({
kind: "research",
requireUser: true,
actions: {
research: {
inputSchema: z.object({ goal: z.string() }),
block: researchBlock,
userMessage: (input) => input.goal,
},
},
session: {
stateSchema: z.object({}),
},
});

Config reference

parallelTasks({
name: string;

// The worker block that processes each sub-task.
// Receives TaskWorkerInput: { taskId, goal, input, attempts, feedback, metadata }.
worker: BlockDefinition;

// Max concurrent sub-tasks. Default: 3.
maxConcurrency?: number;

// Override the planning step.
// Must accept { goal: string } and output { tasks: Array<{ goal: string }> }.
// Default: utility.decomposer()
planner?: BlockDefinition;

// Override the synthesis step. Receives unknown[] of completed task outputs.
// Default: utility.combiner()
synthesizer?: BlockDefinition;

// Deprecated alias for synthesizer. Kept for backward compatibility.
merger?: BlockDefinition;

// How to handle individual sub-task failures:
// "skip" — exclude failed sub-tasks from synthesis (default)
// "fail" — abort the entire coordination on any failure
// "retry" — treated as "skip" with a construction-time warning
onSubTaskError?: "skip" | "fail" | "retry";

// Output schema for the synthesized result.
// Passed to the default combiner when no custom synthesizer is provided.
outputSchema?: ZodSchema;
});

Input schema

parallelTasks expects:

{ goal: string }

Exported as parallelTasksInputSchema:

import { parallelTasksInputSchema } from "@flow-state-dev/patterns";

Exported API

import {
parallelTasks,
parallelTasksInputSchema,
} from "@flow-state-dev/patterns";

import type {
ParallelTasksConfig,
SubTaskErrorStrategy, // "skip" | "fail" | "retry"
} from "@flow-state-dev/patterns";

Custom planner

The default planner is utility.decomposer. Swap it for a domain-specific one:

import { parallelTasks } from "@flow-state-dev/patterns";
import { generator } from "@flow-state-dev/core";
import { z } from "zod";

const domainPlanner = generator({
name: "domain-planner",
outputSchema: z.object({
tasks: z.array(z.object({ goal: z.string() })),
}),
prompt: "You are a planner specialized in software architecture reviews.",
user: (input) => input.goal,
});

const architectureBlock = parallelTasks({
name: "arch-review",
worker: reviewWorker,
planner: domainPlanner,
});

Custom synthesizer

By default, utility.combiner merges worker results deterministically (no LLM call). To synthesize results with an LLM, swap in utility.synthesizer:

import { parallelTasks } from "@flow-state-dev/patterns";
import { utility } from "@flow-state-dev/core";

const reportBlock = parallelTasks({
name: "report",
worker: sectionWorker,
synthesizer: utility.synthesizer({
name: "report-synthesizer",
outputSchema: z.object({ report: z.string() }),
}),
});

Error handling

By default (onSubTaskError: "skip"), failed sub-tasks are excluded from the synthesis step. The block completes with whatever results succeeded. If all sub-tasks fail, the synthesizer receives an empty array.

With onSubTaskError: "fail", any sub-task failure throws and aborts the entire coordination.

onSubTaskError: "retry" is not supported and behaves as "skip" with a one-time construction warning.

Composability

parallelTasks returns a sequencer, so it composes with other sequencer steps:

// Chain sequentially
const pipeline = sequencer({ name: "full-pipeline", inputSchema })
.then(parallelTasks({ name: "research", worker: researchWorker }))
.then(parallelTasks({ name: "synthesis", worker: synthesisWorker }));

// Use as a step inside another sequencer
const outer = sequencer({ name: "outer", inputSchema })
.then(preprocess)
.then(parallelTasks({ name: "parallel-work", worker: taskWorker }))
.then(postprocess);

Migration from coordinator

coordinator() still works as a deprecation-warned alias. Replace it with parallelTasks() — same config shape.

// Before
import { coordinator } from "@flow-state-dev/patterns";
const block = coordinator({ name: "research", worker: researchWorker });

// After
import { parallelTasks } from "@flow-state-dev/patterns";
const block = parallelTasks({ name: "research", worker: researchWorker });

One behavioral difference: worker input changed. The old coordinator passed a plain string (the task goal). parallelTasks passes TaskWorkerInput from taskBoard: { taskId, goal, input, attempts, feedback, metadata }. Workers that typed their input as z.string() need updating to z.any() or taskWorkerInputSchema.

See also