This is actually quite nicely explained by Jason Gustafson on this article

It's technically up to the application on how to determine whether message
is fully received. If you have database txn involved, I would say that
CommitFailedException should revert all changes you have done. Because you
couldn't commit the offset successfully, you haven't "Really" consumed any

Tailoring your code a little bit:

public void run() {
    try {
        do {
        } while (!isConsumerLoopClosed.get());
    } catch (WakeupException wakeupException) {
        //do nothing if wakeupException is from shutdown hook
        if (!isConsumerLoopClosed.get()) {
    } catch (RuntimeException ex) { // RuntimeException could also happen
for other reasons here
        if (ex instanceof CommitFailedException) {
            // revert db txn etc. to avoid false positives
        } else if (ex instanceof KafkaException) {
            // do something else.
        } else {
           // alternatively, do this
    } finally {


One thing to remember is that when you are sending data, as of 1.0.0 API
you can have a "Txn-like" finer control to determine when you have
successfully committed a transaction. You can check beginTransaction(),
commitTransaction(), abortTransaction() methods to see how they can be
utilised to have even finer control over your message delivery.


On 1 June 2018 at 05:54, pradeep s <> wrote:

> Hi,
> I am running a poll loop for kafka consumer and the app is deployed in
> kubernetes.I am using manual commits.Have couple of questions on exception
> handling in the poll loop
> 1) Do i need to handle consumer rebalance scenario(when any of the consumer
> pod dies) by adding a listener or will the commits be taken care after
> rebalance .
> 2) Do i need to handle CommitFailedException specifically
> Consume loop code below
> @Override
> public void run() {
>     try {
>         do {
>             processRecords(kafkaConsumer.poll(kafkaConfig.
> getPollTimeoutMs()));
>             kafkaConsumer.commitSync();
>         } while (!isConsumerLoopClosed.get());
>     } catch (WakeupException wakeupException) {
>         //do nothing if wakeupException is from shutdown hook
>         if (!isConsumerLoopClosed.get()) {
>             handleConsumerLoopException(wakeupException);
>         }
>     } catch (RuntimeException ex) {
>         handleConsumerLoopException(ex);
>     } finally {
>         kafkaConsumer.close();
>     }
> }
> Thanks
> Pradeep

Reply via email to