[ 
https://issues.apache.org/jira/browse/CAMEL-18588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Claus Ibsen updated CAMEL-18588:
--------------------------------
    Fix Version/s: 3.20.0

> Kafka consumer on any exception should repoll records after the committed 
> offset
> --------------------------------------------------------------------------------
>
>                 Key: CAMEL-18588
>                 URL: https://issues.apache.org/jira/browse/CAMEL-18588
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 3.18.0
>            Reporter: rupam
>            Priority: Minor
>             Fix For: 3.20.0
>
>
> In Camel-Kafka , if max poll records is set to more than 1 and 
> breakOnFirstError is set to TRUE, then on any exception thrown on the first 
> message in the batch will cause the KafkaConsumer to start repolling from 
> zero offset because partitionLastOffset is -1 for the first message
> {code:java}
>   private boolean processException(
>             Exchange exchange, TopicPartition partition, long 
> partitionLastOffset,
>             ExceptionHandler exceptionHandler) {
>         // processing failed due to an unhandled exception, what should we do
>         if (configuration.isBreakOnFirstError()) {
>             // we are failing and we should break out
>             if (LOG.isWarnEnabled()) {
>                 LOG.warn("Error during processing {} from topic: {}", 
> exchange, partition.topic(), exchange.getException());
>                 LOG.warn("Will seek consumer to offset {} and start polling 
> again.", partitionLastOffset);
>             }
>             // force commit, so we resume on next poll where we failed
>             commitManager.forceCommit(partition, partitionLastOffset);
>             // continue to next partition
>             return true;
>         } else {
>             // will handle/log the exception and then continue to next
>             exceptionHandler.handleException("Error during processing", 
> exchange, exchange.getException());
>         }
>         return false;
>     }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to