Consumers
A consumer is a Nest provider class that handles jobs for a specific task contract. You write it the same way you'd write any @Injectable() service, with full constructor DI — the only addition is the @TaskConsumer(contract) decorator and a process(data, ctx) method.
Basic consumer
import { TaskConsumer } from "@taskora/nestjs"
import type { InferInput, InferOutput, Taskora } from "taskora"
import { MailerService } from "./mailer.service"
import { sendEmailTask } from "@/tasks"
@TaskConsumer(sendEmailTask)
export class SendEmailConsumer {
constructor(private readonly mailer: MailerService) {}
async process(
data: InferInput<typeof sendEmailTask>,
ctx: Taskora.Context,
): Promise<InferOutput<typeof sendEmailTask>> {
ctx.log.info("sending", { to: data.to })
return this.mailer.send(data)
}
}Register it as a normal Nest provider:
@Module({
providers: [SendEmailConsumer, MailerService],
})
export class EmailModule {}That's everything. On onApplicationBootstrap, the TaskoraExplorer walks the DI graph, finds this class, calls app.implement(sendEmailTask, (data, ctx) => instance.process(data, ctx)), and then starts the app. Injected dependencies (MailerService here) stay live across every job run — you get one consumer instance for the lifetime of the process, exactly like every other @Injectable().
Why InferInput<typeof contract> and not a hand-written type?
defineTask stores the input/output schema on the contract value. InferInput<typeof sendEmailTask> reads the type directly off that value, so renaming or reshaping the schema updates the consumer's signature automatically. You could write data: { to: string; subject: string } by hand, but it drifts — one day someone adds cc?: string[] to the schema and the consumer quietly ignores it.
@TaskConsumer options
@TaskConsumer(sendEmailTask, {
concurrency: 10, // parallel in-flight jobs for this task
timeout: "30s", // aborts the handler + fails the job
retry: {
attempts: 5,
backoff: "exponential",
delay: 1000,
maxDelay: 60_000,
},
singleton: false, // "only one in-flight at a time globally"
concurrencyLimit: undefined, // key-based rate limit — see dispatch options
ttl: { max: "10m", onExpire: "fail" },
stall: { interval: 30_000, maxCount: 1 },
version: 2, // payload version for schema migrations
since: 1, // oldest supported version
app: "secondary", // multi-app routing — see below
})These are the same fields app.implement(contract, handler, options) accepts in bare taskora, minus the worker-side-only handler, onCancel, middleware, and migrate fields (which arrive via different DI mechanisms — see the sections below).
Multi-app routing
@TaskConsumer(contract, { app: 'secondary' }) binds the consumer to a specific named app registered via TaskoraModule.forRoot({ name: 'secondary' }). The explorer filters consumers by the app option, so each named app only picks up its own consumers.
@TaskConsumer(sendEmailTask) // → default app
class DefaultEmailConsumer {}
@TaskConsumer(sendEmailTask, { app: "secondary" }) // → "secondary" app
class SecondaryEmailConsumer {}Both consumers handle sendEmailTask, but they run in isolated apps with independent Redis connections. A dispatch via TaskoraRef.for(sendEmailTask) reaches the default consumer; a dispatch via @InjectTaskoraRef('secondary') → .for(sendEmailTask) reaches the secondary.
See Deployment > Multi-app for the full pattern.
Using ctx inside process()
The second argument to process() is taskora's Context — a per-job handle with progress, logs, retry helpers, and cancellation:
async process(data: InferInput<typeof processImageTask>, ctx: Taskora.Context) {
ctx.log.info("starting", { url: data.url })
await ctx.progress({ phase: "downloading" })
const buffer = await fetch(data.url).then((r) => r.arrayBuffer())
await ctx.progress({ phase: "transforming", percent: 30 })
if (ctx.signal.aborted) throw ctx.signal.reason // fail fast on cancel
const result = await this.pipeline.transform(buffer, ctx.signal)
await ctx.progress({ phase: "uploading", percent: 90 })
const url = await this.storage.upload(result)
ctx.log.info("done", { url })
return { url, width: result.width, height: result.height }
}Key fields:
ctx.log.info/warn/error— structured logs stored against the job, visible in the inspector and the board.ctx.progress(value)— number or object, surfaced in the board and viainspector.find().ctx.signal—AbortSignalthat fires on cancellation / timeout / stall recovery. Pass it tofetch, child processes, or manualawaitloops so handlers exit promptly.ctx.retry({ delay?, reason? })— returns aRetryErroryou canthrowto reschedule the job immediately (bypassingretryOn/noRetryOnfilters).ctx.attempt— current attempt number (1-indexed).
See the main Job Context guide for the full surface.
Schema validation
If the contract has a Standard Schema (Zod, Valibot, ArkType) attached via defineTask({ input, output }), taskora runs validation on both sides automatically:
- Producer:
TaskoraRef.for(contract).dispatch(data)validatesdataagainstinputbefore enqueueing (can be disabled globally viavalidateOnDispatch: falseor per-call viadispatch(data, { skipValidation: true })). - Worker: after deserialization and migration, taskora validates
dataagainstinputagain before callingprocess(). If the contract also has anoutputschema, the value returned fromprocess()is validated before being stored as the result.
Validation errors are regular ValidationError throws — they go through the normal retry machinery and end up in the DLQ if attempts is exhausted. You don't need to validate inside process() — by the time it runs, data is already typed and proven to match the schema.
@OnTaskEvent — event bindings
Method-level decorator that wires a consumer method to a per-task event on the same Task that the consumer handles:
import { OnTaskEvent, TaskConsumer } from "@taskora/nestjs"
@TaskConsumer(sendEmailTask)
export class SendEmailConsumer {
constructor(
private readonly mailer: MailerService,
private readonly metrics: MetricsService,
) {}
async process(data: InferInput<typeof sendEmailTask>) {
return this.mailer.send(data)
}
@OnTaskEvent("completed")
onDone(evt: Taskora.TaskEventMap<InferOutput<typeof sendEmailTask>>["completed"]) {
this.metrics.counter("email.sent").inc()
this.metrics.histogram("email.duration_ms").observe(Number(evt.duration))
}
@OnTaskEvent("failed")
onFail(evt: Taskora.TaskEventMap<never>["failed"]) {
this.metrics.counter("email.failed", { reason: evt.error }).inc()
}
@OnTaskEvent("retrying")
onRetry(evt: Taskora.TaskEventMap<never>["retrying"]) {
this.metrics.counter("email.retried").inc()
}
}Valid event names match the keys of Taskora.TaskEventMap:
| Event | Payload | When |
|---|---|---|
completed | { result, duration, attempt } | Handler returned successfully |
failed | { error, attempt, willRetry } | Handler threw (with or without retry) |
retrying | { error, attempt, nextAttemptAt } | Handler threw and is being retried |
progress | { value, timestamp } | Handler called ctx.progress(...) |
active | { jobId, attempt } | Job transitioned to active state |
stalled | { count, action } | Stall detection recovered or failed a job |
cancelled | { reason, cancelledAt } | handle.cancel() was called |
DI still works in event handlers
@OnTaskEvent methods run on the same consumer instance as process(). The explorer binds each method via instance[method].bind(instance), so this.metrics and this.mailer resolve normally inside event handlers — there's no detached context, no separate instance per event.
App-level events
@OnTaskEvent only wires per-task events. For cross-task app events (worker:ready, worker:error, task:completed across all tasks), inject the raw App and subscribe manually from a dedicated service:
import { Injectable, OnModuleInit } from "@nestjs/common"
import { InjectApp } from "@taskora/nestjs"
import type { App } from "taskora"
@Injectable()
export class WorkerHealthService implements OnModuleInit {
constructor(@InjectApp() private readonly app: App) {}
onModuleInit() {
this.app.on("worker:ready", () => this.markHealthy())
this.app.on("worker:error", (err) => this.reportError(err))
}
}Lifecycle ordering
The explorer runs its discovery pass inside onApplicationBootstrap. The sequence for every named app is:
- Nest resolves all providers (constructors run, DI is wired).
- Explorer walks providers via
DiscoveryService.getProviders(). - For each
@TaskConsumer, explorer callsapp.implement(contract, handler, options). - For each
@OnTaskEventmethod on that consumer, explorer callstask.on(event, boundHandler). - Explorer calls
app.start()(unlessautoStart: false). - Worker loop begins pulling jobs.
The ordering is guaranteed: all implement() calls finish before start() runs. You can't get a race where a worker picks up a job for a contract whose handler hasn't been attached yet.
On onApplicationShutdown the explorer awaits app.close(), which drains in-flight jobs up to the stall timeout and then returns. Make sure app.enableShutdownHooks() is called in main.ts so SIGTERM triggers the shutdown sequence.
Consumer-owned scheduling
A consumer can declare a schedule via the explorer's option passthrough — but actually, TaskConsumer options do not accept schedule at the moment. Use app.schedule() from a separate OnModuleInit provider or define the schedule at the task contract level via defineTask. Schedule discovery on consumers is tracked for a future release — see the Scheduling docs for the direct-taskora path.
Error handling inside process()
Throw normally. taskora's retry machinery handles everything:
async process(data: InferInput<typeof importTask>) {
try {
return await this.importer.run(data)
} catch (err) {
if (err instanceof TransientError) {
// Let taskora retry per the consumer's retry config
throw err
}
if (err instanceof PermanentError) {
// Opt out of retries — goes to DLQ immediately
throw new RetryError({ retry: false, reason: err.message })
}
throw err // default — respects retry config
}
}- Regular throws go through the
retry.attempts/backoff/retryOn/noRetryOnmachinery. ctx.retry({ delay, reason })returns aRetryErrorthat bypasses filters — always retries, with the given delay.new RetryError({ retry: false })short-circuits the retry machinery entirely, sending the job to the DLQ on first failure.TimeoutError(when the consumer'stimeoutfires) is not retried by default — add"TimeoutError"toretryOnif you want it retried.
Testing consumers
Consumers are regular Nest providers, so @nestjs/testing's Test.createTestingModule just works. For end-to-end coverage that actually runs the handler, use @taskora/nestjs/testing:
import { createTaskoraTestHarness } from "@taskora/nestjs/testing"
it("sends the welcome email", async () => {
const harness = await createTaskoraTestHarness({
providers: [SendEmailConsumer, MailerService],
})
const result = await harness.execute(sendEmailTask, {
to: "alice@example.com",
subject: "Welcome",
})
expect(result.state).toBe("completed")
expect(result.result?.messageId).toBeDefined()
await harness.close()
})See the Testing page for the full patterns — the harness runs the real explorer + real consumers + real DI in-memory.