Skip to content

Tasks

A task is a named function with configuration. Taskora provides three ways to define tasks, from minimal to fully configured.

Minimal Definition

Pass a name and a handler. Types are inferred automatically.

ts
const greetTask = taskora.task("greet", async (data: { name: string }) => {
  return `Hello, ${data.name}!`
})

// TypeScript knows: dispatch expects { name: string }, result is string
const handle = greetTask.dispatch({ name: "Alice" })
const result = await handle.result // "Hello, Alice!"

With Options

Pass an options object for retry, timeout, concurrency, and more.

ts
const sendEmailTask = taskora.task("send-email", {
  concurrency: 5,
  timeout: 30_000,
  retry: {
    attempts: 3,
    backoff: "exponential",
    delay: 1000,
  },
  handler: async (data: { to: string; subject: string }, ctx) => {
    ctx.log.info("Sending email", { to: data.to })
    return await mailer.send(data)
  },
})

With Schema Validation

Use any Standard Schema compatible library (Zod, Valibot, ArkType) for runtime validation.

ts
import { z } from "zod"

const processOrderTask = taskora.task("process-order", {
  input: z.object({
    orderId: z.string().uuid(),
    items: z.array(z.object({
      sku: z.string(),
      quantity: z.number().int().positive(),
    })),
  }),
  output: z.object({
    total: z.number(),
    status: z.enum(["confirmed", "pending"]),
  }),
  handler: async (data, ctx) => {
    // data is fully typed: { orderId: string, items: { sku: string, quantity: number }[] }
    const total = await calculateTotal(data.items)
    return { total, status: "confirmed" as const }
  },
})

Schema validation runs after migrations (if versioned) and provides clear ValidationError with an issues array on failure.

Reusing the inferred types

Reach for InferInput / InferOutput when you need the types outside the handler — in a controller, a test factory, a shared DTO, anywhere:

ts
import type { InferInput, InferOutput } from "taskora"

type OrderPayload = InferInput<typeof processOrderTask>
// { orderId: string; items: { sku: string; quantity: number }[] }

type OrderResult = InferOutput<typeof processOrderTask>
// { total: number; status: "confirmed" | "pending" }

They also work on BoundTask (from contracts), ResultHandle, WorkflowHandle, and workflow Signatures. Collisions with a schema library that also ships InferInput? Use the namespaced form — Taskora.InferInput<typeof processOrderTask> — see Contracts → Type inference helpers for the full list of supported carriers.

Task Options Reference

OptionTypeDefaultDescription
concurrencynumber1Max parallel jobs per worker
timeoutnumberundefinedHandler timeout in ms
retryRetryConfigundefinedRetry configuration
stallStallConfig{ interval: 30000, maxCount: 1 }Stall detection config
singletonbooleanfalseOnly one active job at a time
concurrencyLimitnumberundefinedMax active jobs per concurrency key
ttlTtlConfigundefinedJob time-to-live
middlewareMiddleware[][]Per-task middleware
onCancel(data, ctx) => voidundefinedCleanup on cancellation
versionnumber1Current task version
sincenumber1Minimum supported version
migrateMigrationFn[] | RecordundefinedVersion migrations
inputStandardSchemaundefinedInput validation schema
outputStandardSchemaundefinedOutput validation schema
scheduleScheduleConfigundefinedInline schedule
collectCollectConfigundefinedBatch collection

Collect Tasks

Collect tasks accumulate items into batches before processing. The handler receives an array.

ts
const batchInsertTask = taskora.task("batch-insert", {
  collect: {
    key: "db-inserts",
    delay: "2s",        // flush 2s after last item
    maxSize: 100,       // or when 100 items accumulated
    maxWait: "10s",     // or 10s since first item (hard deadline)
  },
  handler: async (items: { table: string; row: Record<string, unknown> }[], ctx) => {
    ctx.log.info(`Inserting ${items.length} rows`)
    await db.batchInsert(items)
    return { inserted: items.length }
  },
})

// Dispatch individual items — they accumulate automatically
batchInsertTask.dispatch({ table: "users", row: { name: "Alice" } })
batchInsertTask.dispatch({ table: "users", row: { name: "Bob" } })

Three flush triggers (whichever fires first):

  1. Debounce delay — reset on each new item
  2. maxSize — immediate flush when buffer is full
  3. maxWait — hard deadline since first item after last flush

Released under the MIT License.