Workflows
Compose tasks into pipelines — sequential chains, parallel groups, and fan-in chords. Inspired by Celery's Canvas, with full TypeScript type safety.
Signatures
A Signature is a snapshot of a task invocation — serializable and composable.
const sig = sendEmailTask.s({ to: "a@b.com", subject: "Welcome" })
// Type: Signature<{ to: string; subject: string }, { messageId: string }>
Two forms:
| Call | Behavior |
|---|---|
| task.s(data) | Bound data: ignores pipeline input |
| task.s() | Unbound: receives the previous step's output |
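To make the two forms concrete, here is a minimal sketch in plain TypeScript of how a serializable signature could tell bound input from unbound input. It does not import taskora: `SketchSignature`, `sig`, and `resolveInput` are hypothetical names chosen for illustration, and taskora's internal representation may well differ.

```typescript
// Hypothetical model of a signature: a plain, JSON-serializable record.
// These names are illustrative only and are not part of taskora's API.
type SketchSignature<TInput> =
  | { task: string; bound: true; data: TInput }
  | { task: string; bound: false }

// Mirrors the two call forms: sig(name, data) binds, sig(name) stays unbound.
function sig<TInput>(task: string, data?: TInput): SketchSignature<TInput> {
  if (data === undefined) {
    return { task, bound: false }
  }
  return { task, bound: true, data }
}

// Bound data wins and the pipeline input is ignored;
// an unbound signature takes the previous step's output as-is.
function resolveInput<TInput>(s: SketchSignature<TInput>, previousOutput: TInput): TInput {
  return s.bound ? s.data : previousOutput
}

const bound = sig("send-email", { to: "a@b.com" })
const unbound = sig<{ to: string }>("send-email")

const fromBound = resolveInput(bound, { to: "ignored@b.com" })
const fromUnbound = resolveInput(unbound, { to: "piped@b.com" })

// Because the record is plain data, it survives a JSON round trip.
const restored = JSON.parse(JSON.stringify(bound)) as SketchSignature<{ to: string }>
```

In this model `fromBound` keeps the address that was bound up front, while `fromUnbound` takes whatever the previous step produced.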
Chain
Sequential pipeline. Each step's output flows as input to the next. TypeScript checks the entire chain at compile time.
import { chain } from "taskora"
const onboarding = chain(
  createUserTask.s({ name: "John", email: "john@example.com" }),
  // ^ returns { id: string }
  sendWelcomeEmailTask.s(),
  // ^ receives { id: string }, returns { messageId: string }
  notifySlackTask.s(),
  // ^ receives { messageId: string }
)
const handle = onboarding.dispatch()
const result = await handle.result
Pipe Syntax
Fluent alternative with unlimited type-safe chaining:
const result = await createUserTask
  .s({ name: "John", email: "john@example.com" })
  .pipe(sendWelcomeEmailTask.s())
  .pipe(notifySlackTask.s())
  .dispatch()
  .result
chain() provides type overloads for up to 10 steps. .pipe() has no limit: each call is individually type-checked.
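Why a fluent `.pipe()` needs no overloads can be seen in a small standalone sketch. The `Pipeline` class below is hypothetical and runs plain functions synchronously in-process instead of dispatching jobs; it only illustrates the typing idea and is not how taskora is implemented.

```typescript
// Illustrative only: a tiny in-process pipeline, not taskora's implementation.
class Pipeline<TIn, TOut> {
  private readonly run: (input: TIn) => TOut

  constructor(run: (input: TIn) => TOut) {
    this.run = run
  }

  // Each call introduces one new type parameter, so the compiler checks
  // a single junction (this step's output -> next step's input) per call.
  pipe<TNext>(next: (input: TOut) => TNext): Pipeline<TIn, TNext> {
    return new Pipeline((input: TIn) => next(this.run(input)))
  }

  execute(input: TIn): TOut {
    return this.run(input)
  }
}

const createUser = (data: { name: string }) => ({ id: `user-${data.name}` })
const sendWelcome = (user: { id: string }) => ({ messageId: `msg-${user.id}` })
const summarize = (msg: { messageId: string }) => `sent ${msg.messageId}`

const onboardingSketch = new Pipeline(createUser).pipe(sendWelcome).pipe(summarize)
const summary = onboardingSketch.execute({ name: "john" })

// Skipping a step would not compile:
// new Pipeline(createUser).pipe(summarize)
//   -> { id: string } is not assignable to { messageId: string }
```

A variadic `chain(...)` function, by contrast, has to relate every adjacent pair of arguments in one signature, which is why libraries commonly fall back to a fixed number of overloads.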
Group
Parallel execution. All signatures run concurrently, and the result is a typed tuple.
import { group } from "taskora"
const handle = group(
  processImageTask.s({ url: "img1.jpg", width: 800 }),
  processImageTask.s({ url: "img2.jpg", width: 800 }),
  processImageTask.s({ url: "img3.jpg", width: 800 }),
).dispatch()
const result = await handle.result
// Type: [ImageResult, ImageResult, ImageResult]
Chord
Group + callback — parallel execution, then merge. The callback receives an array of all group results.
import { chord } from "taskora"
const handle = chord(
  [
    fetchPriceTask.s({ symbol: "AAPL" }),
    fetchPriceTask.s({ symbol: "GOOG" }),
    fetchPriceTask.s({ symbol: "MSFT" }),
  ],
  calculatePortfolioTask.s(),
  // ^ receives [PriceResult, PriceResult, PriceResult]
).dispatch()
Composability
Compositions are themselves valid inputs to other compositions:
const handle = chord(
  [
    chain(fetchDataTask.s({ source: "api" }), transformTask.s()),
    chain(fetchDataTask.s({ source: "db" }), transformTask.s()),
  ],
  mergeTask.s(),
).dispatch()
Groups work as chain steps too:
const handle = chain(
  fetchConfigTask.s({ env: "prod" }),
  group(buildFrontendTask.s(), buildBackendTask.s()),
  // ^ fans out config to both, collects results
  deployTask.s(),
  // ^ receives [FrontendResult, BackendResult]
).dispatch()
Map & Chunk
Batch operations on a single task.
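The two helpers in this section, map and chunk, differ only in how much work runs at once. As a rough mental model, the batching semantics can be sketched in plain TypeScript with ordinary promises. `chunkItems`, `mapAll`, and `mapInChunks` are hypothetical names, and nothing here reflects how taskora actually enqueues jobs.

```typescript
// Illustrative helpers; none of these names come from taskora.

// Split a list into consecutive batches of at most `size` items.
function chunkItems<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError("size must be a positive integer")
  }
  const batches: T[][] = []
  for (let start = 0; start < items.length; start += size) {
    batches.push(items.slice(start, start + size))
  }
  return batches
}

// map: start every item at once and wait for all of them.
async function mapAll<T, R>(
  items: readonly T[],
  worker: (item: T) => Promise<R>,
): Promise<R[]> {
  return Promise.all(items.map((item) => worker(item)))
}

// chunk: run one batch in parallel, and only then start the next batch.
async function mapInChunks<T, R>(
  items: readonly T[],
  size: number,
  worker: (item: T) => Promise<R>,
): Promise<R[]> {
  const results: R[] = []
  for (const batch of chunkItems(items, size)) {
    results.push(...(await mapAll(batch, worker)))
  }
  return results
}

const batches = chunkItems([1, 2, 3, 4, 5], 2)
// batches is [[1, 2], [3, 4], [5]]
```

The trade-off is throughput against pressure on downstream resources: running everything at once finishes sooner, while batching caps how many items are in flight at any moment.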
Map
Dispatch one job per item, all in parallel:
const handle = processImageTask.map([
  { url: "img1.jpg", width: 800 },
  { url: "img2.jpg", width: 800 },
  { url: "img3.jpg", width: 800 },
])
const results = await handle.result
// [ImageResult, ImageResult, ImageResult]
Equivalent to group(task.s(item1), task.s(item2), ...).dispatch().
Chunk
Split the input into batches. Each batch runs as a parallel group, and the batches run one after another:
const handle = processImageTask.chunk(largeImageList, { size: 50 })
// Processes 50 at a time, then next 50, etc.
WorkflowHandle
All compositions return a WorkflowHandle on dispatch:
const handle = chain(a.s(data), b.s()).dispatch()
await handle // ensure dispatched (thenable)
const result = await handle.result // wait for final result
const state = await handle.getState() // "running" | "completed" | "failed" | "cancelled"
await handle.cancel({ reason: "no longer needed" }) // cascade cancel
Workflow TTL
Set a timeout on the entire workflow:
const handle = chain(a.s(data), b.s(), c.s()).dispatch({
  ttl: "5m", // auto-cancel if not completed within 5 minutes
})
Individual jobs still use their task-level TTL. The workflow TTL is an additional global timeout.
How It Works
All compositions flatten to a DAG (directed acyclic graph) of task nodes:
chain(a, b, c)                      →  a → b → c
group(a, b, c)                      →  a, b, c (all parallel)
chord([a, b], c)                    →  a ─┐
                                       b ─┤→ c
chord([chain(a,b), chain(c,d)], e)  →  a → b ─┐
                                       c → d ─┤→ e
At dispatch:
- DAG is built, job IDs pre-generated for all nodes
- Workflow state is stored in Redis as a single hash
- Root nodes (no dependencies) are enqueued immediately
- When a job completes, the worker advances the workflow — finds ready nodes and enqueues them
- When all terminal nodes complete, the workflow is done
Failures cascade: if any node fails permanently (no retries left), the entire workflow is marked failed and all active/pending nodes are cancelled.
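The advance-and-cascade behavior described above can be sketched as a pure function over an in-memory node list. This is a simplified model under stated assumptions: `WorkflowNode`, `findReadyNodes`, and `cascadeFailure` are hypothetical names, state lives in a plain array instead of a Redis hash, and retries are ignored.

```typescript
// Illustrative model of workflow advancement; not taskora's actual data model.
type NodeState = "pending" | "active" | "completed" | "failed" | "cancelled"

interface WorkflowNode {
  id: string
  deps: string[] // ids of nodes that must complete first
  state: NodeState
}

// A node is ready when it is still pending and every dependency has completed.
function findReadyNodes(nodes: WorkflowNode[]): WorkflowNode[] {
  const byId = new Map<string, WorkflowNode>()
  for (const n of nodes) byId.set(n.id, n)
  return nodes.filter(
    (n) =>
      n.state === "pending" &&
      n.deps.every((dep) => byId.get(dep)?.state === "completed"),
  )
}

// Failure cascade: once any node has failed, cancel everything not yet finished.
function cascadeFailure(nodes: WorkflowNode[]): void {
  if (!nodes.some((n) => n.state === "failed")) return
  for (const n of nodes) {
    if (n.state === "pending" || n.state === "active") n.state = "cancelled"
  }
}

// chord([a, b], c): a and b are roots, c waits for both.
const nodeA: WorkflowNode = { id: "a", deps: [], state: "pending" }
const nodeB: WorkflowNode = { id: "b", deps: [], state: "pending" }
const nodeC: WorkflowNode = { id: "c", deps: ["a", "b"], state: "pending" }
const dag = [nodeA, nodeB, nodeC]

const roots = findReadyNodes(dag) // a and b have no dependencies
for (const n of roots) n.state = "active" // "enqueued" in this model

nodeA.state = "completed"
const afterA = findReadyNodes(dag) // nothing yet: c still waits on b

nodeB.state = "completed"
const afterB = findReadyNodes(dag) // now c is ready
```

Calling `findReadyNodes` after each completion plays the role of the worker advancing the workflow, and `cascadeFailure` models the permanent-failure path.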
Bound Data vs Pipeline
Taskora uses a full-bind-or-pipe model (not partial application):
- task.s(data): data is fixed, pipeline input is ignored
- task.s(): receives the entire previous step's output as input
The first step in a chain must have bound data. Subsequent steps can either bind their own data (ignoring the pipeline) or receive from the previous step.
This keeps type checking clean: each chain junction is a single constraint (PrevOutput extends NextInput).
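The junction constraint can be written out as a conditional type to see what it accepts and rejects. `Junction` is a hypothetical helper introduced only for this illustration, not something taskora exports, and the three payload types are made up.

```typescript
// Illustrative type-level check; `Junction` is not a taskora export.
type Junction<PrevOutput, NextInput> = PrevOutput extends NextInput ? "ok" : "mismatch"

type UserCreated = { id: string; createdAt: number }
type EmailInput = { id: string }
type SlackInput = { messageId: string }

// Extra fields are fine: UserCreated has everything EmailInput asks for.
const userToEmail: Junction<UserCreated, EmailInput> = "ok"

// Missing fields are not: UserCreated has no messageId.
const userToSlack: Junction<UserCreated, SlackInput> = "mismatch"
```

Because TypeScript's assignability is structural, a step may return more than the next step needs, but never less.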
Testing Workflows
The test runner supports workflows out of the box:
import { createTestRunner } from "taskora/test"
import { chain } from "taskora"
const runner = createTestRunner()
const addTask = runner.app.task("add", async (data: { x: number; y: number }) => data.x + data.y)
const doubleTask = runner.app.task("double", async (n: number) => n * 2)
const handle = chain(addTask.s({ x: 3, y: 4 }), doubleTask.s()).dispatch()
await handle
// Process all workflow steps
for (let i = 0; i < 10; i++) {
  await runner.processAll()
  if ((await handle.getState()) === "completed") break
}
const result = await handle.result // 14
// Track step execution
console.log(runner.steps)
// [
// { workflowId: "...", nodeIndex: 0, taskName: "add", state: "completed" },
// { workflowId: "...", nodeIndex: 1, taskName: "double", state: "completed" },
// ]