This is actually explained quite nicely by Jason Gustafson in this article:
https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/

Technically, it is up to the application to decide when a message counts as
fully received. If a database transaction is involved, I would say that a
CommitFailedException should roll back all the changes you have made. Because
the offset could not be committed, you haven't really consumed any message,
and the same records will be delivered again.

Tailoring your code a little bit:

@Override
public void run() {
    try {
        do {
            processRecords(kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs()));
            kafkaConsumer.commitSync();
        } while (!isConsumerLoopClosed.get());
    } catch (WakeupException wakeupException) {
        // do nothing if wakeupException is from shutdown hook
        if (!isConsumerLoopClosed.get()) {
            handleConsumerLoopException(wakeupException);
        }
    } catch (RuntimeException ex) {
        // RuntimeException could also happen for other reasons here
        if (ex instanceof CommitFailedException) {
            // revert db txn etc. to avoid false positives
        } else if (ex instanceof KafkaException) {
            // do something else.
        } else {
            // alternatively, do this
        }
        handleConsumerLoopException(ex);
    } finally {
        kafkaConsumer.close();
    }
}
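
To make the "revert db txn" branch a bit more concrete, here is a rough,
library-free sketch of the idea. Everything in it is a hypothetical stand-in
(OffsetCommitFailed for CommitFailedException, OffsetCommitter for the
commitSync() call, InMemoryStore for your database), so treat it as an
illustration of the control flow only, not as Kafka or JDBC code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins only: none of these types come from Kafka or JDBC.
final class CommitRollbackSketch {

    /** Stand-in for CommitFailedException. */
    static final class OffsetCommitFailed extends RuntimeException {
        OffsetCommitFailed(String message) {
            super(message);
        }
    }

    /** Stand-in for the offset commit, e.g. a wrapper around commitSync(). */
    interface OffsetCommitter {
        void commit();
    }

    /** Minimal in-memory "database" with staged (not yet durable) writes. */
    static final class InMemoryStore {
        private final List<String> durable = new ArrayList<>();
        private final List<String> staged = new ArrayList<>();

        void stage(String row) { staged.add(row); }

        void commit() {
            durable.addAll(staged);
            staged.clear();
        }

        void rollback() { staged.clear(); }

        List<String> rows() { return new ArrayList<>(durable); }
    }

    /**
     * Stages one batch, tries to commit the offsets, and makes the rows
     * durable only if the offset commit succeeded.
     * Returns true when the batch counts as consumed.
     */
    static boolean processBatch(List<String> batch,
                                InMemoryStore store,
                                OffsetCommitter committer) {
        for (String row : batch) {
            store.stage(row);
        }
        try {
            committer.commit();
        } catch (OffsetCommitFailed e) {
            // Offsets were not committed, so the records will come back:
            // discard the staged work to avoid applying it twice.
            store.rollback();
            return false;
        }
        store.commit();
        return true;
    }
}
```

Note that this ordering is only one possible arrangement: if the database
commit itself failed after the offsets were committed, the batch would be
skipped, so you still have to decide which failure your application can
tolerate.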

One more thing to remember: on the sending side, as of the 1.0.0 API the
producer gives you transaction-like control over when a group of sends counts
as successfully committed. Have a look at the beginTransaction(),
commitTransaction() and abortTransaction() methods to see how they can give
you even finer control over your message delivery.
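
The begin/commit/abort flow looks roughly like the sketch below. To keep it
runnable without the Kafka client on the classpath, TxProducer and
InMemoryTxProducer are hypothetical stand-ins that only mirror the three
method names mentioned above plus a simplified send(). With the real
KafkaProducer you would also need to configure transactional.id and call
initTransactions() once before the first transaction, and some fatal errors
(ProducerFencedException, for example) call for closing the producer rather
than aborting.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins mirroring the producer's transaction method names.
final class TransactionalSendSketch {

    interface TxProducer {
        void beginTransaction();
        void send(String record);
        void commitTransaction();
        void abortTransaction();
    }

    /** In-memory producer: records become visible only on commitTransaction(). */
    static final class InMemoryTxProducer implements TxProducer {
        private final List<String> visible = new ArrayList<>();
        private final List<String> pending = new ArrayList<>();

        @Override
        public void beginTransaction() { pending.clear(); }

        @Override
        public void send(String record) {
            if (record == null) {
                throw new IllegalArgumentException("null record");
            }
            pending.add(record);
        }

        @Override
        public void commitTransaction() {
            visible.addAll(pending);
            pending.clear();
        }

        @Override
        public void abortTransaction() { pending.clear(); }

        List<String> visibleRecords() { return new ArrayList<>(visible); }
    }

    /** Sends the whole batch or nothing. Returns true if it was committed. */
    static boolean sendAtomically(TxProducer producer, List<String> batch) {
        producer.beginTransaction();
        try {
            for (String record : batch) {
                producer.send(record);
            }
            producer.commitTransaction();
            return true;
        } catch (RuntimeException e) {
            // Any failure inside the transaction discards all of its sends.
            producer.abortTransaction();
            return false;
        }
    }
}
```

The point of the pattern is that a consumer of the output never sees half a
batch: either commitTransaction() publishes every record, or
abortTransaction() discards them all.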

Regards,


On 1 June 2018 at 05:54, pradeep s <sreekumar.prad...@gmail.com> wrote:

> Hi,
> I am running a poll loop for kafka consumer and the app is deployed in
> kubernetes.I am using manual commits.Have couple of questions on exception
> handling in the poll loop
>
> 1) Do i need to handle consumer rebalance scenario(when any of the consumer
> pod dies) by adding a listener or will the commits be taken care after
> rebalance .
>
> 2) Do i need to handle CommitFailedException specifically
>
> Consume loop code below
>
>
> @Override
> public void run() {
>     try {
>         do {
>             processRecords(kafkaConsumer.poll(kafkaConfig.
> getPollTimeoutMs()));
>             kafkaConsumer.commitSync();
>         } while (!isConsumerLoopClosed.get());
>     } catch (WakeupException wakeupException) {
>         //do nothing if wakeupException is from shutdown hook
>         if (!isConsumerLoopClosed.get()) {
>             handleConsumerLoopException(wakeupException);
>         }
>     } catch (RuntimeException ex) {
>         handleConsumerLoopException(ex);
>     } finally {
>         kafkaConsumer.close();
>     }
>
>
> }
>
> Thanks
> Pradeep
>
