CAMEL-11215: Add breakOnError to camel-kafka so the consumer stops on first 
exception and allow the same message to be polled again for retry.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9945e25d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9945e25d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9945e25d

Branch: refs/heads/CAMEL-11215
Commit: 9945e25d1ddc92954c2fc08cfc07f87df918ce39
Parents: c17ca1f
Author: Claus Ibsen <davscl...@apache.org>
Authored: Mon May 15 15:01:51 2017 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Mon May 15 15:06:11 2017 +0200

----------------------------------------------------------------------
 .../src/main/docs/kafka-component.adoc          |  2 +-
 .../camel/component/kafka/KafkaConsumer.java    | 50 ++++++++++----------
 .../springboot/KafkaComponentConfiguration.java |  2 +-
 3 files changed, 26 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9945e25d/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc 
b/components/camel-kafka/src/main/docs/kafka-component.adoc
index bdbebce..5181e33 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -77,7 +77,7 @@ with the following path and query parameters:
 | **autoCommitIntervalMs** (consumer) | The frequency in ms that the consumer 
offsets are committed to zookeeper. | 5000 | Integer
 | **autoCommitOnStop** (consumer) | Whether to perform an explicit auto commit 
when the consumer stops to ensure the broker has a commit from the last 
consumed message. This requires the option autoCommitEnable is turned on. The 
possible values are: sync async or none. And sync is the default value. | sync 
| String
 | **autoOffsetReset** (consumer) | What to do when there is no initial offset 
in ZooKeeper or if an offset is out of range: smallest : automatically reset 
the offset to the smallest offset largest : automatically reset the offset to 
the largest offset fail: throw exception to the consumer | latest | String
-| **breakOnFirstError** (consumer) | This options controls what happens when a 
consumer is processing an exchange and it fails. If the option is false then 
the consumer continues to the next message and processes it. If the option is 
true then the consumer breaks out and will seek back to offset of the message 
that caused a failure and then re-attempt to process this message. However this 
can lead to endless processing of the same message if its bound to fail every 
time eg a poison message. Therefore its recommended to deal with that for 
example by using Camel's error handler. | true | boolean
+| **breakOnFirstError** (consumer) | This options controls what happens when a 
consumer is processing an exchange and it fails. If the option is false then 
the consumer continues to the next message and processes it. If the option is 
true then the consumer breaks out and will seek back to offset of the message 
that caused a failure and then re-attempt to process this message. However this 
can lead to endless processing of the same message if its bound to fail every 
time eg a poison message. Therefore its recommended to deal with that for 
example by using Camel's error handler. | false | boolean
 | **bridgeErrorHandler** (consumer) | Allows for bridging the consumer to the 
Camel routing Error Handler which mean any exceptions occurred while the 
consumer is trying to pickup incoming messages or the likes will now be 
processed as a message and handled by the routing Error Handler. By default the 
consumer will use the org.apache.camel.spi.ExceptionHandler to deal with 
exceptions that will be logged at WARN or ERROR level and ignored. | false | 
boolean
 | **checkCrcs** (consumer) | Automatically check the CRC32 of the records 
consumed. This ensures no on-the-wire or on-disk corruption to the messages 
occurred. This check adds some overhead so it may be disabled in cases seeking 
extreme performance. | true | Boolean
 | **consumerRequestTimeoutMs** (consumer) | The configuration controls the 
maximum amount of time the client will wait for the response of a request. If 
the response is not received before the timeout elapses the client will resend 
the request if necessary or fail the request if retries are exhausted. | 40000 
| Integer

http://git-wip-us.apache.org/repos/asf/camel/blob/9945e25d/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 19d90ac..ad172cb 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -37,13 +37,9 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class KafkaConsumer extends DefaultConsumer {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaConsumer.class);
-
     protected ExecutorService executor;
     private final KafkaEndpoint endpoint;
     private final Processor processor;
@@ -88,7 +84,7 @@ public class KafkaConsumer extends DefaultConsumer {
 
     @Override
     protected void doStart() throws Exception {
-        LOG.info("Starting Kafka consumer");
+        log.info("Starting Kafka consumer");
         super.doStart();
 
         executor = endpoint.createExecutor();
@@ -101,7 +97,7 @@ public class KafkaConsumer extends DefaultConsumer {
 
     @Override
     protected void doStop() throws Exception {
-        LOG.info("Stopping Kafka consumer");
+        log.info("Stopping Kafka consumer");
 
         if (executor != null) {
             if (getEndpoint() != null && getEndpoint().getCamelContext() != 
null) {
@@ -134,7 +130,6 @@ public class KafkaConsumer extends DefaultConsumer {
         }
 
         @Override
-        @SuppressWarnings("unchecked")
         public void run() {
             boolean first = true;
             boolean reConnect = true;
@@ -169,12 +164,13 @@ public class KafkaConsumer extends DefaultConsumer {
             }
         }
 
+        @SuppressWarnings("unchecked")
         protected boolean doRun() {
             // allow to re-connect thread in case we use that to retry failed 
messages
             boolean reConnect = false;
 
             try {
-                LOG.info("Subscribing {} to topic {}", threadId, topicName);
+                log.info("Subscribing {} to topic {}", threadId, topicName);
                 consumer.subscribe(Arrays.asList(topicName.split(",")));
 
                 StateRepository<String, String> offsetRepository = 
endpoint.getConfiguration().getOffsetRepository();
@@ -187,7 +183,7 @@ public class KafkaConsumer extends DefaultConsumer {
                         if (offsetState != null && !offsetState.isEmpty()) {
                             // The state contains the last read offset so you 
need to seek from the next one
                             long offset = deserializeOffsetValue(offsetState) 
+ 1;
-                            LOG.debug("Resuming partition {} from offset {} 
from state", topicPartition.partition(), offset);
+                            log.debug("Resuming partition {} from offset {} 
from state", topicPartition.partition(), offset);
                             consumer.seek(topicPartition, offset);
                         } else {
                             // If the init poll has returned some data of a 
currently unknown topic/partition in the state
@@ -195,19 +191,19 @@ public class KafkaConsumer extends DefaultConsumer {
                             List<ConsumerRecord<Object, Object>> 
partitionRecords = poll.records(topicPartition);
                             if (!partitionRecords.isEmpty()) {
                                 long offset = partitionRecords.get(0).offset();
-                                LOG.debug("Resuming partition {} from offset 
{}", topicPartition.partition(), offset);
+                                log.debug("Resuming partition {} from offset 
{}", topicPartition.partition(), offset);
                                 consumer.seek(topicPartition, offset);
                             }
                         }
                     }
                 } else if (endpoint.getConfiguration().getSeekTo() != null) {
                     if 
(endpoint.getConfiguration().getSeekTo().equals("beginning")) {
-                        LOG.debug("{} is seeking to the beginning on topic 
{}", threadId, topicName);
+                        log.debug("{} is seeking to the beginning on topic 
{}", threadId, topicName);
                         // This poll to ensures we have an assigned partition 
otherwise seek won't work
                         consumer.poll(100);
                         consumer.seekToBeginning(consumer.assignment());
                     } else if 
(endpoint.getConfiguration().getSeekTo().equals("end")) {
-                        LOG.debug("{} is seeking to the end on topic {}", 
threadId, topicName);
+                        log.debug("{} is seeking to the end on topic {}", 
threadId, topicName);
                         // This poll to ensures we have an assigned partition 
otherwise seek won't work
                         consumer.poll(100);
                         consumer.seekToEnd(consumer.assignment());
@@ -231,8 +227,8 @@ public class KafkaConsumer extends DefaultConsumer {
 
                             while (!breakOnErrorHit && 
recordIterator.hasNext()) {
                                 record = recordIterator.next();
-                                if (LOG.isTraceEnabled()) {
-                                    LOG.trace("Partition = {}, offset = {}, 
key = {}, value = {}", record.partition(), record.offset(), record.key(),
+                                if (log.isTraceEnabled()) {
+                                    log.trace("Partition = {}, offset = {}, 
key = {}, value = {}", record.partition(), record.offset(), record.key(),
                                               record.value());
                                 }
                                 Exchange exchange = 
endpoint.createKafkaExchange(record);
@@ -249,10 +245,10 @@ public class KafkaConsumer extends DefaultConsumer {
                                 if (exchange.getException() != null) {
                                     // processing failed due to an unhandled 
exception, what should we do
                                     if 
(endpoint.getConfiguration().isBreakOnFirstError()) {
-                                        // commit last good offset before we 
try again
-                                        commitOffset(offsetRepository, 
partition, partitionLastOffset);
-                                        // we are failing but store last good 
offset
+                                        // we are failing and we should break 
out
                                         log.warn("Error during processing {} 
from topic: {}. Will seek consumer to offset: {} and re-connect and start 
polling again.", exchange, topicName, partitionLastOffset);
+                                        // force commit so we resume on next 
poll where we failed
+                                        commitOffset(offsetRepository, 
partition, partitionLastOffset, true);
                                         // continue to next partition
                                         breakOnErrorHit = true;
                                     } else {
@@ -267,7 +263,7 @@ public class KafkaConsumer extends DefaultConsumer {
 
                             if (!breakOnErrorHit) {
                                 // all records processed from partition so 
commit them
-                                commitOffset(offsetRepository, partition, 
partitionLastOffset);
+                                commitOffset(offsetRepository, partition, 
partitionLastOffset, false);
                             }
                         }
                     }
@@ -281,39 +277,41 @@ public class KafkaConsumer extends DefaultConsumer {
                 if (!reConnect) {
                     if (endpoint.getConfiguration().isAutoCommitEnable() != 
null && endpoint.getConfiguration().isAutoCommitEnable()) {
                         if 
("async".equals(endpoint.getConfiguration().getAutoCommitOnStop())) {
-                            LOG.info("Auto commitAsync on stop {} from topic 
{}", threadId, topicName);
+                            log.info("Auto commitAsync on stop {} from topic 
{}", threadId, topicName);
                             consumer.commitAsync();
                         } else if 
("sync".equals(endpoint.getConfiguration().getAutoCommitOnStop())) {
-                            LOG.info("Auto commitSync on stop {} from topic 
{}", threadId, topicName);
+                            log.info("Auto commitSync on stop {} from topic 
{}", threadId, topicName);
                             consumer.commitSync();
                         }
                     }
                 }
 
-                LOG.info("Unsubscribing {} from topic {}", threadId, 
topicName);
+                log.info("Unsubscribing {} from topic {}", threadId, 
topicName);
                 consumer.unsubscribe();
             } catch (InterruptException e) {
                 getExceptionHandler().handleException("Interrupted while 
consuming " + threadId + " from kafka topic", e);
-                LOG.info("Unsubscribing {} from topic {}", threadId, 
topicName);
+                log.info("Unsubscribing {} from topic {}", threadId, 
topicName);
                 consumer.unsubscribe();
                 Thread.currentThread().interrupt();
             } catch (Exception e) {
                 getExceptionHandler().handleException("Error consuming " + 
threadId + " from kafka topic", e);
             } finally {
-                LOG.debug("Closing {} ", threadId);
+                log.debug("Closing {} ", threadId);
                 IOHelper.close(consumer);
             }
 
             return reConnect;
         }
 
-        private void commitOffset(StateRepository<String, String> 
offsetRepository, TopicPartition partition, long partitionLastOffset) {
+        private void commitOffset(StateRepository<String, String> 
offsetRepository, TopicPartition partition, long partitionLastOffset, boolean 
forceCommit) {
             if (partitionLastOffset != -1) {
                 if (offsetRepository != null) {
                     offsetRepository.setState(serializeOffsetKey(partition), 
serializeOffsetValue(partitionLastOffset));
-                    // if autocommit is false
+                } else if (forceCommit) {
+                    log.debug("Forcing commitSync {} from topic {} with 
offset: {}", threadId, topicName, partitionLastOffset);
+                    consumer.commitSync(Collections.singletonMap(partition, 
new OffsetAndMetadata(partitionLastOffset + 1)));
                 } else if (endpoint.getConfiguration().isAutoCommitEnable() != 
null && !endpoint.getConfiguration().isAutoCommitEnable()) {
-                    LOG.debug("Auto commitSync {} from topic {} with offset: 
{}", threadId, topicName, partitionLastOffset);
+                    log.debug("Auto commitSync {} from topic {} with offset: 
{}", threadId, topicName, partitionLastOffset);
                     consumer.commitSync(Collections.singletonMap(partition, 
new OffsetAndMetadata(partitionLastOffset + 1)));
                 }
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/9945e25d/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
----------------------------------------------------------------------
diff --git 
a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
 
b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
index 087d0d0..e94f799 100644
--- 
a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
+++ 
b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
@@ -213,7 +213,7 @@ public class KafkaComponentConfiguration {
          * poison message. Therefore its recommended to deal with that for
          * example by using Camel's error handler.
          */
-        private Boolean breakOnFirstError = true;
+        private Boolean breakOnFirstError = false;
         /**
          * URL of the Kafka brokers to use. The format is
          * host1:port1,host2:port2, and the list can be a subset of brokers or 
a

Reply via email to