CAMEL-11215: Add breakOnError to camel-kafka so the consumer stops on the first
exception and allows the same message to be polled again for retry.
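
For anyone who wants to try this out, a route along the following lines
should exercise the new behavior. This is only a sketch: the breakOnError
URI option name and the broker address are assumptions based on the JIRA
summary, not something shown by this commit.

    import org.apache.camel.builder.RouteBuilder;

    public class KafkaRetryRoute extends RouteBuilder {
        @Override
        public void configure() {
            // hypothetical endpoint configuration; breakOnError is assumed to be
            // exposed as a URI option, and the broker address is made up
            from("kafka:myTopic?brokers=localhost:9092"
                    + "&autoCommitEnable=false"
                    + "&breakOnError=true")
                .to("bean:myService");
        }
    }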


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/178d60af
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/178d60af
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/178d60af

Branch: refs/heads/CAMEL-11215
Commit: 178d60afce0c642752b09adba8e1fef790a793d7
Parents: 1504c6e
Author: Claus Ibsen <davscl...@apache.org>
Authored: Mon May 15 14:30:43 2017 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Mon May 15 14:30:43 2017 +0200

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaConsumer.java    | 85 ++++++++++++++------
 1 file changed, 61 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/178d60af/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 7aa6234..19d90ac 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -122,7 +122,7 @@ public class KafkaConsumer extends DefaultConsumer {
 
     class KafkaFetchRecords implements Runnable {
 
-        private final org.apache.kafka.clients.consumer.KafkaConsumer consumer;
+        private org.apache.kafka.clients.consumer.KafkaConsumer consumer;
         private final String topicName;
         private final String threadId;
         private final Properties kafkaProps;
@@ -131,20 +131,48 @@ public class KafkaConsumer extends DefaultConsumer {
             this.topicName = topicName;
             this.threadId = topicName + "-" + "Thread " + id;
             this.kafkaProps = kafkaProps;
-
-            ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
-            try {
-                // Kafka uses reflection for loading authentication settings, use its classloader
-                Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
-                this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
-            } finally {
-                Thread.currentThread().setContextClassLoader(threadClassLoader);
-            }
         }
 
         @Override
         @SuppressWarnings("unchecked")
         public void run() {
+            boolean first = true;
+            boolean reConnect = true;
+
+            while (reConnect) {
+
+                // create consumer
+                ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
+                try {
+                    // Kafka uses reflection for loading authentication settings, use its classloader
+                    Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
+                    this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
+                } finally {
+                    Thread.currentThread().setContextClassLoader(threadClassLoader);
+                }
+                }
+
+                if (!first) {
+                    // skip one poll timeout before trying again
+                    long delay = endpoint.getConfiguration().getPollTimeoutMs();
+                    log.info("Reconnecting {} to topic {} after {} ms", threadId, topicName, delay);
+                    try {
+                        Thread.sleep(delay);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                    }
+                }
+
+                first = false;
+
+                // doRun keeps running until we either shut down or are told to re-connect
+                reConnect = doRun();
+            }
+        }
+
+        protected boolean doRun() {
+            // allow the thread to re-connect in case we use that to retry failed messages
+            boolean reConnect = false;
+
             try {
                 LOG.info("Subscribing {} to topic {}", threadId, topicName);
                 consumer.subscribe(Arrays.asList(topicName.split(",")));
@@ -185,7 +213,8 @@ public class KafkaConsumer extends DefaultConsumer {
                         consumer.seekToEnd(consumer.assignment());
                     }
                 }
-                while (isRunAllowed() && !isStoppingOrStopped() && !isSuspendingOrSuspended()) {
+
+                while (isRunAllowed() && !reConnect && !isStoppingOrStopped() && !isSuspendingOrSuspended()) {
 
                     // flag to break out processing on the first exception
                     boolean breakOnErrorHit = false;
@@ -193,12 +222,13 @@ public class KafkaConsumer extends DefaultConsumer {
                     ConsumerRecords<Object, Object> allRecords = consumer.poll(pollTimeoutMs);
 
                     for (TopicPartition partition : allRecords.partitions()) {
+
+                        long partitionLastOffset = -1;
+
                         Iterator<ConsumerRecord<Object, Object>> recordIterator = allRecords.records(partition).iterator();
                         if (!breakOnErrorHit && recordIterator.hasNext()) {
                             ConsumerRecord<Object, Object> record;
 
-                            long partitionLastOffset = -1;
-
                             while (!breakOnErrorHit && recordIterator.hasNext()) {
                                 record = recordIterator.next();
                                 if (LOG.isTraceEnabled()) {
@@ -209,6 +239,7 @@ public class KafkaConsumer extends DefaultConsumer {
                                 if (endpoint.getConfiguration().isAutoCommitEnable() != null && !endpoint.getConfiguration().isAutoCommitEnable()) {
                                     exchange.getIn().setHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, !recordIterator.hasNext());
                                 }
+
                                 try {
                                     processor.process(exchange);
                                 } catch (Exception e) {
@@ -221,10 +252,7 @@ public class KafkaConsumer extends DefaultConsumer {
                                         // commit last good offset before we try again
                                         commitOffset(offsetRepository, partition, partitionLastOffset);
                                         // we are failing but store last good offset
-                                        log.warn("Error during processing {} from topic: {}. Will seek consumer to last good offset: {} and poll again.", exchange, topicName, partitionLastOffset);
-                                        if (partitionLastOffset != -1) {
-                                            consumer.seek(partition, partitionLastOffset);
-                                        }
+                                        log.warn("Error during processing {} from topic: {}. Will seek consumer to offset: {} and re-connect and start polling again.", exchange, topicName, partitionLastOffset);
                                         // continue to next partition
                                         breakOnErrorHit = true;
                                     } else {
@@ -243,15 +271,22 @@ public class KafkaConsumer extends DefaultConsumer {
                             }
                         }
                     }
+
+                    if (breakOnErrorHit) {
+                        // force re-connect
+                        reConnect = true;
+                    }
                 }
 
-                if (endpoint.getConfiguration().isAutoCommitEnable() != null && endpoint.getConfiguration().isAutoCommitEnable()) {
-                    if ("async".equals(endpoint.getConfiguration().getAutoCommitOnStop())) {
-                        LOG.info("Auto commitAsync on stop {} from topic {}", threadId, topicName);
-                        consumer.commitAsync();
-                    } else if ("sync".equals(endpoint.getConfiguration().getAutoCommitOnStop())) {
-                        LOG.info("Auto commitSync on stop {} from topic {}", threadId, topicName);
-                        consumer.commitSync();
+                if (!reConnect) {
+                    if (endpoint.getConfiguration().isAutoCommitEnable() != null && endpoint.getConfiguration().isAutoCommitEnable()) {
+                        if ("async".equals(endpoint.getConfiguration().getAutoCommitOnStop())) {
+                            LOG.info("Auto commitAsync on stop {} from topic {}", threadId, topicName);
+                            consumer.commitAsync();
+                        } else if ("sync".equals(endpoint.getConfiguration().getAutoCommitOnStop())) {
+                            LOG.info("Auto commitSync on stop {} from topic {}", threadId, topicName);
+                            consumer.commitSync();
+                        }
                     }
                 }
 
@@ -268,6 +303,8 @@ public class KafkaConsumer extends DefaultConsumer {
                 LOG.debug("Closing {} ", threadId);
                 IOHelper.close(consumer);
             }
+
+            return reConnect;
         }
 
         private void commitOffset(StateRepository<String, String> offsetRepository, TopicPartition partition, long partitionLastOffset) {
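
----------------------------------------------------------------------

Stripped of the Camel plumbing, the pattern this patch introduces is: on the
first processing failure, commit the last good offset, close the consumer,
re-create it after one poll-timeout's delay, and poll again so the failed
record is retried. Below is a minimal sketch of that loop against the plain
Kafka client; the class and method names (BreakOnErrorLoop, process) are
illustrative only and are not the actual Camel code.

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    // Illustrative sketch of the re-connect-on-error loop; not the actual Camel code.
    public class BreakOnErrorLoop implements Runnable {

        private final Properties kafkaProps;
        private final String topic;
        private final long pollTimeoutMs = 5000;
        private volatile boolean running = true;

        public BreakOnErrorLoop(Properties kafkaProps, String topic) {
            this.kafkaProps = kafkaProps;
            this.topic = topic;
        }

        public void stop() {
            running = false;
        }

        @Override
        public void run() {
            boolean first = true;
            boolean reConnect = true;
            while (reConnect) {
                if (!first) {
                    try {
                        // back off one poll timeout before retrying, as the patch does
                        Thread.sleep(pollTimeoutMs);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                first = false;
                // re-create the consumer on every (re-)connect; the patch additionally
                // swaps the context classloader here because Kafka loads auth settings reflectively
                try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps)) {
                    consumer.subscribe(Collections.singletonList(topic));
                    reConnect = doPollLoop(consumer);
                }
            }
        }

        private boolean doPollLoop(KafkaConsumer<String, String> consumer) {
            while (running) {
                ConsumerRecords<String, String> records = consumer.poll(pollTimeoutMs);
                for (TopicPartition partition : records.partitions()) {
                    long lastGoodOffset = -1;
                    for (ConsumerRecord<String, String> record : records.records(partition)) {
                        try {
                            process(record); // application processing
                            lastGoodOffset = record.offset();
                        } catch (Exception e) {
                            // commit the last good offset (+1 = next record to consume),
                            // then force a re-connect so the failed record is polled again
                            if (lastGoodOffset != -1) {
                                consumer.commitSync(Collections.singletonMap(
                                        partition, new OffsetAndMetadata(lastGoodOffset + 1)));
                            }
                            return true; // re-connect
                        }
                    }
                }
            }
            return false; // normal shutdown, no re-connect
        }

        private void process(ConsumerRecord<String, String> record) {
            // placeholder for real processing
        }
    }

Note how the patch guards the auto-commit-on-stop block with if (!reConnect):
committing there after a failure would mark the failed record as processed
and defeat the retry.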

Reply via email to