Jason Gustafson explains this quite nicely in this article:
https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/

It is technically up to the application to decide when a message counts as
fully consumed. If a database transaction is involved, I would say that a
CommitFailedException should revert all the changes you have made: because
the offset could not be committed, you have not really consumed any message,
and the records will be delivered again after the rebalance.

Tailoring your code a little bit:

    @Override
    public void run() {
        try {
            do {
                processRecords(kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs()));
                kafkaConsumer.commitSync();
            } while (!isConsumerLoopClosed.get());
        } catch (WakeupException wakeupException) {
            // do nothing if the WakeupException comes from the shutdown hook
            if (!isConsumerLoopClosed.get()) {
                handleConsumerLoopException(wakeupException);
            }
        } catch (RuntimeException ex) {
            // a RuntimeException can also be thrown here for other reasons;
            // CommitFailedException is a subclass of KafkaException, so test it first
            if (ex instanceof CommitFailedException) {
                // revert the db transaction etc. to avoid false positives
            } else if (ex instanceof KafkaException) {
                // do something else
            } else {
                // alternatively, do this
            }
            handleConsumerLoopException(ex);
        } finally {
            kafkaConsumer.close();
        }
    }

One thing to remember is that when you are sending data, the producer API
gives you transaction-like control over when a set of sends counts as
committed (this was added in 0.11.0, so it is available in the 1.0.0 API).
Have a look at the beginTransaction(), commitTransaction() and
abortTransaction() methods to see how they can be used for finer control
over message delivery.

Regards,

On 1 June 2018 at 05:54, pradeep s <sreekumar.prad...@gmail.com> wrote:

> Hi,
> I am running a poll loop for kafka consumer and the app is deployed in
> kubernetes. I am using manual commits. Have couple of questions on
> exception handling in the poll loop
>
> 1) Do i need to handle consumer rebalance scenario (when any of the
> consumer pod dies) by adding a listener or will the commits be taken care
> after rebalance.
>
> 2) Do i need to handle CommitFailedException specifically
>
> Consume loop code below
>
> @Override
> public void run() {
>     try {
>         do {
>             processRecords(kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs()));
>             kafkaConsumer.commitSync();
>         } while (!isConsumerLoopClosed.get());
>     } catch (WakeupException wakeupException) {
>         // do nothing if wakeupException is from shutdown hook
>         if (!isConsumerLoopClosed.get()) {
>             handleConsumerLoopException(wakeupException);
>         }
>     } catch (RuntimeException ex) {
>         handleConsumerLoopException(ex);
>     } finally {
>         kafkaConsumer.close();
>     }
> }
>
> Thanks
> Pradeep
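
To make the "revert on CommitFailedException" idea from the reply concrete,
here is a minimal sketch that runs without a broker. None of it is the real
Kafka client: FakeDatabase, OffsetCommitter and the local
CommitFailedException are hypothetical stand-ins I made up for illustration.
In a real application the committer would be kafkaConsumer.commitSync() and
the database would be a real transaction.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CommitRollbackSketch {

    /** Hypothetical stand-in for Kafka's CommitFailedException. */
    static class CommitFailedException extends RuntimeException {
        CommitFailedException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for the one consumer call used here. */
    interface OffsetCommitter {
        void commitSync();
    }

    /** Hypothetical in-memory "database" with staged (uncommitted) writes. */
    static class FakeDatabase {
        final List<String> committed = new ArrayList<>();
        private final List<String> staged = new ArrayList<>();

        void write(String row) {
            staged.add(row);
        }

        void commit() {
            committed.addAll(staged);
            staged.clear();
        }

        void rollback() {
            staged.clear();
        }
    }

    /**
     * Stages one batch in the database, then commits the offsets.
     * If the offset commit fails, the staged writes are rolled back so
     * the batch is not counted as consumed. Returns true on success.
     */
    static boolean processBatch(List<String> records, FakeDatabase db,
                                OffsetCommitter committer) {
        try {
            for (String record : records) {
                db.write(record);
            }
            committer.commitSync(); // may throw CommitFailedException
            db.commit();            // only reached if the offsets were committed
            return true;
        } catch (CommitFailedException e) {
            db.rollback();          // the batch will be redelivered after the rebalance
            return false;
        }
    }

    public static void main(String[] args) {
        FakeDatabase db = new FakeDatabase();

        // Offset commit succeeds: the rows become visible.
        boolean first = processBatch(Arrays.asList("a", "b"), db, () -> { });
        System.out.println("first batch ok: " + first + ", rows: " + db.committed);

        // Offset commit fails: the staged row is discarded.
        boolean second = processBatch(Arrays.asList("c"), db, () -> {
            throw new CommitFailedException("group rebalanced");
        });
        System.out.println("second batch ok: " + second + ", rows: " + db.committed);
    }
}
```

Note that the offset commit and the database commit are still two separate
steps, so a crash between them can lose or repeat a batch depending on the
order you choose. This sketch only shows the rollback path; keeping the
processing idempotent is still a good idea.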