[ https://issues.apache.org/jira/browse/CAMEL-18350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17610568#comment-17610568 ]
Karen Lease edited comment on CAMEL-18350 at 9/28/22 1:32 PM:
--------------------------------------------------------------

I started to look into it, and it appears that setting autoCommit to false forces Camel to use a no-op commit manager (which is *not* what the documentation says), so in fact the offset is not correctly preserved:

{code:java}
2022-09-27 23:33:48.308 DEBUG --- [mer[test_topic]] o.a.c.c.kafka.consumer.CommitManagers : Allowing manual commit management
2022-09-27 23:33:48.308 DEBUG --- [mer[test_topic]] o.a.c.c.kafka.consumer.CommitManagers : Using an NO-OP commit manager for manual commit management
{code}

So I tried explicitly setting a commit manager, like so:

{code:java}
.advanced()
.kafkaManualCommitFactory(new org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory())
{code}

After this it no longer reconsumes the messages before the failing one, but it seems to consume only the one after the failure. That's when using maxPollRecords(1). Without that setting the test works, because the logs show that it then seeks to offset 3 after the error and not -1.

So there is something strange with the offset handling, as you found too.
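To make the offset arithmetic above concrete, here is a minimal, self-contained sketch. It is not Camel's code: the class {{OffsetReplaySketch}} and its methods are hypothetical, and it assumes that the -1 seen in the logs stands for "no record was processed successfully in the current poll", which would be the case for every failure when maxPollRecords is 1. Under that assumption, a safe resume position falls back to where the poll started instead of seeking to -1.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a consumer that breaks out of a poll on the first error.
// Nothing here uses the Camel or Kafka APIs; it only illustrates the offset arithmetic.
public class OffsetReplaySketch {

    // Position to resume from after a failure.
    // lastProcessedOffset: offset of the last record handled successfully in the
    //                      current poll, or -1 if none was handled (assumed sentinel).
    // pollStartPosition:   position the current poll started reading from.
    public static long resumePosition(long lastProcessedOffset, long pollStartPosition) {
        if (lastProcessedOffset < 0) {
            // Nothing succeeded in this poll: go back to where the poll started
            // rather than treating -1 as a real offset.
            return pollStartPosition;
        }
        return lastProcessedOffset + 1;
    }

    // Simulates consuming offsets [startPosition, logEnd) in polls of at most
    // maxPollRecords, with a single failure the first time failingOffset is seen.
    // Returns every offset handed to the processor, including the retry.
    public static List<Long> consume(long logEnd, long startPosition,
                                     int maxPollRecords, long failingOffset) {
        List<Long> delivered = new ArrayList<>();
        long position = startPosition;
        boolean failedOnce = false;
        while (position < logEnd) {
            long batchEnd = Math.min(position + maxPollRecords, logEnd);
            long lastProcessed = -1;
            boolean brokeOut = false;
            for (long offset = position; offset < batchEnd; offset++) {
                delivered.add(offset);
                if (offset == failingOffset && !failedOnce) {
                    failedOnce = true;
                    brokeOut = true;
                    break; // break on first error
                }
                lastProcessed = offset;
            }
            position = brokeOut ? resumePosition(lastProcessed, position) : batchEnd;
        }
        return delivered;
    }

    public static void main(String[] args) {
        // Six records (offsets 0..5), reading starts at 2, offset 3 fails once.
        System.out.println(consume(6, 2, 1, 3));  // one record per poll
        System.out.println(consume(6, 2, 10, 3)); // whole topic in one poll
    }
}
```

In both runs the model delivers offsets 2, 3, 3, 4, 5: the failing record is retried once and nothing before the starting position is reconsumed, which matches the expected behavior described in the issue.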

was (Author: klease78):
I started to look into it and it appears that setting the autoCommit to false is forcing Camel to use a no-op commit manager (which is *not* what is written in the documentation) so in fact the offset is not correctly preserved:

{code:java}
2022-09-27 23:33:48.308 DEBUG --- [mer[test_topic]] o.a.c.c.kafka.consumer.CommitManagers : Allowing manual commit management
2022-09-27 23:33:48.308 DEBUG --- [mer[test_topic]] o.a.c.c.kafka.consumer.CommitManagers : Using an NO-OP commit manager for manual commit management
{code}

So I tried explicitly setting a commit manager, like so:

{code:java}
.advanced()
.kafkaManualCommitFactory(new org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory())
{code}

After this it doesn't reconsume the messages before the failing one, but it seems only to consume the one after the failure.

> camel-kafka: enabling "breakOnFirstError" causes camel to reconsume all records on error
> ----------------------------------------------------------------------------------------
>
> Key: CAMEL-18350
> URL: https://issues.apache.org/jira/browse/CAMEL-18350
> Project: Camel
> Issue Type: Bug
> Components: camel-kafka
> Affects Versions: 3.18.0
> Reporter: Espen Andreassen
> Assignee: Karen Lease
> Priority: Minor
> Fix For: 3.20.0
>
>
> {*}Reproducing{*}:
> * Configure the camel kafka consumer with "breakOnFirstError" = "true"
> * Produce a series of records to kafka that are consumed by the application
> * Ensure the offset is committed
> * Produce more records
> * Trigger an error when processing one of the records
> *Expected behavior:*
> * Application should only reconsume records after the last committed offset
> *Actual behavior:*
> * Application reconsumes all records on topic
> Whether the erroneous behavior is triggered seems to depend on the offset the failing exchange has internally in the series of records in the poll.
>
> I've created a project on github with a failing test that reproduces the issue:
> [https://github.com/espeandr/camel-kafka-incorrectly-reconsumes-entire-topic-demo].
> I haven't been able to find a pattern in which "message offset" in the poll causes the error. In the test I've reproduced the error by forcing a max poll size, as this simplifies the test setup. The issue is also reproducible without overriding maxPollSize.
> I suspect that offset = 0 is committed when breaking out in the erroneous cases.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)