[ https://issues.apache.org/jira/browse/CAMEL-8975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14736711#comment-14736711 ]

Jonathan Anstey commented on CAMEL-8975:
----------------------------------------

[~mcoyote] would you mind submitting a PR for this? Sounds like an awesome improvement!

> camel-kafka - Message loss with batch commit
> --------------------------------------------
>
>                 Key: CAMEL-8975
>                 URL: https://issues.apache.org/jira/browse/CAMEL-8975
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 2.15.2
>         Environment: Ubuntu LTS 14.x, Java 7
>            Reporter: Michael J. Kitchin
>
> These issues center around the Kafka consumer (KafkaConsumer.java, line
> numbers below):
> # Exchange exceptions/failures are ignored at process() (:148), meaning:
> ## Automatic offset commit on exchange failure (e.g., processor/endpoint
> exception)
> ## In-flight exchange loss on Camel context/runtime shutdown (i.e., route
> interrupted -> exception suppressed -> offset committed)
> # BatchCommitConsumerTask activations are unbalanced during periods of low
> activity, meaning:
> ## await() (:165) will time out for active BatchCommitConsumerTask(s) when
> other consumer threads are blocked on it.hasNext() (:145) (a blocking call,
> despite no @throws)
> ## Any previously activated thread in await() will (a) get a
> TimeoutException, (b) loop, (c) get a BrokenBarrierException on the next
> await() call, and (d) exit
> ## The process repeats until all consumer stream threads have exited,
> leaving the consumer dead
> ## Aggravated if process() (:148) blocks (e.g., for delay/redelivery on the
> route)
> # An ExecutorService is obtained from Camel to handle KafkaStreams, with the
> number of threads set to the consumerStreams param (:77). Since the number of
> KafkaStreams actually created is (consumersCount * consumerStreams) and the
> executor runnables are indefinite loops, a random selection of streams will
> not be serviced if consumersCount>1.
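
The barrier sequence in item 2 (timeout, then BrokenBarrierException on the next await()) can be reproduced outside Camel. The following is a minimal sketch, assuming the barrier is a plain java.util.concurrent.CyclicBarrier, which the exception names suggest. The class and method names (BarrierTimeoutDemo, awaitTwice) are hypothetical and not taken from KafkaConsumer.java. The second party stands in for a consumer thread blocked in it.hasNext() and never arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BarrierTimeoutDemo {

    /** Calls await() twice on a two-party barrier whose second party never arrives. */
    public static List<String> awaitTwice() throws InterruptedException {
        // Two parties are expected, but only the calling thread ever reaches the
        // barrier; the missing party models a thread blocked in it.hasNext().
        CyclicBarrier barrier = new CyclicBarrier(2);
        List<String> outcomes = new ArrayList<>();
        for (int attempt = 0; attempt < 2; attempt++) {
            try {
                barrier.await(100, TimeUnit.MILLISECONDS);
                outcomes.add("passed");
            } catch (TimeoutException e) {
                // The timeout also breaks the barrier for every later caller.
                outcomes.add("TimeoutException");
            } catch (BrokenBarrierException e) {
                // Thrown immediately because the barrier is already broken.
                outcomes.add("BrokenBarrierException");
            }
        }
        outcomes.add("broken=" + barrier.isBroken());
        return outcomes;
    }

    public static void main(String[] args) throws InterruptedException {
        // prints [TimeoutException, BrokenBarrierException, broken=true]
        System.out.println(awaitTwice());
    }
}
```

A broken CyclicBarrier stays broken until reset() is called, so a loop that simply retries await() cannot recover on its own. Whether the reporter's reimplementation resets the barrier or avoids it altogether is not stated in the issue.
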
> Source code URL:
> -
> https://github.com/apache/camel/blob/master/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
> We've troubleshot this extensively and reimplemented the KafkaConsumer class
> with params added to KafkaConfiguration to address these concerns and are
> happy to submit these back to the community, if interested.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
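
The thread-pool sizing problem in item 3 of the issue can also be shown in isolation. This is a rough sketch, assuming a plain Executors.newFixedThreadPool as a stand-in for the ExecutorService that Camel hands out. PoolStarvationDemo and startedTasks are hypothetical names, and the sleeping loop only imitates a stream runnable that never returns.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolStarvationDemo {

    /** Submits never-ending loops to a fixed pool and reports how many actually started. */
    public static int startedTasks(int threads, int tasks) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        final AtomicInteger started = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    started.incrementAndGet();
                    // Stand-in for a stream loop that runs until the consumer stops.
                    while (!Thread.currentThread().isInterrupted()) {
                        try {
                            Thread.sleep(10);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            });
        }
        Thread.sleep(300); // give every runnable that can get a thread time to start
        int result = started.get();
        pool.shutdownNow(); // interrupts running loops; queued runnables never run
        pool.awaitTermination(1, TimeUnit.SECONDS);
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        int consumersCount = 2;
        int consumerStreams = 2;
        int streams = consumersCount * consumerStreams;
        // Pool sized by consumerStreams only, as described in the issue.
        System.out.println(startedTasks(consumerStreams, streams) + " of " + streams + " started");
        // Pool sized by the number of streams actually created.
        System.out.println(startedTasks(streams, streams) + " of " + streams + " started");
    }
}
```

Because the runnables never finish, any stream queued behind a full pool is never serviced. Sizing the pool to consumersCount * consumerStreams is one way to avoid that; the issue does not say which approach the reporter's rewrite takes.
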