Jason Gustafson explains this quite nicely in this article:
https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/
Technically, it is up to the application to decide when a message counts
as fully consumed. If a database transaction is involved, I would say that
a CommitFailedException should roll back all the changes you have made.
Because the offset commit did not succeed, you haven't "really" consumed
any message, and the same records will be delivered again to whichever
consumer owns the partition after the rebalance.
Tailoring your code a little bit:
    @Override
    public void run() {
        try {
            do {
                processRecords(kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs()));
                kafkaConsumer.commitSync();
            } while (!isConsumerLoopClosed.get());
        } catch (WakeupException wakeupException) {
            // do nothing if wakeupException is from shutdown hook
            if (!isConsumerLoopClosed.get()) {
                handleConsumerLoopException(wakeupException);
            }
        } catch (RuntimeException ex) {
            // RuntimeException could also happen for other reasons here
            if (ex instanceof CommitFailedException) {
                // revert db txn etc. to avoid false positives
            } else if (ex instanceof KafkaException) {
                // do something else.
            } else {
                // alternatively, do this
            }
            handleConsumerLoopException(ex);
        } finally {
            kafkaConsumer.close();
        }
    }
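
To make the "revert on CommitFailedException" branch more concrete, here
is a minimal sketch of the idea. Everything in it is a stand-in so that it
runs without kafka-clients on the classpath: the nested
CommitFailedException, the OffsetCommitter interface, FakeDb and
processBatch are hypothetical names of mine, not part of the Kafka API or
of your code. In a real consumer the commitSync() call would be
kafkaConsumer.commitSync() and the rollback would go to your actual
database transaction.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CommitOrRollbackSketch {

    /** Hypothetical stand-in for Kafka's CommitFailedException. */
    static class CommitFailedException extends RuntimeException {
        CommitFailedException(String message) { super(message); }
    }

    /** Hypothetical stand-in for the single consumer call used here. */
    interface OffsetCommitter {
        void commitSync();
    }

    /** Hypothetical in-memory "database" with staged writes. */
    static class FakeDb {
        final List<String> committed = new ArrayList<>();
        private final List<String> staged = new ArrayList<>();

        void write(String row) { staged.add(row); }
        void commit() { committed.addAll(staged); staged.clear(); }
        void rollback() { staged.clear(); }
    }

    /**
     * Stages one batch in the database, then commits offsets.
     * If the offset commit fails, the staged writes are rolled back.
     * Returns true if the batch was committed, false if it was rolled back.
     */
    static boolean processBatch(List<String> records, FakeDb db, OffsetCommitter consumer) {
        for (String record : records) {
            db.write(record);
        }
        try {
            consumer.commitSync();
        } catch (CommitFailedException e) {
            // Offsets were not committed, so the records will be redelivered:
            // discard the staged writes to avoid applying them twice.
            db.rollback();
            return false;
        }
        db.commit();
        return true;
    }

    public static void main(String[] args) {
        FakeDb db = new FakeDb();
        boolean first = processBatch(Arrays.asList("a", "b"), db, () -> { });
        boolean second = processBatch(Arrays.asList("c"), db, () -> {
            throw new CommitFailedException("simulated rebalance");
        });
        System.out.println(first + " " + second + " " + db.committed);
    }
}
```

Note the ordering is a design choice in this sketch: offsets are committed
before the database commit, so a database failure after that point would
lose the batch. Committing the database first gives the opposite trade-off
(possible duplicates on redelivery).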
One thing to remember: when you are producing data, the producer API as of
1.0.0 (the transactional API itself was introduced in 0.11.0) gives you
"transaction-like" control over when a group of sends counts as
successfully committed. Have a look at the beginTransaction(),
commitTransaction() and abortTransaction() methods on KafkaProducer to see
how they can be used for finer control over your message delivery.
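
The control flow around those three methods looks roughly like the sketch
below. It is only an illustration: TxProducer and InMemoryProducer are
hypothetical stand-ins of mine that reuse the method names so the example
runs without a broker, and send() here takes a plain String instead of a
ProducerRecord. With the real KafkaProducer you would also need to set
transactional.id in the producer config and call initTransactions() once
before the first transaction, and some exceptions (for example
ProducerFencedException) are fatal and call for closing the producer
rather than aborting.

```java
import java.util.ArrayList;
import java.util.List;

class TransactionalSendSketch {

    /** Hypothetical stand-in that mirrors the transactional method names. */
    interface TxProducer {
        void beginTransaction();
        void send(String record);
        void commitTransaction();
        void abortTransaction();
    }

    /** In-memory fake: records become visible only on commitTransaction(). */
    static class InMemoryProducer implements TxProducer {
        final List<String> visible = new ArrayList<>();
        private final List<String> pending = new ArrayList<>();

        public void beginTransaction() { pending.clear(); }

        public void send(String record) {
            if (record == null) {
                throw new IllegalArgumentException("null record");
            }
            pending.add(record);
        }

        public void commitTransaction() {
            visible.addAll(pending);
            pending.clear();
        }

        public void abortTransaction() { pending.clear(); }
    }

    /**
     * Sends all records in one transaction.
     * Returns true if the transaction committed, false if it was aborted.
     */
    static boolean sendAtomically(TxProducer producer, List<String> records) {
        producer.beginTransaction();
        try {
            for (String record : records) {
                producer.send(record);
            }
            producer.commitTransaction();
            return true;
        } catch (RuntimeException e) {
            // Any failure before the commit discards the whole batch.
            producer.abortTransaction();
            return false;
        }
    }
}
```

The point is that either every record in the batch becomes visible or none
does, which is the "all or nothing" behaviour the transactional methods
are meant to give you.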
Regards,
On 1 June 2018 at 05:54, pradeep s <[email protected]> wrote:
> Hi,
> I am running a poll loop for kafka consumer and the app is deployed in
> kubernetes.I am using manual commits.Have couple of questions on exception
> handling in the poll loop
>
> 1) Do i need to handle consumer rebalance scenario(when any of the consumer
> pod dies) by adding a listener or will the commits be taken care after
> rebalance .
>
> 2) Do i need to handle CommitFailedException specifically
>
> Consume loop code below
>
>
> @Override
> public void run() {
> try {
> do {
> processRecords(kafkaConsumer.poll(kafkaConfig.
> getPollTimeoutMs()));
> kafkaConsumer.commitSync();
> } while (!isConsumerLoopClosed.get());
> } catch (WakeupException wakeupException) {
> //do nothing if wakeupException is from shutdown hook
> if (!isConsumerLoopClosed.get()) {
> handleConsumerLoopException(wakeupException);
> }
> } catch (RuntimeException ex) {
> handleConsumerLoopException(ex);
> } finally {
> kafkaConsumer.close();
> }
>
>
> }
>
> Thanks
> Pradeep
>