Hi Michael,

I think we didn’t take committing the offset into consideration in the 
shutdown processing.
Please feel free to create a JIRA[1] and send a patch for it.
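
For illustration only, a patch might gate the commit roughly as in the sketch below. It is not the camel-kafka code: CommitGate and its methods are hypothetical names, and it assumes the stream threads can report each exchange's outcome before they reach the barrier, and that the barrier action can take the commit call as a Runnable.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical helper (not part of camel-kafka): decides whether the
 * barrier action is allowed to commit offsets for the current batch.
 */
final class CommitGate {
    private final AtomicBoolean failed = new AtomicBoolean(false);
    private final AtomicBoolean shuttingDown = new AtomicBoolean(false);
    private final AtomicInteger commits = new AtomicInteger();

    /** Called by a stream thread after processor.process(exchange) returns. */
    void recordOutcome(Exception exchangeException) {
        // Treat a failed exchange or an interrupted thread as "do not commit".
        if (exchangeException != null || Thread.currentThread().isInterrupted()) {
            failed.set(true);
        }
    }

    /** Called once when the consumer starts stopping. */
    void beginShutdown() {
        shuttingDown.set(true);
    }

    /**
     * Called from the barrier action. Runs the commit only when the batch
     * was clean and no shutdown is in progress; the failure flag is reset
     * for the next batch either way. Returns true if the commit ran.
     */
    boolean onBarrier(Runnable commitOffsets) {
        boolean clean = !failed.getAndSet(false) && !shuttingDown.get();
        if (clean) {
            commitOffsets.run();
            commits.incrementAndGet();
        }
        return clean;
    }

    /** Number of commits actually performed (useful for testing). */
    int commitCount() {
        return commits.get();
    }
}
```

In the consumer this would presumably be wired in as gate.recordOutcome(exchange.getException()) after the process call and gate.onBarrier(() -> consumer.commitOffsets()) in the barrier action, so that uncommitted messages are re-delivered after a restart.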

[1]https://issues.apache.org/jira/browse/CAMEL

--  
Willem Jiang


Blog: http://willemjiang.blogspot.com (English)  
http://jnn.iteye.com (Chinese)
Twitter: willemjiang  
Weibo: 姜宁willem



On July 5, 2015 at 6:42:31 AM, Michael J. Kitchin (mcoyote...@gmail.com) wrote:
> Hi there,
>  
> These are questions re: the official camel-kafka integration. Since the
> issues touch on both camel and kafka I'm sending to both users lists in
> search of answers.
>  
> - - - - -
>  
> I have a simple, InOnly, point-to-point, synchronous camel route (a)
> consuming from a kafka topic (consumer), (b) running the resulting exchanges
> (messages) through a processor chain and (c) forwarding the outcome on to
> somewhere else.
>  
> If the runtime dies while exchanges are in this pipeline, I want things to
> pick up where they left off when restarted. I'm not picky about seeing the
> same data more than once (as long as it's recent), I just can't lose
> anything.
>  
> In brief, everything's working great except this failure/recovery part --
> in-flight exchanges are getting lost and there is no apparent re-delivery
> on restart.
>  
> I think this has to do with consumer auto-commit, which is the default
> behavior. My reading of the kafka and camel-kafka docs suggests disabling
> auto-commit will give me what I want, but when I try it I'm not seeing
> re-delivery kick off when I restart.
>  
> After disabling auto-commit, I walked through this in the debugger, FWIW.
> If interested, you can see the code, here:
> -
> https://github.com/apache/camel/blob/master/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
>   
>  
> ...Specifically:
> - Processing is underway line 148: processor.process(exchange);
> - Camel triggers a shutdown and this returns as normal, without any
> exchange exception check
> - The barrier await hits at line
> 165: berrier.await(endpoint.getBarrierAwaitTimeoutMs(),
> TimeUnit.MILLISECONDS);
> - This aligns with the rest of the streams and triggers the barrier action,
> which in turn performs the commit at line 193: consumer.commitOffsets();
>  
> Since any exception from line 148 is suppressed and there's no subsequent
> interrupted() or exchange exception check, it looks like there's no way
> to signal not to commit and in-flight exchanges are a guaranteed loss.
>  
> Does this sound correct?
>  
> If so, barring a change from the maintainers I figure I might fork this
> code and optionally bypass the consumer.commitOffsets(); during shutdown
> or when an exchange carries an exception after the
> processor.process(exchange) call.
>  
> Thoughts?
>  
> Please let me know if I may provide any additional information. Thanks.
>  
> -Regards,
> MjK
>  
> - - - - -
>  
> Michael J. Kitchin
> Senior Software Engineer
> Operational Systems, Inc.
> 4450 Arapahoe Avenue, Suite 100
> Boulder, CO 80303
>  
> Phone: 719-271-6476
> Email: michael.kitc...@opsysinc.com
> Web: www.opsysinc.com
>  
