Durable Kafka workers with crash recovery
A worker that consumes a Kafka topic, dispatches a durable workflow for each message, and resumes cleanly after crashes. Redpanda works as a drop-in replacement for Kafka.
A consumer pulls messages from a Kafka topic and dispatches each one through a durable Resonate workflow. Per-message state lives in durable promises, so a worker that dies mid-batch resumes cleanly without redoing completed steps.
TypeScript: @resonatehq/sdk v0.10.2 (current). Python: resonate-sdk v0.7.0 (current). Rust: 0.4.0, in active development.
All three repos run against Redpanda for local development because the broker boots in seconds, but the code uses the Kafka wire protocol — kafkajs for TypeScript, confluent-kafka-python for Python, rdkafka for Rust. Point any of them at any Kafka-compatible broker — Apache Kafka, Redpanda, or a managed offering — by changing the bootstrap-server config.
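TypeScript (example-kafka-worker-ts): a kafkajs consumer that dispatches a durable workflow per message, with batch-level retries.
Python (example-kafka-worker-py): a confluent-kafka-python consumer that dispatches a durable workflow per message, with batch-level retries.
Rust (example-kafka-worker-rs): an rdkafka consumer that dispatches a durable workflow per message via spawn().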
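Switching brokers comes down to one key in the client config. A minimal Python sketch, assuming a hypothetical KAFKA_BOOTSTRAP_SERVERS environment variable (the example repos may hard-code the address instead). `bootstrap.servers`, `group.id`, and `auto.offset.reset` are standard librdkafka property names, which confluent-kafka-python and rdkafka both accept:

```python
import os
from typing import Dict, Mapping


def kafka_config(group_id: str, env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """librdkafka-style consumer config in which only the broker address varies.

    KAFKA_BOOTSTRAP_SERVERS is a hypothetical variable name. The default
    matches the localhost:9092 address used by the consumer code in this guide.
    """
    return {
        # The one setting that differs between Redpanda, Apache Kafka,
        # and a managed broker (authentication settings aside).
        "bootstrap.servers": env.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
        "group.id": group_id,
        # With no committed offset for the group, start from the oldest message.
        "auto.offset.reset": "earliest",
    }


cfg = kafka_config("workers", env={"KAFKA_BOOTSTRAP_SERVERS": "broker.example:9092"})
print(cfg["bootstrap.servers"])  # broker.example:9092
```

The returned dict can be passed straight to `confluent_kafka.Consumer`. Managed offerings typically also need authentication settings (SASL/TLS), which vary by provider.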
The problem
A worker that sits between two queues runs into two recurring problems:
- Head-of-line blocking: if the first message kicks off an hour-long task and the next one needs only 30 seconds, the short job waits behind the long one unless the worker processes messages concurrently.
- Repeated work after crashes: if the worker dies while a multi-step task is half done, a restart usually repeats the completed steps and emits duplicate side effects downstream.
Teams typically work around this by hand-writing per-message dedup tables, idempotency keys, and recovery scaffolding for each worker, and that code is easy to get subtly wrong.
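Putting numbers on head-of-line blocking, using the two jobs from the example above (3,600 s, then 30 s): a sequential worker finishes the short job at 3,630 s, while a concurrent one finishes it at 30 s. The hypothetical helper below computes those finish times and deliberately ignores contention between jobs that run at the same time:

```python
from itertools import accumulate
from typing import List


def finish_times(durations_s: List[int], concurrent: bool) -> List[int]:
    """Seconds from start until each job finishes (no contention modeled).

    Sequential: each job also waits for every job queued ahead of it.
    Concurrent: each job finishes after its own duration.
    """
    if concurrent:
        return list(durations_s)
    return list(accumulate(durations_s))


jobs = [3600, 30]  # the hour-long job arrives first, the 30-second job second
print(finish_times(jobs, concurrent=False))  # [3600, 3630]
print(finish_times(jobs, concurrent=True))   # [3600, 30]
```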
Resonate's solution
Each message's record ID becomes the ID of a durable workflow. Resonate runs that workflow concurrently with whatever else is in flight, checkpoints every step, and deduplicates by ID, so a retry of a partially completed message reconnects to its in-flight execution rather than starting over. When the workflow finishes, it emits a "done" message to the next topic.
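To make deduplication by ID concrete, here is a toy in-memory stand-in for a durable promise store. It is not the Resonate API: `PromiseStore` and `dispatch` are hypothetical names, and the work runs synchronously to keep the sketch short. The first dispatch for an ID starts the work; a later dispatch with the same ID gets the existing entry back:

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict


@dataclass
class PromiseStore:
    """Toy, in-memory stand-in for durable promises keyed by ID."""

    promises: Dict[str, Dict[str, Any]] = field(default_factory=dict)
    starts: int = 0  # executions actually started

    def dispatch(self, promise_id: str, fn: Callable[..., Any], *args: Any) -> Dict[str, Any]:
        existing = self.promises.get(promise_id)
        if existing is not None:
            # Same ID: reconnect to the existing execution instead of starting over.
            return existing
        self.starts += 1
        entry: Dict[str, Any] = {"id": promise_id, "state": "pending", "value": None}
        self.promises[promise_id] = entry  # recorded before the work runs
        entry["value"] = fn(*args)
        entry["state"] = "resolved"
        return entry


store = PromiseStore()
first = store.dispatch("record-42", len, "abc")
retry = store.dispatch("record-42", len, "abc")  # e.g. a redelivered message
print(first is retry, store.starts, first["value"])  # True 1 3
```

In Resonate the promise lives on the server, so the dedup survives a worker restart; this toy forgets everything when the process exits.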
Code walkthrough
The pattern is a small consume loop that hands each message to a durable workflow run. The workflow does the per-message work: in this example, it deletes batches of records until none remain, then emits a completion event.
The durable workflow
import { Resonate, type Context } from "@resonatehq/sdk";

export const resonate = new Resonate({
  url: "http://localhost:8001",
  group: "workers",
});

// deleteBatch and enqueueCompletion are defined elsewhere in the repo (not shown here).
function* workflow(ctx: Context, recordId: string, offset: string) {
  console.log(`processing record ${recordId} (offset ${offset})`);
  // Loop until deleteBatch reports no more rows.
  while (yield* ctx.run(deleteBatch, recordId)) {
    yield* ctx.sleep(5_000); // durable 5-second pause between batches
  }
  yield* ctx.run(enqueueCompletion, recordId, offset);
}
resonate.register("workflow", workflow);

from __future__ import annotations

import asyncio
import json
import os
from datetime import timedelta
from typing import TYPE_CHECKING

from confluent_kafka import Producer
from resonate.resonate import Resonate

if TYPE_CHECKING:
    from resonate.context import Context

url = os.environ.get("RESONATE_URL", "http://localhost:8001")
resonate = Resonate(url=url)
# Producer for completion events; localhost:9092 is assumed (local Redpanda).
producer = Producer({"bootstrap.servers": "localhost:9092"})


async def delete_batch(ctx: Context, record_id: str, batch_size: int = 10) -> bool:
    # Real work goes here. Returns True if more batches remain.
    print(f"deleting a batch of rows for record {record_id}")
    # ...transient failures throw; durable retry handles them.
    more_remaining = False  # placeholder: set this from the real delete's result
    return more_remaining


async def enqueue(ctx: Context, msg_id: str, previous_offset: int) -> None:
    # Emit a completion event to the downstream topic.
    payload = json.dumps((msg_id, previous_offset)).encode("utf-8")
    producer.produce("records_that_were_deleted", value=payload)
    producer.flush()


async def workflow(ctx: Context, record_id: str, offset: int) -> None:
    print(f"processing record {record_id} (offset {offset})")
    # Loop until delete_batch reports no more rows.
    while await ctx.run(delete_batch, record_id):
        await ctx.sleep(timedelta(seconds=5))
    await ctx.run(enqueue, record_id, offset)


resonate.register(workflow)

use resonate::prelude::*;
use std::time::Duration;

// delete_batch and enqueue_completion are defined elsewhere in the repo (not shown here).
#[resonate::function]
pub async fn workflow(ctx: &Context, record_id: String, offset: i64) -> Result<()> {
    println!("processing record {record_id} (offset {offset})");
    // Loop until delete_batch reports no more rows.
    while ctx.run(delete_batch, record_id.clone()).await? {
        ctx.sleep(Duration::from_secs(5)).await?;
    }
    ctx.run(enqueue_completion, (record_id, offset)).await?;
    Ok(())
}

Each ctx.run call is a checkpoint: when a step finishes, its result is recorded durably. If the worker crashes mid-loop, the workflow resumes at the first unfinished batch instead of starting over, and deleteBatch / delete_batch does not run again for batches that already completed.
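A toy replay model shows why completed batches are not redone after a crash. This Python sketch is not Resonate's implementation: `ToyContext`, `Crash`, and the `journal` dict are hypothetical, and the 5-second sleep is left out. Each run call gets a sequence number; if the journal already holds a result for that number, the recorded result is returned and the function is skipped, otherwise the function runs and its result is journaled:

```python
from typing import Any, Callable, Dict, Optional


class Crash(Exception):
    """Simulated worker death."""


class ToyContext:
    """Replays journaled step results; runs and journals new steps."""

    def __init__(self, journal: Dict[int, Any]) -> None:
        self.journal = journal  # stands in for durable storage; survives crashes
        self.step = 0

    def run(self, fn: Callable[..., Any], *args: Any) -> Any:
        self.step += 1
        if self.step in self.journal:
            return self.journal[self.step]  # completed earlier: skip the call
        result = fn(*args)
        self.journal[self.step] = result  # checkpoint only after fn succeeds
        return result


state = {"rows": 30, "delete_calls": 0, "completions": 0}


def delete_batch(batch_size: int = 10) -> bool:
    state["delete_calls"] += 1
    state["rows"] = max(0, state["rows"] - batch_size)
    return state["rows"] > 0  # True while more rows remain


def enqueue_completion() -> None:
    state["completions"] += 1


def workflow(ctx: ToyContext, crash_after_step: Optional[int] = None) -> None:
    while ctx.run(delete_batch):
        if ctx.step == crash_after_step:
            raise Crash()
    ctx.run(enqueue_completion)


journal: Dict[int, Any] = {}
try:
    workflow(ToyContext(journal), crash_after_step=2)  # dies after two batches
except Crash:
    pass
workflow(ToyContext(journal))  # restart replays steps 1-2 from the journal
print(state)  # {'rows': 0, 'delete_calls': 3, 'completions': 1}
```

Three real deletes happen instead of five. Matching steps by sequence number only works if the workflow takes the same path on replay; durable-execution runtimes that replay this way generally impose a similar determinism requirement on workflow code.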
The Kafka consumer
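import { Kafka } from "kafkajs";
import { resonate } from "./workflow"; // hypothetical path to the workflow module

// Assumed values for illustration; the example repo defines its own.
const CONSUMER_GROUP_ID = "kafka-worker";
const TOPIC_TO_DELETE = "records_to_be_deleted";
const kafka = new Kafka({ clientId: "kafka-worker", brokers: ["localhost:9092"] });

const consumer = kafka.consumer({ groupId: CONSUMER_GROUP_ID });
await consumer.connect();
await consumer.subscribe({ topic: TOPIC_TO_DELETE, fromBeginning: true });
await consumer.run({
  eachMessage: async ({ message }) => {
    const recordId = JSON.parse(message.value!.toString("utf-8")) as string;
    // beginRun starts the workflow without waiting for it to finish. recordId
    // is the durable promise id, so duplicate deliveries dedupe.
    await resonate.beginRun(recordId, "workflow", recordId, message.offset);
  },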
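});

import json
from confluent_kafka import Consumer

# Assumed config for the local Redpanda setup ("group.id" is a hypothetical value).
# `resonate` and `workflow` are the objects defined in the workflow module above.
consumer = Consumer({"bootstrap.servers": "localhost:9092",
                     "group.id": "kafka-worker",
                     "auto.offset.reset": "earliest"})


def consume():
    consumer.subscribe(["records_to_be_deleted"])
    try:
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None or msg.error():
                continue  # no message yet, or a broker error/event
            # The payload is a JSON string, or a list whose first item is the record ID.
            record_id = json.loads(msg.value().decode("utf-8"))
            if isinstance(record_id, str):
                record_id = [record_id]
            # The record ID is the workflow ID, so a redelivered record reconnects
            # to its in-flight execution rather than starting over.
            resonate.run(record_id[0], workflow, record_id[0], msg.offset())
    finally:
        consumer.close()

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::Message;

let consumer: StreamConsumer = ClientConfig::new()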
    .set("group.id", CONSUMER_GROUP_ID)
    .set("bootstrap.servers", "localhost:9092")
    .create()?;
consumer.subscribe(&[TOPIC_TO_DELETE])?;
loop {
    // Log receive errors and keep consuming rather than exiting the loop.
    let msg = match consumer.recv().await {
        Ok(msg) => msg,
        Err(err) => {
            eprintln!("kafka receive error: {err}");
            continue;
        }
    };
    let record_id: String = serde_json::from_slice(msg.payload().unwrap())?;
    // .spawn() is non-blocking. record_id acts as the durable promise id.
    let _: ResonateHandle<()> = resonate
        .run(&record_id, "workflow", (record_id.clone(), msg.offset()))
        .target("poll://any@workers")
        .spawn()
        .await?;
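}

beginRun (TypeScript), resonate.run(...) (Python), and .spawn() (Rust) only start the workflow: the consumer keeps polling while the workflow runs in the background. Because the record ID is also the durable promise ID, a message that Kafka redelivers (for example, after a crash before its offset was committed) reconnects to the existing workflow instead of starting a second one.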
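The difference between waiting for each workflow and only starting it can be shown with plain asyncio. This is not Resonate code; `handle` and `consume` are hypothetical names. `asyncio.create_task` schedules the handler and returns immediately, so the loop moves on to the next message and a short job queued behind a long one finishes first:

```python
import asyncio
from typing import List, Tuple


async def handle(name: str, seconds: float, done: List[str]) -> None:
    await asyncio.sleep(seconds)  # stands in for the per-message workflow
    done.append(name)


async def consume(messages: List[Tuple[str, float]]) -> List[str]:
    done: List[str] = []
    tasks = []
    for name, seconds in messages:
        # Start the work and move on to the next message without waiting.
        tasks.append(asyncio.create_task(handle(name, seconds, done)))
    await asyncio.gather(*tasks)  # only so the demo can report completion order
    return done


order = asyncio.run(consume([("long", 0.2), ("short", 0.01)]))
print(order)  # ['short', 'long']
```

Unlike these in-memory tasks, which disappear if the process dies, workflows started through Resonate are backed by durable promises.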
Run it locally
Each repo ships a docker-compose.yml for Redpanda. Redpanda boots faster than Apache Kafka locally, and the worker code is the same against either broker.
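Each repo follows the same steps: clone and install dependencies, start Redpanda with docker compose up, install Resonate and start its dev server with resonate dev, then start the consumer and run the producer. Redpanda, the dev server, and the consumer keep running, so give each its own terminal.
TypeScript (uses bun):
git clone https://github.com/resonatehq-examples/example-kafka-worker-ts
cd example-kafka-worker-ts
bun install
docker compose up
brew install resonatehq/tap/resonate
resonate dev
bun run consumer
bun run producer -- -n 10
Python (uses uv; the Redpanda compose file lives in the redpanda/ directory):
git clone https://github.com/resonatehq-examples/example-kafka-worker-py
cd example-kafka-worker-py
uv sync
cd redpanda && docker compose up
brew install resonatehq/tap/resonate
resonate dev
uv run python -m record_deletor.delete
uv run python -m record_producer.produce -n 10
Rust (rdkafka requires cmake at build time; if it is missing, install it with brew install cmake before running cargo build):
git clone https://github.com/resonatehq-examples/example-kafka-worker-rs
cd example-kafka-worker-rs
cargo build
docker compose up
brew install resonatehq/tap/resonate
resonate dev
cargo run --bin consumer
cargo run --bin producer -- -n 10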
Watch the worker terminal: it consumes each message, runs the durable workflow, and emits a completion event to the downstream topic when each workflow finishes.
Try the recovery story
While the worker is processing a batch, kill it, then restart it. Resonate resumes the workflow from the first unfinished step: batches that were already deleted are not redone, the loop picks up where it left off, and the workflow still finishes with its completion step.
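One window remains: if the worker dies after a step's side effect but before the step's result is recorded, that step runs again on restart. For the completion step, that means the downstream topic can see the same event twice. A minimal Python sketch of that window (not Resonate code; `run_until_checkpoint`, `emit_completion`, and `downstream_apply` are hypothetical), plus a downstream consumer that tolerates it by skipping record IDs it has already handled, which works because each completion event carries the record ID:

```python
from typing import Callable, List, Set


class Crash(Exception):
    """Simulated worker death after a side effect, before its checkpoint."""


def run_until_checkpoint(step: Callable[[], None], crash: bool) -> None:
    step()  # the side effect happens here
    if crash:
        raise Crash()  # result never recorded, so the step will run again
    # A real runtime would record the step's result at this point.


topic: List[str] = []  # stands in for the records_that_were_deleted topic


def emit_completion() -> None:
    topic.append("record-42")


try:
    run_until_checkpoint(emit_completion, crash=True)
except Crash:
    pass  # worker died; on restart the unfinished step runs again
run_until_checkpoint(emit_completion, crash=False)


def downstream_apply(events: List[str], processed: Set[str]) -> List[str]:
    """Downstream consumer that skips record IDs it has already handled."""
    applied: List[str] = []
    for record_id in events:
        if record_id in processed:
            continue
        processed.add(record_id)
        applied.append(record_id)
    return applied


print(topic)                           # ['record-42', 'record-42']
print(downstream_apply(topic, set()))  # ['record-42']
```

The processed-ID set is in memory here for brevity; a real downstream consumer would keep it in durable storage.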
Related
- Load balancing — same dispatch pattern across multiple worker instances.
- Async HTTP API endpoints — the same workflow-per-input shape, sourced from HTTP rather than Kafka.