[ 
https://issues.apache.org/jira/browse/CAMEL-8975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14736711#comment-14736711
 ] 

Jonathan Anstey commented on CAMEL-8975:
----------------------------------------

[~mcoyote] would you mind submitting a PR for this? Sounds like an awesome 
improvement!

> camel-kafka - Message loss with batch commit
> --------------------------------------------
>
>                 Key: CAMEL-8975
>                 URL: https://issues.apache.org/jira/browse/CAMEL-8975
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 2.15.2
>         Environment: Ubuntu LTS 14.x, Java 7
>            Reporter: Michael J. Kitchin
>
> These issues center on the Kafka consumer (KafkaConsumer.java; line numbers 
> below):
> # Exchange exceptions/failures ignored at process() (:148), meaning:
> ## Automatic offset commit on exchange failure (e.g., processor/endpoint 
> exception)
> ## In-flight exchange loss on Camel context/runtime shutdown (i.e., route 
> interrupted -> exception suppressed -> offset committed)
> # BatchCommitConsumerTask activations are unbalanced during periods of low 
> activity, meaning:
> ## await() (:165) will time out for active BatchCommitConsumerTask(s) when 
> other consumer threads are blocked on it.hasNext() (:145) (a blocking call, 
> despite no @throws)
> ## Any previously activated thread waiting in await() will (a) get a 
> TimeoutException, (b) loop, (c) get a BrokenBarrierException on the next 
> await() call, and (d) exit
> ## This repeats until all consumer stream threads have exited, leaving the 
> consumer dead
> ## Aggravated if process() (:148) blocks (e.g., for delay/redelivery on the 
> route)
> # An ExecutorService is obtained from Camel to handle KafkaStreams, with the 
> number of threads set to the consumerStreams param (:77). Since the number of 
> KafkaStreams actually created is (consumersCount * consumerStreams) and the 
> executor runnables are infinite loops, an arbitrary subset of streams will 
> never be serviced if consumersCount>1.
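
The sizing problem in the last item can be reproduced with plain JDK classes. The sketch below is not Camel or camel-kafka code: PoolSizingDemo and startedStreams are hypothetical names, the counts in main() are example values, and a fixed thread pool stands in for the ExecutorService obtained from Camel (an assumption about how that pool behaves). Each task blocks forever, like a stream loop, so only as many tasks start as the pool has threads.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of camel-kafka.
public class PoolSizingDemo {

    // Submits streamCount never-ending tasks to a pool of poolSize threads
    // and returns how many of them actually started running.
    public static int startedStreams(int poolSize, int streamCount) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        AtomicInteger started = new AtomicInteger();
        // Opens once every task that can get a thread has started.
        CountDownLatch running = new CountDownLatch(Math.min(poolSize, streamCount));
        // Never counted down: each task blocks like an endless stream loop.
        CountDownLatch never = new CountDownLatch(1);
        try {
            for (int i = 0; i < streamCount; i++) {
                executor.execute(() -> {
                    started.incrementAndGet();
                    running.countDown();
                    try {
                        never.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            running.await(5, TimeUnit.SECONDS);
            return started.get();
        } finally {
            // Interrupt the blocked tasks and discard the ones still queued.
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int consumersCount = 3;   // example value, not from the report
        int consumerStreams = 2;  // example value, not from the report
        int streams = consumersCount * consumerStreams;
        System.out.println("pool sized to consumerStreams: "
                + startedStreams(consumerStreams, streams) + " of " + streams + " streams serviced");
        System.out.println("pool sized to the product:     "
                + startedStreams(streams, streams) + " of " + streams + " streams serviced");
    }
}
```

Sizing the pool to consumersCount * consumerStreams lets every stream get a thread in this model; whether that is the right fix for the component itself depends on how Camel manages the pool.
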
> Source code URL:
> - 
> https://github.com/apache/camel/blob/master/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
> We've troubleshot this extensively and reimplemented the KafkaConsumer class 
> with params added to KafkaConfiguration to address these concerns and are 
> happy to submit these back to the community, if interested.
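
The barrier failure sequence described in the report (a timeout, then a BrokenBarrierException on the next await()) follows from how java.util.concurrent.CyclicBarrier behaves and can be shown in isolation. This is a minimal sketch, assuming the batch commit tasks synchronize on a CyclicBarrier as the exception names suggest; BarrierTimeoutDemo and awaitOnce are hypothetical names, not camel-kafka code. A two-party barrier is awaited by one thread only, standing in for the case where the other consumer thread is blocked in hasNext().

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; not part of camel-kafka.
public class BarrierTimeoutDemo {

    // Waits on the barrier once and reports how the wait ended.
    public static String awaitOnce(CyclicBarrier barrier, long timeoutMillis) throws InterruptedException {
        try {
            barrier.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return "passed";
        } catch (TimeoutException e) {
            // The timed-out wait also leaves the barrier in the broken state.
            return "TimeoutException";
        } catch (BrokenBarrierException e) {
            // A broken barrier rejects every later await() until reset().
            return "BrokenBarrierException";
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Two parties are expected, but only this thread ever arrives.
        CyclicBarrier barrier = new CyclicBarrier(2);
        System.out.println("first await:  " + awaitOnce(barrier, 50));
        System.out.println("second await: " + awaitOnce(barrier, 50));
        barrier.reset(); // clears the broken state
        System.out.println("after reset:  " + awaitOnce(barrier, 50));
    }
}
```

The first wait ends in a TimeoutException and the second in a BrokenBarrierException, matching steps (a) and (c) in the report. reset() clears the broken state, which is one way a consumer loop could recover instead of exiting; whether that fits the batch commit logic is a separate question.
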



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
