Durable Kafka workers with crash recovery
A worker that consumes from Kafka, dispatches a durable workflow for each message, and resumes cleanly after crashes. Redpanda works as a drop-in replacement for local development.
A consumer pulls messages from a Kafka topic and dispatches each one through a durable Resonate workflow. Per-message state lives in durable promises, so a worker that dies mid-batch resumes cleanly without redoing completed steps.
TypeScript: @resonatehq/sdk v0.10.1 (current). Python: resonate-sdk v0.6.x against the legacy Resonate Server. Rust: v0.1.0, in active development.
All three repos run against Redpanda for local development because the broker boots in seconds, but the code uses the Kafka wire protocol — kafkajs for TypeScript, confluent-kafka-python for Python, rdkafka for Rust. Point any of them at any Kafka-compatible broker — Apache Kafka, Redpanda, or a managed offering — by changing the bootstrap-server config.
- TypeScript: kafkajs consumer that dispatches a durable workflow per message, with batch-level retries.
- Python: confluent-kafka-python consumer that dispatches a durable workflow per message, with batch-level retries.
- Rust: rdkafka consumer that dispatches a durable workflow per message via spawn().
The problem
A worker between two queues has two recurring problems:

- Head-of-line blocking: if the first message kicks off an hour-long task and the next one would take 30 seconds, the short job sits behind the long one unless the worker handles concurrency.
- Repeated work on crashes: if a multi-step task is half-done when the worker dies, a restart usually means repeating completed steps and emitting duplicate side-effects downstream.
Most teams write per-message dedup tables, idempotency keys, and recovery scaffolding — repeatedly, and rarely well.
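To see what that hand-rolled scaffolding looks like, here is a minimal sketch (hypothetical names; an in-memory dict stands in for the dedup table) of the per-message idempotency bookkeeping a worker ends up carrying:

```python
# Illustrative only: the dedup bookkeeping teams rewrite by hand.
# `processed` stands in for a dedup table keyed by idempotency key.
processed: dict[str, str] = {}

def handle(message_id: str, payload: str) -> str:
    if message_id in processed:
        # Redelivered message: return the recorded result, emit nothing new.
        return processed[message_id]
    result = payload.upper()          # the "real" per-message work
    processed[message_id] = result    # must commit atomically with the work
    return result

print(handle("msg-1", "abc"))  # does the work
print(handle("msg-1", "abc"))  # redelivery: cached result, no duplicate side-effect
```

In production the dict becomes a database table and the atomic-commit comment becomes a transaction, which is exactly the scaffolding that is tedious to get right.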
Resonate's solution
Each Kafka message becomes the ID of a durable workflow. Resonate runs the workflow concurrently with whatever else is in flight, checkpoints every step, and dedups by ID — so a retry of a partially-completed message reconnects to its in-flight execution rather than starting over. When the workflow finishes, the worker emits a "done" message to the next topic.
Code walkthrough
The pattern is a small consume loop that hands each message to begin_run (beginRun in TypeScript, .spawn() in Rust). The workflow itself does the per-message work — here, deleting batches of records until none remain, then emitting a completion event.
The durable workflow
TypeScript:

```typescript
import { Resonate, type Context } from "@resonatehq/sdk";

export const resonate = new Resonate({
  url: "http://localhost:8001",
  group: "workers",
});

function* workflow(ctx: Context, recordId: string, offset: string) {
  console.log(`processing record ${recordId} (offset ${offset})`);
  // Loop until deleteBatch reports no more rows.
  while (yield* ctx.run(deleteBatch, recordId)) {
    yield* ctx.sleep(5_000);
  }
  yield* ctx.run(enqueueCompletion, recordId, offset);
}

resonate.register("workflow", workflow);
```

Python:

```python
from resonate import Resonate
import json

resonate = Resonate.remote()

def delete_batch(_, record_id, batch_size=10):
    # Real work goes here. Returns True if more batches remain.
    print(f"deleting a batch of rows for record {record_id}")
    # ...transient failures throw; durable retry handles them.
    return more_remaining

def enqueue(_, msg_id, previous_offset):
    # Emit a completion event to the downstream topic.
    payload = json.dumps((msg_id, previous_offset)).encode("utf-8")
    producer.produce("records_that_were_deleted", value=payload)
    producer.flush()

@resonate.register
def workflow(ctx, record_id, offset):
    print(f"processing record {record_id} (offset {offset})")
    # Loop until delete_batch reports no more rows.
    while (yield ctx.run(delete_batch, record_id)):
        yield ctx.sleep(5)
    yield ctx.run(enqueue, record_id, offset)
```

Rust:

```rust
use resonate::prelude::*;
use std::time::Duration;

#[resonate::function]
pub async fn workflow(ctx: &Context, record_id: String, offset: i64) -> Result<()> {
    println!("processing record {record_id} (offset {offset})");
    // Loop until delete_batch reports no more rows.
    while ctx.run(delete_batch, record_id.clone()).await? {
        ctx.sleep(Duration::from_secs(5)).await?;
    }
    ctx.run(enqueue_completion, (record_id, offset)).await?;
    Ok(())
}
```

Each ctx.run is a checkpoint. A worker crash mid-loop resumes from the next batch rather than from message zero, and delete_batch / deleteBatch doesn't run again for batches that already completed.
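The checkpoint behavior can be illustrated without the SDK. The sketch below (hypothetical `journal` and step names, not the Resonate API) records each step's result, so a replay returns recorded results instead of re-executing the work:

```python
# Illustrative replay journal: (workflow_id, step) -> recorded result.
journal: dict[tuple[str, int], object] = {}
calls: list[str] = []   # side-effect log, to observe re-execution

def run_step(wf_id: str, step: int, fn, *args):
    key = (wf_id, step)
    if key in journal:            # already checkpointed: replay, don't re-execute
        return journal[key]
    result = fn(*args)
    journal[key] = result
    return result

def delete_batch(record_id: str, remaining: int) -> bool:
    calls.append(record_id)       # a real side-effect would hit the database
    return remaining > 1          # True while more batches remain

def workflow(wf_id: str, batches: int):
    step = 0
    while run_step(wf_id, step, delete_batch, wf_id, batches - step):
        step += 1

workflow("rec-1", 3)   # first run: executes three batches
workflow("rec-1", 3)   # "restart": the journal answers, no batch re-runs
print(len(calls))      # → 3
```

The second invocation with the same workflow id performs zero new side-effects, which is the property the prose above describes.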
The Kafka consumer
TypeScript:

```typescript
const consumer = kafka.consumer({ groupId: CONSUMER_GROUP_ID });
await consumer.connect();
await consumer.subscribe({ topic: TOPIC_TO_DELETE, fromBeginning: true });

await consumer.run({
  eachMessage: async ({ message }) => {
    const recordId = JSON.parse(message.value!.toString("utf-8")) as string;
    // beginRun is non-blocking. The recordId acts as the durable promise id,
    // so duplicate invocations dedupe.
    await resonate.beginRun(recordId, "workflow", recordId, message.offset);
  },
});
```

Python:

```python
def consume():
    consumer.subscribe(["records_to_be_deleted"])
    try:
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None or msg.error():
                continue
            record_id = json.loads(msg.value().decode("utf-8"))
            if isinstance(record_id, str):
                record_id = [record_id]
            # The message ID is the workflow ID — same record reconnects
            # to its in-flight execution rather than starting over.
            _ = workflow.begin_run(record_id[0], record_id[0], msg.offset())
    finally:
        consumer.close()

def main():
    resonate.start()
    consume()
```

Rust:

```rust
let consumer: StreamConsumer = ClientConfig::new()
    .set("group.id", CONSUMER_GROUP_ID)
    .set("bootstrap.servers", "localhost:9092")
    .create()?;
consumer.subscribe(&[TOPIC_TO_DELETE])?;

while let Ok(msg) = consumer.recv().await {
    let record_id: String = serde_json::from_slice(msg.payload().unwrap())?;
    // .spawn() is non-blocking. record_id acts as the durable promise id.
    let _: ResonateHandle<()> = resonate
        .run(&record_id, "workflow", (record_id.clone(), msg.offset()))
        .target("poll://any@workers")
        .spawn()
        .await?;
}
```

beginRun / begin_run / .spawn() is non-blocking — the consumer keeps polling while the workflow runs in the background. The same recordId deduplicates retries by design.
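The dispatch semantics can be mimicked with plain asyncio (illustrative only; the `in_flight` dict stands in for Resonate's durable promise store, and the names are hypothetical):

```python
import asyncio

# Non-blocking dispatch: the consume loop fires workflows and keeps going;
# an in-flight registry keyed by record id deduplicates redeliveries.
in_flight: dict[str, asyncio.Task] = {}

async def workflow(record_id: str) -> str:
    await asyncio.sleep(0.01)     # stands in for the durable per-message work
    return f"done:{record_id}"

def begin_run(record_id: str) -> asyncio.Task:
    if record_id in in_flight:    # duplicate delivery reconnects to the running task
        return in_flight[record_id]
    task = asyncio.ensure_future(workflow(record_id))
    in_flight[record_id] = task
    return task

async def consume(messages: list[str]) -> list[str]:
    handles = [begin_run(m) for m in messages]   # returns immediately per message
    return await asyncio.gather(*handles)

results = asyncio.run(consume(["r1", "r2", "r1"]))   # "r1" delivered twice
print(results)
```

Note how the duplicated "r1" yields two handles to one execution, so only two workflows actually run.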
Run it locally
All three repos ship a docker-compose.yml for Redpanda (it boots faster than Apache Kafka locally; the worker code is identical either way).
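If you want to stand up Redpanda by hand, a single-node compose file looks roughly like this. This is a sketch following Redpanda's documented quickstart shape, not the repos' exact file; check each repo's docker-compose.yml for the authoritative config:

```yaml
services:
  redpanda:
    image: redpandadata/redpanda:latest
    command:
      - redpanda
      - start
      - --overprovisioned
      - --smp=1
      - --memory=1G
      - --kafka-addr=PLAINTEXT://0.0.0.0:9092
      - --advertise-kafka-addr=PLAINTEXT://localhost:9092
    ports:
      - "9092:9092"
```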
TypeScript:

```shell
git clone https://github.com/resonatehq-examples/example-kafka-worker-ts
cd example-kafka-worker-ts
bun install
docker compose up
brew install resonatehq/tap/resonate
resonate dev
bun run consumer
bun run producer -- -n 10
```

Python (the repo uses uv):

```shell
git clone https://github.com/resonatehq-examples/example-kafka-worker-py
cd example-kafka-worker-py
uv sync
cd redpanda && docker compose up
brew install resonatehq/tap/resonate
resonate serve
uv run python -m record_deletor.delete
uv run python -m record_producer.produce -n 10
```

Rust:

```shell
git clone https://github.com/resonatehq-examples/example-kafka-worker-rs
cd example-kafka-worker-rs
cargo build
docker compose up
brew install resonatehq/tap/resonate
resonate dev
cargo run --bin consumer
cargo run --bin producer -- -n 10
```

(rdkafka requires cmake at build time; install via `brew install cmake` if missing.)
Watch the worker terminal — it consumes each message, runs the durable workflow, and re-emits a completion event when each finishes.
Try the recovery story
While the worker is processing a batch, kill it. Restart it. Resonate replays the workflow from the last unfinished step — already-deleted batches are not redone, the loop continues, the completion event still fires exactly once.
Related
- Load balancing — same dispatch pattern across multiple worker instances.
- Async HTTP API endpoints — the same workflow-per-input shape, sourced from HTTP rather than Kafka.