CAMEL-11215: Add breakOnError to camel-kafka so the consumer stops on the first exception and allows the same message to be polled again for retry.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/178d60af Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/178d60af Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/178d60af Branch: refs/heads/CAMEL-11215 Commit: 178d60afce0c642752b09adba8e1fef790a793d7 Parents: 1504c6e Author: Claus Ibsen <davscl...@apache.org> Authored: Mon May 15 14:30:43 2017 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon May 15 14:30:43 2017 +0200 ---------------------------------------------------------------------- .../camel/component/kafka/KafkaConsumer.java | 85 ++++++++++++++------ 1 file changed, 61 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/178d60af/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java index 7aa6234..19d90ac 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -122,7 +122,7 @@ public class KafkaConsumer extends DefaultConsumer { class KafkaFetchRecords implements Runnable { - private final org.apache.kafka.clients.consumer.KafkaConsumer consumer; + private org.apache.kafka.clients.consumer.KafkaConsumer consumer; private final String topicName; private final String threadId; private final Properties kafkaProps; @@ -131,20 +131,48 @@ public class KafkaConsumer extends DefaultConsumer { this.topicName = topicName; this.threadId = topicName + "-" + "Thread " + id; this.kafkaProps = kafkaProps; - - ClassLoader threadClassLoader 
= Thread.currentThread().getContextClassLoader(); - try { - // Kafka uses reflection for loading authentication settings, use its classloader - Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader()); - this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps); - } finally { - Thread.currentThread().setContextClassLoader(threadClassLoader); - } } @Override @SuppressWarnings("unchecked") public void run() { + boolean first = true; + boolean reConnect = true; + + while (reConnect) { + + // create consumer + ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader(); + try { + // Kafka uses reflection for loading authentication settings, use its classloader + Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader()); + this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps); + } finally { + Thread.currentThread().setContextClassLoader(threadClassLoader); + } + + if (!first) { + // skip one poll timeout before trying again + long delay = endpoint.getConfiguration().getPollTimeoutMs(); + log.info("Reconnecting {} to topic {} after {} ms", threadId, topicName, delay); + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + first = false; + + // doRun keeps running until we either shutdown or is told to re-connect + reConnect = doRun(); + } + } + + protected boolean doRun() { + // allow to re-connect thread in case we use that to retry failed messages + boolean reConnect = false; + try { LOG.info("Subscribing {} to topic {}", threadId, topicName); consumer.subscribe(Arrays.asList(topicName.split(","))); @@ -185,7 +213,8 @@ public class KafkaConsumer extends DefaultConsumer { consumer.seekToEnd(consumer.assignment()); } } - while (isRunAllowed() && !isStoppingOrStopped() && !isSuspendingOrSuspended()) { + + while (isRunAllowed() 
&& !reConnect && !isStoppingOrStopped() && !isSuspendingOrSuspended()) { // flag to break out processing on the first exception boolean breakOnErrorHit = false; @@ -193,12 +222,13 @@ public class KafkaConsumer extends DefaultConsumer { ConsumerRecords<Object, Object> allRecords = consumer.poll(pollTimeoutMs); for (TopicPartition partition : allRecords.partitions()) { + + long partitionLastOffset = -1; + Iterator<ConsumerRecord<Object, Object>> recordIterator = allRecords.records(partition).iterator(); if (!breakOnErrorHit && recordIterator.hasNext()) { ConsumerRecord<Object, Object> record; - long partitionLastOffset = -1; - while (!breakOnErrorHit && recordIterator.hasNext()) { record = recordIterator.next(); if (LOG.isTraceEnabled()) { @@ -209,6 +239,7 @@ public class KafkaConsumer extends DefaultConsumer { if (endpoint.getConfiguration().isAutoCommitEnable() != null && !endpoint.getConfiguration().isAutoCommitEnable()) { exchange.getIn().setHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, !recordIterator.hasNext()); } + try { processor.process(exchange); } catch (Exception e) { @@ -221,10 +252,7 @@ public class KafkaConsumer extends DefaultConsumer { // commit last good offset before we try again commitOffset(offsetRepository, partition, partitionLastOffset); // we are failing but store last good offset - log.warn("Error during processing {} from topic: {}. Will seek consumer to last good offset: {} and poll again.", exchange, topicName, partitionLastOffset); - if (partitionLastOffset != -1) { - consumer.seek(partition, partitionLastOffset); - } + log.warn("Error during processing {} from topic: {}. 
Will seek consumer to offset: {} and re-connect and start polling again.", exchange, topicName, partitionLastOffset); // continue to next partition breakOnErrorHit = true; } else { @@ -243,15 +271,22 @@ public class KafkaConsumer extends DefaultConsumer { } } } + + if (breakOnErrorHit) { + // force re-connect + reConnect = true; + } } - if (endpoint.getConfiguration().isAutoCommitEnable() != null && endpoint.getConfiguration().isAutoCommitEnable()) { - if ("async".equals(endpoint.getConfiguration().getAutoCommitOnStop())) { - LOG.info("Auto commitAsync on stop {} from topic {}", threadId, topicName); - consumer.commitAsync(); - } else if ("sync".equals(endpoint.getConfiguration().getAutoCommitOnStop())) { - LOG.info("Auto commitSync on stop {} from topic {}", threadId, topicName); - consumer.commitSync(); + if (!reConnect) { + if (endpoint.getConfiguration().isAutoCommitEnable() != null && endpoint.getConfiguration().isAutoCommitEnable()) { + if ("async".equals(endpoint.getConfiguration().getAutoCommitOnStop())) { + LOG.info("Auto commitAsync on stop {} from topic {}", threadId, topicName); + consumer.commitAsync(); + } else if ("sync".equals(endpoint.getConfiguration().getAutoCommitOnStop())) { + LOG.info("Auto commitSync on stop {} from topic {}", threadId, topicName); + consumer.commitSync(); + } } } @@ -268,6 +303,8 @@ public class KafkaConsumer extends DefaultConsumer { LOG.debug("Closing {} ", threadId); IOHelper.close(consumer); } + + return reConnect; } private void commitOffset(StateRepository<String, String> offsetRepository, TopicPartition partition, long partitionLastOffset) {