TL;DR: Adding Kafka consumers to a Flink job triggers a rebalance that can desynchronize checkpoints and break exactly-once guarantees. Coordinate scaling with checkpoint-aligned offsets, transactional producers, and a savepoint-driven rollout to keep state and output in lockstep.
Key Takeaways
- Flink’s exactly-once relies on stable partition ownership; a rebalance shatters that stability.
- Transactional Kafka producers bind output records to checkpoint success, preventing drift.
- A savepoint-based, staged rollout lets you add consumers without losing state or throughput.
Why Adding Kafka Consumers Often Breaks Flink's Exactly-Once Guarantees

Adding Kafka consumers often seems like a quick throughput boost, but it can silently break exactly-once guarantees. The first symptom is a subtle data loss that appears weeks later, when a downstream aggregation suddenly skips records. The root cause isn’t a Flink bug; it’s Kafka’s rebalance protocol.
When a new consumer joins a group, Kafka reassigns partitions across all members. Flink’s `FlinkKafkaConsumer` assigns each partition to a source subtask, and a single subtask may read several partitions. Its exactly-once model expects a subtask to own the same set of partitions for the whole lifetime of a checkpoint. A rebalance moves a partition from one subtask to another. The checkpoint taken before the move no longer reflects the true input source. The downstream state operator continues to apply updates based on stale offsets, while the new subtask starts from the latest committed offset. This creates a gap between Flink’s internal state and the external sink.
Three concrete failure modes illustrate the problem:
- Stale offset checkpoint - The checkpoint metadata points to an offset that the new subtask will never read.
- In-flight records - Records emitted before the rebalance remain in the pipeline, but their offsets are not part of the checkpoint.
- Duplicate writes - The transactional sink may receive the same logical record twice when the job recovers, violating exactly-once.
The issue worsens with higher parallelism because more subtasks mean more partitions to shuffle, and each shuffle opens a window where checkpoints are out of sync. Teams often see duplicate events in the sink or missing aggregates in real-time dashboards.
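The first and third failure modes come down to simple offset arithmetic. The sketch below is a toy model, not Flink or Kafka code: offsets are plain `long` values for one partition, and the class and method names (`OffsetDriftDemo`, `skippedOffsets`, `replayedOffsets`) are made up for illustration. It assumes a checkpointed offset means "next offset to read".

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one partition: offsets are plain longs, no Kafka client involved. */
final class OffsetDriftDemo {

    /** Offsets that are never read when a reader resumes past the checkpointed offset. */
    static List<Long> skippedOffsets(long checkpointedOffset, long resumeOffset) {
        List<Long> skipped = new ArrayList<>();
        for (long o = checkpointedOffset; o < resumeOffset; o++) {
            skipped.add(o);
        }
        return skipped;
    }

    /** Offsets that are read a second time when a reader resumes before the next offset to emit. */
    static List<Long> replayedOffsets(long resumeOffset, long nextOffsetToEmit) {
        List<Long> replayed = new ArrayList<>();
        for (long o = resumeOffset; o < nextOffsetToEmit; o++) {
            replayed.add(o);
        }
        return replayed;
    }

    public static void main(String[] args) {
        // The checkpoint recorded offset 100, but the new owner resumes at 105.
        System.out.println("skipped:  " + skippedOffsets(100, 105));
        // The old owner had already emitted through offset 109; the new owner resumes at 105.
        System.out.println("replayed: " + replayedOffsets(105, 110));
    }
}
```

A non-empty `skipped` list corresponds to the stale offset checkpoint case, and a non-empty `replayed` list corresponds to duplicate writes if the sink does not deduplicate.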
What hidden mechanism in Kafka’s rebalance creates this drift?
The Hidden Mechanics of Partition Rebalancing and State Drift
Kafka’s rebalance unfolds in three phases: prepare-rebalance, revoke, and assign.
- prepare-rebalance - The coordinator notifies all members that a change is imminent. Each consumer must pause its fetch loop.
- revoke - Members receive a callback to clean up state, usually by committing offsets.
- assign - New partition assignments are delivered, and consumers resume fetching.
Flink ties offset commits to its checkpoint barrier. If a rebalance occurs between two barriers, the source may have already emitted records that belong to the upcoming checkpoint, but the checkpoint metadata still points to the old offsets. When the new consumer instance receives the partition, it starts from the latest committed offset, which can be behind the records already in flight.
The snippet below registers a rebalance listener that logs each phase. Logging helps you see exactly when a rebalance interferes with a checkpoint.
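```java
import java.util.Collection;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Assumes `consumer` is a plain KafkaConsumer; the listener is passed to subscribe().
consumer.subscribe(Collections.singletonList("events"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Called before partitions are taken away from this consumer.
        System.out.println("Rebalance revoke: " + partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Called after the new assignment has been delivered.
        System.out.println("Rebalance assign: " + partitions);
    }
});
```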
If the log shows a revoke event just before a checkpoint barrier, a drift window has opened. Consumer lag widens that window. When the group lags behind the topic head, the rebalance adds further delay because the new member must catch up before processing resumes. Meanwhile, Flink’s checkpoint continues to advance, sealing state that no longer matches the input stream.
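Spotting a revoke that lands just before a barrier can be automated once both kinds of timestamps are in your logs. The following is a minimal sketch under stated assumptions: timestamps are epoch milliseconds you have already extracted, and `DriftWindowDetector`, `suspiciousRevokes`, and the window size are hypothetical names and values, not part of Flink or Kafka.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class DriftWindowDetector {

    /**
     * Returns the revoke timestamps that fall within windowMs before any checkpoint barrier.
     * All timestamps are epoch milliseconds taken from your own logs.
     */
    static List<Long> suspiciousRevokes(List<Long> revokeTimes, List<Long> barrierTimes, long windowMs) {
        List<Long> flagged = new ArrayList<>();
        for (long revoke : revokeTimes) {
            for (long barrier : barrierTimes) {
                long gap = barrier - revoke;
                if (gap >= 0 && gap <= windowMs) {
                    flagged.add(revoke);
                    break; // one matching barrier is enough to flag this revoke
                }
            }
        }
        return flagged;
    }

    public static void main(String[] args) {
        List<Long> revokes = Arrays.asList(1_000L, 58_500L);
        List<Long> barriers = Arrays.asList(30_000L, 60_000L);
        // Flag revokes that happened at most 2 seconds before a barrier.
        System.out.println(suspiciousRevokes(revokes, barriers, 2_000L)); // prints [58500]
    }
}
```

The 2-second window is an arbitrary choice for the example; a sensible value depends on how long your checkpoints take to complete.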
The net effect is state drift: the state backend (RocksDB or heap) reflects a different view of the world than the Kafka sink. Downstream systems that rely on exactly-once semantics - financial ledgers, fraud detectors, inventory tallies - receive a mismatched picture.
So how can you keep the state in sync?
How Flink's State Backend and Transactional Kafka Producers Preserve Consistency
Flink isolates drift by persisting state in a backend that survives crashes and rebalances. RocksDB stores key-value pairs on disk, while the heap backend keeps them in memory. Both backends are checkpointed to durable storage (S3, GCS, HDFS) at the same barrier that triggers offset commits. The checkpoint includes exactly the offsets that were read up to that point, ensuring that a restore will replay from the same spot.
The real guardrail is the transactional Kafka producer. By enabling `enable.idempotence=true` and setting a `transaction.timeout.ms` that exceeds the checkpoint interval, Flink can open a transaction at the start of a checkpoint, write all output records, then commit the transaction only when the checkpoint succeeds. If the checkpoint fails - perhaps because a rebalance moved a partition - the transaction aborts, and no records reach the sink. This atomicity binds the state snapshot and the external side effects.
Below is a minimal `producer.properties` file that turns on idempotence and transactions:
```properties
enable.idempotence=true
transactional.id=flink-job-${job.id}
acks=all
retries=5
# transaction.timeout.ms should be longer than the checkpoint interval
```
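The `transactional.id` must be unique per Flink job instance. The timeout must be longer than the longest expected checkpoint interval, otherwise the transaction may expire mid-checkpoint. It also must not exceed the broker’s `transaction.max.timeout.ms` (15 minutes by default), or the producer will fail to initialize its transactions.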
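The timeout rule is easy to check before deployment. Here is a small sketch, assuming the producer settings are loaded into a `java.util.Properties` object; `TransactionTimeoutCheck` and `isTimeoutSafe` are hypothetical names. It also checks the broker-side cap, `transaction.max.timeout.ms`, which is 900000 ms (15 minutes) by default in Kafka and which the producer timeout may not exceed.

```java
import java.util.Properties;

final class TransactionTimeoutCheck {

    /**
     * Returns true when transaction.timeout.ms is longer than the checkpoint interval
     * and does not exceed the broker-side cap (transaction.max.timeout.ms).
     */
    static boolean isTimeoutSafe(Properties producerProps, long checkpointIntervalMs, long brokerMaxTimeoutMs) {
        String raw = producerProps.getProperty("transaction.timeout.ms");
        if (raw == null) {
            return false; // treat a missing value as unsafe rather than relying on a default
        }
        long timeoutMs = Long.parseLong(raw.trim());
        return timeoutMs > checkpointIntervalMs && timeoutMs <= brokerMaxTimeoutMs;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("transaction.timeout.ms", "600000"); // 10 minutes
        // Assumed values: 1-minute checkpoint interval, 15-minute broker cap.
        System.out.println(isTimeoutSafe(props, 60_000L, 900_000L)); // prints true
    }
}
```

Treating a missing value as unsafe is a deliberate choice here: it forces the timeout to be set explicitly instead of depending on whichever default the client or connector applies.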
When the job recovers from a savepoint or a failed checkpoint, the producer starts a fresh transaction, guaranteeing that no duplicate or missing records appear.
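The commit-on-checkpoint behavior described in this section can be modeled in a few lines. This is a toy illustration only, not Flink's sink implementation: `ToyTransactionalSink` and its methods are invented names, and an in-memory list stands in for the Kafka topic.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy transactional sink: records become visible only when the checkpoint succeeds. */
final class ToyTransactionalSink {
    private final List<String> committed = new ArrayList<>(); // visible to downstream readers
    private List<String> open = new ArrayList<>();            // current uncommitted transaction

    void write(String record) {
        open.add(record);
    }

    /** Call when the checkpoint covering the open transaction completes or fails. */
    void onCheckpointResult(boolean success) {
        if (success) {
            committed.addAll(open); // commit: publish everything written since the last checkpoint
        }
        open = new ArrayList<>();   // on failure the records are dropped (abort); a new transaction starts
    }

    List<String> visibleRecords() {
        return new ArrayList<>(committed);
    }

    public static void main(String[] args) {
        ToyTransactionalSink sink = new ToyTransactionalSink();
        sink.write("a");
        sink.write("b");
        sink.onCheckpointResult(true);   // checkpoint 1 succeeds: a and b become visible
        sink.write("c");
        sink.onCheckpointResult(false);  // checkpoint 2 fails: c is discarded
        sink.write("c");                 // replay after recovery writes c again
        sink.onCheckpointResult(true);   // checkpoint 3 succeeds: c appears once
        System.out.println(sink.visibleRecords()); // prints [a, b, c]
    }
}
```

The record `c` is written twice but becomes visible once, because the first attempt belonged to a transaction that was aborted along with its checkpoint.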
But how do you enforce this in a production pipeline?
Step-by-Step Blueprint to Scale Consumers Without Losing State

- Configure the source for checkpoint-aligned offsets
```java
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
"events",
new SimpleStringSchema(),
props);
consumer.setStartFromGroupOffsets();
consumer.setCommitOffsetsOnCheckpoints(true);
```
`setCommitOffsetsOnCheckpoints(true)` tells Flink to commit offsets only when a checkpoint succeeds, tying the two together.
- Enable transactional writes on the sink
```properties
# producer.properties
enable.idempotence=true
transactional.id=flink-job-${job.id}
acks=all
retries=5
```
- Stage the rollout with a savepoint
```bash
# Take a savepoint
bin/flink savepoint :jobId :targetDir
# Stop the old job
bin/flink cancel :jobId
# Deploy the new job, pointing to the savepoint
bin/flink run -s :savepointPath -d my-flink-job.jar
```
This ensures that the new consumers start from a known, consistent state.
- Stagger consumer start-up
  - Deploy the new job with a lower parallelism than the final target.
  - Wait until the consumer lag metric stabilizes at a low level.
  - Increase parallelism to the final target.
Staggering prevents a massive simultaneous rebalance.
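- Monitor lag and checkpoint health
  - JMX metric: `records-lag-max` on the `kafka.consumer:type=consumer-fetch-manager-metrics` MBean
  - Prometheus alert: fire when `flink_jobmanager_job_numberOfFailedCheckpoints` increases (the name assumes Flink’s Prometheus reporter with default naming)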
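Put these alerts in place before you start scaling, so that growing lag or a failed checkpoint surfaces during the rollout rather than after it.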
- Validate exactly-once after rollout
  - Run a downstream idempotency test (write a known key twice, ensure only one record appears).
  - Compare state snapshots before and after scaling using RocksDB’s checkpoint files.
  - Verify that the sink’s committed offsets match the checkpoint metadata.
Following this playbook eliminates the race between rebalance and checkpoint, preserving exactly-once guarantees even as you increase consumer count.
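The staggering and monitoring steps can be combined into a single gate that decides when the next parallelism increase is allowed. The sketch below is one possible policy, not a Flink feature: `RolloutGate`, `nextParallelism`, the doubling rule, and the thresholds are all assumptions for illustration, and the lag and failure counts are expected to come from your own metrics. Each step it suggests would still go through the savepoint procedure shown earlier.

```java
final class RolloutGate {

    /**
     * Suggests the next parallelism step for a staged rollout (assumes current >= 1).
     * Stays at the current value unless lag is low and no checkpoint failed recently.
     */
    static int nextParallelism(int current, int target, long maxLag, long lagThreshold, int recentFailedCheckpoints) {
        boolean healthy = maxLag <= lagThreshold && recentFailedCheckpoints == 0;
        if (!healthy || current >= target) {
            return current; // hold: either not safe to scale yet, or already at the target
        }
        return Math.min(current * 2, target); // assumed policy: double per step, capped at the target
    }

    public static void main(String[] args) {
        System.out.println(nextParallelism(4, 12, 50L, 1_000L, 0));     // healthy: prints 8
        System.out.println(nextParallelism(8, 12, 50L, 1_000L, 0));     // healthy, capped: prints 12
        System.out.println(nextParallelism(8, 12, 20_000L, 1_000L, 0)); // lag too high: prints 8
    }
}
```

Doubling is only one option; a fixed increment works the same way as long as the gate holds whenever lag or checkpoint health looks wrong.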
What evidence can you collect to prove the system stayed consistent?
Practical Validation Queries
A typical validation query looks like this:
```sql
SELECT COUNT(*) AS before_cnt
FROM inventory_snapshot
WHERE snapshot_ts = '2026-06-01T00:00:00Z';
```
Run the query before scaling, repeat it after the rollout, and compare the counts. Matching counts are consistent with no drift; a mismatch indicates that records were lost or duplicated during the rollout.
Another practical check uses Kafka’s consumer group offset tool:
```bash
kafka-consumer-groups.sh \
  --bootstrap-server broker:9092 \
  --describe \
  --group flink-consumer-group
```
If the `CURRENT-OFFSET` column matches the `LOG-END-OFFSET` for every partition, the job has caught up and no lag remains.
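Comparing the two columns by eye gets tedious with many partitions. The sketch below checks captured `--describe` output programmatically. It is a rough helper under stated assumptions: `LagCheck` and `caughtUp` are hypothetical names, the sample rows are invented, and columns are located by header name because their order differs between Kafka versions.

```java
import java.util.Arrays;
import java.util.List;

final class LagCheck {

    /** Returns true when at least one row was checked and every row has CURRENT-OFFSET equal to LOG-END-OFFSET. */
    static boolean caughtUp(List<String> describeLines) {
        int currentIdx = -1;
        int endIdx = -1;
        int rowsChecked = 0;
        for (String line : describeLines) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            List<String> cols = Arrays.asList(trimmed.split("\\s+"));
            if (cols.contains("CURRENT-OFFSET") && cols.contains("LOG-END-OFFSET")) {
                // Header row: remember where the two offset columns are.
                currentIdx = cols.indexOf("CURRENT-OFFSET");
                endIdx = cols.indexOf("LOG-END-OFFSET");
                continue;
            }
            if (currentIdx < 0 || cols.size() <= Math.max(currentIdx, endIdx)) {
                continue; // skip anything before the header and rows that are too short
            }
            if (!cols.get(currentIdx).equals(cols.get(endIdx))) {
                return false; // offsets differ: lag remains or no offset has been committed
            }
            rowsChecked++;
        }
        return rowsChecked > 0; // nothing checked means nothing was verified
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
            "GROUP                 TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG",
            "flink-consumer-group  events  0          1200            1200            0",
            "flink-consumer-group  events  1          980             1000            20");
        System.out.println(caughtUp(sample)); // prints false: partition 1 is 20 records behind
    }
}
```

Returning `false` when no rows were checked is intentional, so that empty or unparseable output is never mistaken for a caught-up group.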
These signals give you confidence that the exactly-once contract holds, even as you add more consumers. The pattern suits regulated environments where audit trails must be immutable.
Ready to embed these safeguards into your stack?
Frequently Asked Questions
Q: Why does scaling Kafka consumers break Flink's exactly-once guarantee?
A: Adding consumers triggers Kafka’s partition rebalance, which can desynchronize Flink’s checkpoint barrier from the actual offsets read, causing state drift and breaking the exactly-once promise.
Q: How can I keep consumer lag under control while scaling?
A: Monitor lag via JMX or Prometheus, set alert thresholds, and use staggered consumer rollouts with savepoints so new instances only start after lag clears.
Q: Do I need a transactional Kafka producer for exactly-once in Flink?
A: Yes - enabling idempotence and transactions ensures that writes are committed atomically with Flink’s checkpoints, preventing duplicates when rebalances occur.
Q: Can I use Flink's heap state backend for this pattern?
A: Both heap and RocksDB backends work. Durability comes from checkpoints in either case; RocksDB is the better fit for large state because it can hold more than fits in memory. The key is to pair the backend with checkpoint-aligned commits.
Q: What's the safest way to roll out new consumer instances?
A: Take a savepoint, stop the old consumers, start the new ones with the same group ID, and let Flink restore state from the savepoint before processing resumes.
Related reading:
- [Exactly-Once Hides Latency in Kafka-Flink Pipelines](/posts/exactly-once-latency-kafka-flink)
- [Why More Kafka Partitions Break Flink Exactly-Once](/posts/why-kafka-partitions-break-flink)
- [Why Kafka State Store Breaks Flink Fault Tolerance](/posts/kafka-state-store-flink-fault-tolerance)
