This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch fix/CAMEL-20227 in repository https://gitbox.apache.org/repos/asf/camel.git
commit 408dab75ccb6d03d91949d63525a2feb6102ed62 Author: Claus Ibsen <[email protected]> AuthorDate: Sat Jun 6 07:42:36 2026 +0200 CAMEL-20227: camel-kafka - Fix Pausable EIP losing messages due to offset advancement The Pausable EIP with Kafka consumer was losing messages because: 1. afterConsume() only evaluated the resume predicate when already paused, but never called consumer.pause() or sought back to the committed offsets 2. The poll loop continued immediately without pausing, causing auto-commit to advance offsets for unprocessed records Fix: Rewrite afterConsume() to always evaluate the predicate, call consumer.pause() and seek to the correct offset when pausing, and consumer.resume() when resuming. This ensures poll() returns empty records during pause and offsets are not advanced for unprocessed messages. Co-Authored-By: Claude <[email protected]> Signed-off-by: Claus Ibsen <[email protected]> --- .../errorhandler/KafkaConsumerListener.java | 47 ++++++++++------------ 1 file changed, 21 insertions(+), 26 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaConsumerListener.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaConsumerListener.java index 9d3a2fd9ccb8..9c79052121db 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaConsumerListener.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaConsumerListener.java @@ -32,7 +32,6 @@ public class KafkaConsumerListener implements ConsumerListener<Object, Processin private SeekPolicy seekPolicy; private Predicate<?> afterConsumeEval; - private boolean paused; public Consumer<?, ?> getConsumer() { return consumer; @@ -57,40 +56,36 @@ public class KafkaConsumerListener implements ConsumerListener<Object, Processin @Override public boolean afterConsume(@SuppressWarnings("unused") 
Object ignored) { - if (paused) { - if (afterConsumeEval.test(null)) { - LOG.warn("State changed, therefore resuming the consumer"); - consumer.resume(consumer.assignment()); - - return true; - } - - LOG.warn("The consumer is not yet resumable"); - return false; + boolean resume = afterConsumeEval.test(null); + if (resume) { + LOG.debug("Resuming consumer"); + consumer.resume(consumer.assignment()); + } else { + LOG.debug("Pausing consumer"); + consumer.pause(consumer.assignment()); + seekConsumer(); } - - // It's not paused, so we can continue processing - return true; + return resume; } @Override public boolean afterProcess(ProcessingResult result) { if (result.isFailed()) { - LOG.warn("Pausing consumer due to error on the last processing"); + LOG.debug("Pausing consumer due to last processing error"); consumer.pause(consumer.assignment()); - paused = true; - - if (seekPolicy == SeekPolicy.BEGINNING) { - LOG.debug("Seeking from the beginning of topic"); - consumer.seekToBeginning(consumer.assignment()); - } else if (seekPolicy == SeekPolicy.END) { - LOG.debug("Seeking from the end off the topic"); - consumer.seekToEnd(consumer.assignment()); - } - + seekConsumer(); return false; } - return true; } + + protected void seekConsumer() { + if (seekPolicy == SeekPolicy.BEGINNING) { + LOG.debug("Seeking to beginning of topic"); + consumer.seekToBeginning(consumer.assignment()); + } else if (seekPolicy == SeekPolicy.END) { + LOG.debug("Seeking to end of topic"); + consumer.seekToEnd(consumer.assignment()); + } + } }
