Hi Michael,

I think we don't take offset committing into consideration during shutdown processing. Please feel free to create a JIRA[1] and submit a patch for it.

[1] https://issues.apache.org/jira/browse/CAMEL

--
Willem Jiang

Blog: http://willemjiang.blogspot.com (English)
http://jnn.iteye.com (Chinese)
Twitter: willemjiang
Weibo: 姜宁willem

On July 5, 2015 at 6:42:31 AM, Michael J. Kitchin (mcoyote...@gmail.com) wrote:

> Hi there,
>
> These are questions re: the official camel-kafka integration. Since the
> issues touch on both camel and kafka I'm sending to both users lists in
> search of answers.
>
> - - - - -
>
> I have a simple, InOnly, point-to-point, synchronous camel route (a)
> consuming from a kafka topic (consumer), (b) running the resulting exchanges
> (messages) through a processor chain and (c) forwarding the outcome on to
> somewhere else.
>
> If the runtime dies while exchanges are in this pipeline, I want things to
> pick up where they left off when restarted. I'm not picky about seeing the
> same data more than once (as long as it's recent), I just can't lose
> anything.
>
> In brief, everything's working great except this failure/recovery part --
> in-flight exchanges are getting lost and there is no apparent re-delivery
> on restart.
>
> I think this has to do with consumer auto-commit, which is the default
> behavior. My reading of the kafka and camel-kafka docs suggests disabling
> auto-commit will give me what I want, but when I try it I'm not seeing
> re-delivery kick off when I restart.
>
> After disabling auto-commit, I walked through this in the debugger, FWIW.
> If interested, you can see the code, here:
> -
> https://github.com/apache/camel/blob/master/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
>
>
> ...Specifically:
> - Processing is underway at line 148: processor.process(exchange);
> - Camel triggers a shutdown and this returns as normal, without any
> exchange exception check
> - The barrier await hits at line
> 165: berrier.await(endpoint.getBarrierAwaitTimeoutMs(),
> TimeUnit.MILLISECONDS);
> - This aligns with the rest of the streams and triggers the barrier action,
> which in turn performs the commit at line 193: consumer.commitOffsets();
>
> Since any exception from line 148 is suppressed and there's no subsequent
> interrupted() or exchange exception check it looks like there's no way to
> signal not to commit and in-flight exchanges are a guaranteed loss.
>
> Does this sound correct?
>
> If so, barring a change from the maintainers I figure I might fork this
> code and optionally bypass the consumer.commitOffsets(); during shutdown
> or when an exchange carries an exception after the
> processor.process(exchange) call.
>
> Thoughts?
>
> Please let me know if I may provide any additional information. Thanks.
>
> -Regards,
> MjK
>
> - - - - -
>
> Michael J. Kitchin
> Senior Software Engineer
> Operational Systems, Inc.
> 4450 Arapahoe Avenue, Suite 100
> Boulder, CO 80303
>
> Phone: 719-271-6476
> Email: michael.kitc...@opsysinc.com
> Web: www.opsysinc.com
>
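For anyone picking up the JIRA, here is a rough sketch of the kind of guard Michael describes: the barrier action skips the commit once any stream has reported a failed exchange or a shutdown. This is not the actual KafkaConsumer code. The class GuardedCommitSketch, the OffsetCommitter interface and the endOfBatch method are hypothetical names made up for illustration, and the sketch assumes the skip flag should stay set once raised so that uncommitted messages are re-delivered on restart.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class GuardedCommitSketch {

    /** Hypothetical stand-in for the consumer's commitOffsets() call. */
    interface OffsetCommitter {
        void commitOffsets();
    }

    // Raised by any stream that saw a failed exchange or a shutdown; never cleared.
    private final AtomicBoolean skipCommit = new AtomicBoolean(false);
    private final CyclicBarrier barrier;

    GuardedCommitSketch(int streams, OffsetCommitter committer) {
        // The barrier action runs once per batch, after all streams have arrived.
        this.barrier = new CyclicBarrier(streams, () -> {
            if (!skipCommit.get()) {
                committer.commitOffsets();
            }
        });
    }

    /**
     * Called by each stream thread after it has processed its batch.
     * Returns true if the barrier tripped normally.
     */
    boolean endOfBatch(Exception processingError, boolean shuttingDown, long timeoutMs) {
        if (processingError != null || shuttingDown) {
            skipCommit.set(true); // signal "do not commit" before reaching the barrier
        }
        try {
            barrier.await(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            skipCommit.set(true);
            return false;
        } catch (BrokenBarrierException | TimeoutException e) {
            skipCommit.set(true);
            return false;
        }
    }

    public static void main(String[] args) {
        AtomicInteger commits = new AtomicInteger();
        GuardedCommitSketch sketch = new GuardedCommitSketch(1, commits::incrementAndGet);
        sketch.endOfBatch(null, false, 1000);                         // clean batch
        sketch.endOfBatch(new RuntimeException("boom"), false, 1000); // failed exchange
        sketch.endOfBatch(null, false, 1000);                         // after the failure
        System.out.println("commits = " + commits.get());
    }
}
```

In a real patch the processingError argument would presumably come from exchange.getException() after processor.process(exchange), and the shutdown flag from the consumer's own lifecycle state. Whether the flag should be sticky or reset per batch is a design choice for the patch to settle.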