This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 5836d588e8f0 CAMEL-20227: Fix Pausable EIP losing messages due to
Kafka offset advancement (#23805)
5836d588e8f0 is described below
commit 5836d588e8f068874757edbdfb0fa808f9f36357
Author: Claus Ibsen <[email protected]>
AuthorDate: Sat Jun 6 09:11:58 2026 +0200
CAMEL-20227: Fix Pausable EIP losing messages due to Kafka offset
advancement (#23805)
* CAMEL-20227: camel-kafka - Fix Pausable EIP losing messages due to offset
advancement
The Pausable EIP with Kafka consumer was losing messages because:
1. afterConsume() only evaluated the resume predicate when already paused,
but never called consumer.pause() or seeked back to committed offsets
2. The poll loop continued immediately without pausing, causing auto-commit
to advance offsets for unprocessed records
Fix: Rewrite afterConsume() to always evaluate the predicate, call
consumer.pause() and seek to the correct offset when pausing, and
consumer.resume() when resuming. This ensures poll() returns empty
records during pause and offsets are not advanced for unprocessed messages.
Co-Authored-By: Claude <[email protected]>
Signed-off-by: Claus Ibsen <[email protected]>
* CAMEL-20227: camel-kafka - Document autoCommitEnable risk with Pausable
consumers
Add a note recommending autoCommitEnable=false with allowManualCommit=true
when using pausable consumers, to avoid message loss from Kafka
auto-committing
offsets for unprocessed records during pause/resume cycles.
Co-Authored-By: Claude <[email protected]>
Signed-off-by: Claus Ibsen <[email protected]>
---------
Signed-off-by: Claus Ibsen <[email protected]>
Co-authored-by: Claude <[email protected]>
---
.../camel-kafka/src/main/docs/kafka-component.adoc | 4 ++
.../errorhandler/KafkaConsumerListener.java | 47 ++++++++++------------
2 files changed, 25 insertions(+), 26 deletions(-)
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc
b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 5bfe3fce1016..850b5abae963 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -295,6 +295,10 @@ In this example, consuming messages can pause (by calling
the Kafka's Consumer p
IMPORTANT: The pausable EIP is meant to be used as a support mechanism when
*there is an exception* somewhere in the route that prevents the exchange from
being processed. More specifically,
the check called by the `pausable` EIP should be used to test for transient
conditions preventing the exchange from being processed.
+NOTE: When using pausable consumers, it is recommended to set
`autoCommitEnable=false` and `allowManualCommit=true`
+so that offsets are only committed after successful processing. With
`autoCommitEnable=true`, Kafka may auto-commit
+offsets for records that were polled but not yet processed, which can lead to
message loss during pause/resume cycles.
+
NOTE: most users should prefer using the
xref:manual::route-policy.adoc[RoutePolicy], which offers better control of the
route.
=== Kafka Headers propagation
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaConsumerListener.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaConsumerListener.java
index 9d3a2fd9ccb8..9c79052121db 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaConsumerListener.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaConsumerListener.java
@@ -32,7 +32,6 @@ public class KafkaConsumerListener implements
ConsumerListener<Object, Processin
private SeekPolicy seekPolicy;
private Predicate<?> afterConsumeEval;
- private boolean paused;
public Consumer<?, ?> getConsumer() {
return consumer;
@@ -57,40 +56,36 @@ public class KafkaConsumerListener implements
ConsumerListener<Object, Processin
@Override
public boolean afterConsume(@SuppressWarnings("unused") Object ignored) {
- if (paused) {
- if (afterConsumeEval.test(null)) {
- LOG.warn("State changed, therefore resuming the consumer");
- consumer.resume(consumer.assignment());
-
- return true;
- }
-
- LOG.warn("The consumer is not yet resumable");
- return false;
+ boolean resume = afterConsumeEval.test(null);
+ if (resume) {
+ LOG.debug("Resuming consumer");
+ consumer.resume(consumer.assignment());
+ } else {
+ LOG.debug("Pausing consumer");
+ consumer.pause(consumer.assignment());
+ seekConsumer();
}
-
- // It's not paused, so we can continue processing
- return true;
+ return resume;
}
@Override
public boolean afterProcess(ProcessingResult result) {
if (result.isFailed()) {
- LOG.warn("Pausing consumer due to error on the last processing");
+ LOG.debug("Pausing consumer due to last processing error");
consumer.pause(consumer.assignment());
- paused = true;
-
- if (seekPolicy == SeekPolicy.BEGINNING) {
- LOG.debug("Seeking from the beginning of topic");
- consumer.seekToBeginning(consumer.assignment());
- } else if (seekPolicy == SeekPolicy.END) {
- LOG.debug("Seeking from the end off the topic");
- consumer.seekToEnd(consumer.assignment());
- }
-
+ seekConsumer();
return false;
}
-
return true;
}
+
+ protected void seekConsumer() {
+ if (seekPolicy == SeekPolicy.BEGINNING) {
+ LOG.debug("Seeking to beginning of topic");
+ consumer.seekToBeginning(consumer.assignment());
+ } else if (seekPolicy == SeekPolicy.END) {
+ LOG.debug("Seeking to end of topic");
+ consumer.seekToEnd(consumer.assignment());
+ }
+ }
}