Hi there,

These are questions re: the official camel-kafka integration. Since the issues touch on both Camel and Kafka, I'm sending this to both users lists in search of answers.
- - - - -

I have a simple, InOnly, point-to-point, synchronous Camel route that (a) consumes from a Kafka topic, (b) runs the resulting exchanges (messages) through a processor chain and (c) forwards the outcome somewhere else.

If the runtime dies while exchanges are in this pipeline, I want things to pick up where they left off when restarted. I'm not picky about seeing the same data more than once (as long as it's recent); I just can't lose anything.

In brief, everything's working great except this failure/recovery part: in-flight exchanges are getting lost and there is no apparent redelivery on restart. I think this has to do with consumer auto-commit, which is the default behavior.

My reading of the Kafka and camel-kafka docs suggests disabling auto-commit will give me what I want, but when I try it I'm not seeing redelivery kick off when I restart.

After disabling auto-commit, I walked through this in the debugger, FWIW. If interested, you can see the code here:
- https://github.com/apache/camel/blob/master/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java

Specifically:
- Processing is underway at line 148: processor.process(exchange);
- Camel triggers a shutdown and this call returns as normal, without any exchange exception check
- The barrier await is reached at line 165: barrier.await(endpoint.getBarrierAwaitTimeoutMs(), TimeUnit.MILLISECONDS);
- This aligns with the rest of the streams and triggers the barrier action, which in turn performs the commit at line 193: consumer.commitOffsets();

Since any exception from line 148 is suppressed and there's no subsequent interrupted() or exchange exception check, it looks like there's no way to signal not to commit, and in-flight exchanges are a guaranteed loss.

Does this sound correct?
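
To make the sequence above concrete, here is a rough, self-contained sketch of a process-then-commit loop that checks for a failed exchange or a pending shutdown before committing. It is not the camel-kafka code: CommitGuardSketch, Message, Handler, OffsetCommitter and requestShutdown() are hypothetical stand-ins invented for illustration, and the barrier that aligns multiple streams is left out. It assumes that offsets left uncommitted are re-read after a restart, which should be verified against the Kafka version in use.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Simplified stand-in for a consume/commit loop; NOT the camel-kafka KafkaConsumer. */
public class CommitGuardSketch {

    /** Hypothetical stand-in for a Camel exchange: a body plus an optional failure. */
    public static final class Message {
        public final String body;
        public Exception exception; // set when processing fails
        public Message(String body) { this.body = body; }
    }

    /** Hypothetical stand-in for the processor chain. */
    public interface Handler { void handle(Message message) throws Exception; }

    /** Hypothetical stand-in for consumer.commitOffsets(). */
    public interface OffsetCommitter { void commitOffsets(); }

    private final Handler handler;
    private final OffsetCommitter committer;
    private final AtomicBoolean shuttingDown = new AtomicBoolean(false);

    public CommitGuardSketch(Handler handler, OffsetCommitter committer) {
        this.handler = handler;
        this.committer = committer;
    }

    /** Called when the runtime starts shutting down. */
    public void requestShutdown() { shuttingDown.set(true); }

    /** Processes one batch; returns true only if offsets were committed. */
    public boolean processBatch(List<Message> batch) {
        boolean clean = true;
        for (Message message : batch) {
            if (shuttingDown.get()) { clean = false; break; }
            try {
                handler.handle(message);
            } catch (Exception e) {
                message.exception = e; // record the failure instead of dropping it
            }
            if (message.exception != null) { clean = false; break; }
        }
        // Re-check the flag: shutdown may have started during the last message.
        if (clean && !shuttingDown.get()) {
            committer.commitOffsets();
            return true;
        }
        return false; // commit skipped, so the batch stays uncommitted
    }
}
```

The trade-off in this sketch is duplicates rather than loss: a batch that fails partway is left uncommitted as a whole, so messages that were already processed in it would be seen again.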

If so, barring a change from the maintainers, I figure I might fork this code and optionally bypass the consumer.commitOffsets(); call during shutdown or when an exchange carries an exception after the processor.process(exchange) call.

Thoughts?

Please let me know if I may provide any additional information. Thanks.

-Regards,
MjK

- - - - -
Michael J. Kitchin
Senior Software Engineer
Operational Systems, Inc.
4450 Arapahoe Avenue, Suite 100
Boulder, CO 80303
Phone: 719-271-6476
Email: michael.kitc...@opsysinc.com
Web: www.opsysinc.com