[ https://issues.apache.org/jira/browse/CAMEL-13339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16797790#comment-16797790 ]
Ramu commented on CAMEL-13339:
------------------------------

You are very welcome to send a PR. We love contributions:
https://github.com/apache/camel/blob/master/CONTRIBUTING.md

> StateRepository implemented to save offset state outside Kafka, message loss
> occurs upon partition revoke
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-13339
>                 URL: https://issues.apache.org/jira/browse/CAMEL-13339
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 2.23.0
>            Reporter: Viswa Ramamoorthy
>            Priority: Major
>
> The current implementation of
> org.apache.camel.component.kafka.KafkaConsumer.KafkaFetchRecords's
> onPartitionsRevoked uses
> org.apache.kafka.clients.consumer.KafkaConsumer.position(partition). This
> approach causes message loss when multiple processes listen to the same
> topic in a point-to-point (QUEUE-like) messaging setup.
>
> The issue is noticed when a partition is assigned and then revoked in quick
> succession. Upon partition assignment, say at the beginning of processing,
> the offset is set to 0, and say there was no poll for this partition (maybe
> because an earlier poll brought in a batch of records that are still being
> processed). Subsequently, say the partition is revoked before any poll.
> In this case, onPartitionsRevoked looks at
> org.apache.kafka.clients.consumer.KafkaConsumer.position(partition) to save
> the offset state, so 0 gets saved in the StateRepository implementation.
> When the same partition is assigned again, StateRepository.getState returns
> 0. Since the Camel KafkaConsumer treats this as the last processed offset,
> it adds 1 to it, moving the pointer to offset 1. Because of this, the
> message at offset 0 never gets processed.
>
> Two fixes might be needed:
> # a) onPartitionsRevoked should look at the last processed offset (possibly
> storing the 'last processed offset' for each topic/partition in an in-memory
> map) and use it to save the offset.
> # b) Currently onPartitionsRevoked only saves the offset state when an
> implementation of StateRepository is configured. Ideally it should call
> KafkaFetchRecords.commitOffset so that the commitSync call goes through when
> a partition is revoked and no StateRepository implementation is configured.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
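
For illustration only, the off-by-one described in the issue and proposed fix (a) can be sketched in plain Java with no Kafka or Camel dependency. Everything here is hypothetical: the class OffsetRevokeSketch, its methods, and the "topic/0" key are made up for this sketch, and a HashMap stands in for the StateRepository. It is a simplified model of the behaviour as the reporter describes it, not the actual camel-kafka code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the reported behaviour; not camel-kafka code.
class OffsetRevokeSketch {
    // Stand-in for a StateRepository: key = "topic/partition", value = saved offset.
    static final Map<String, Long> repository = new HashMap<>();

    // As described in the issue: the saved value is treated as the last
    // processed offset, so consumption resumes one past it.
    static long resumeOffset(String key, long defaultOffset) {
        Long saved = repository.get(key);
        return saved == null ? defaultOffset : saved + 1;
    }

    // Reported behaviour: on revoke, save the consumer position unconditionally.
    static void onRevokeSavingPosition(String key, long position) {
        repository.put(key, position);
    }

    // Proposed fix (a): save only an offset that was actually processed,
    // tracked per topic/partition in an in-memory map.
    static void onRevokeSavingLastProcessed(String key, Map<String, Long> lastProcessed) {
        Long offset = lastProcessed.get(key);
        if (offset != null) {
            repository.put(key, offset);
        }
    }

    // Partition assigned at position 0, revoked before any record was polled.
    static long scenarioSavingPosition() {
        repository.clear();
        onRevokeSavingPosition("topic/0", 0L);
        return resumeOffset("topic/0", 0L);
    }

    // Same scenario, but nothing is saved because nothing was processed.
    static long scenarioSavingLastProcessed() {
        repository.clear();
        onRevokeSavingLastProcessed("topic/0", new HashMap<>());
        return resumeOffset("topic/0", 0L);
    }

    public static void main(String[] args) {
        System.out.println("saving position, resume at: " + scenarioSavingPosition());
        System.out.println("saving last processed, resume at: " + scenarioSavingLastProcessed());
    }
}
```

In this model, saving the position makes the next assignment resume at offset 1, so the record at offset 0 is skipped; saving only a genuinely processed offset leaves the repository empty and the next assignment resumes at 0.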