CAMEL-10594: Improve shutdown of the Kafka consumer so that it is graceful and breaks out of the while loop when stopping.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f91d100b Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f91d100b Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f91d100b Branch: refs/heads/camel-2.18.x Commit: f91d100bd03cadeefeab27664e750f6505505ff5 Parents: 09b072f Author: Claus Ibsen <[email protected]> Authored: Wed Dec 14 13:07:34 2016 +0100 Committer: Claus Ibsen <[email protected]> Committed: Wed Dec 14 13:07:59 2016 +0100 ---------------------------------------------------------------------- .../apache/camel/component/kafka/KafkaConsumer.java | 15 +++++++++------ .../camel/component/kafka/KafkaConsumerFullTest.java | 4 ++-- .../kafka/clients/consumer/KafkaConsumerTest.java | 1 + 3 files changed, 12 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/f91d100b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java index 6cd5108..66c4335 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -68,8 +68,9 @@ public class KafkaConsumer extends DefaultConsumer { @Override protected void doStart() throws Exception { - super.doStart(); LOG.info("Starting Kafka consumer"); + super.doStart(); + executor = endpoint.createExecutor(); for (int i = 0; i < endpoint.getConfiguration().getConsumersCount(); i++) { executor.submit(new KafkaFetchRecords(endpoint.getConfiguration().getTopic(), i + "", getProps())); @@ -78,17 +79,18 @@ public class KafkaConsumer 
extends DefaultConsumer { @Override protected void doStop() throws Exception { - super.doStop(); LOG.info("Stopping Kafka consumer"); if (executor != null) { if (getEndpoint() != null && getEndpoint().getCamelContext() != null) { - getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor); + getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(executor); } else { executor.shutdownNow(); } } executor = null; + + super.doStop(); } class KafkaFetchRecords implements Runnable { @@ -117,7 +119,7 @@ public class KafkaConsumer extends DefaultConsumer { @SuppressWarnings("unchecked") public void run() { try { - LOG.debug("Subscribing {} to topic {}", threadId, topicName); + LOG.info("Subscribing {} to topic {}", threadId, topicName); consumer.subscribe(Arrays.asList(topicName.split(","))); if (endpoint.getConfiguration().isSeekToBeginning()) { @@ -126,7 +128,7 @@ public class KafkaConsumer extends DefaultConsumer { consumer.poll(100); consumer.seekToBeginning(consumer.assignment()); } - while (isRunAllowed() && !isSuspendingOrSuspended()) { + while (isRunAllowed() && !isStoppingOrStopped() && !isSuspendingOrSuspended()) { ConsumerRecords<Object, Object> allRecords = consumer.poll(pollTimeoutMs); for (TopicPartition partition : allRecords.partitions()) { List<ConsumerRecord<Object, Object>> partitionRecords = allRecords @@ -151,10 +153,11 @@ public class KafkaConsumer extends DefaultConsumer { } } } - LOG.debug("Unsubscribing {} from topic {}", threadId, topicName); + LOG.info("Unsubscribing {} from topic {}", threadId, topicName); consumer.unsubscribe(); } catch (InterruptException e) { getExceptionHandler().handleException("Interrupted while consuming " + threadId + " from kafka topic", e); + LOG.info("Unsubscribing {} from topic {}", threadId, topicName); consumer.unsubscribe(); Thread.currentThread().interrupt(); } catch (Exception e) { 
http://git-wip-us.apache.org/repos/asf/camel/blob/f91d100b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java index 01c0bd1..e8e6c9e 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java @@ -76,7 +76,7 @@ public class KafkaConsumerFullTest extends BaseEmbeddedKafkaTest { } @Test - public void kaftMessageIsConsumedByCamel() throws InterruptedException, IOException { + public void kafkaMessageIsConsumedByCamel() throws InterruptedException, IOException { to.expectedMessageCount(5); to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4"); for (int k = 0; k < 5; k++) { @@ -89,7 +89,7 @@ public class KafkaConsumerFullTest extends BaseEmbeddedKafkaTest { @Test @Ignore("Currently there is a bug in kafka which leads to an uninterruptable thread so a resub take too long (works manually)") - public void kaftMessageIsConsumedByCamelSeekedToBeginning() throws Exception { + public void kafkaMessageIsConsumedByCamelSeekedToBeginning() throws Exception { to.expectedMessageCount(5); to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4"); for (int k = 0; k < 5; k++) { http://git-wip-us.apache.org/repos/asf/camel/blob/f91d100b/components/camel-kafka/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java 
b/components/camel-kafka/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 1c4f0ee..d74d5e0 100644 --- a/components/camel-kafka/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/components/camel-kafka/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -37,6 +37,7 @@ public class KafkaConsumerTest { public void init() { when(kafkaConsumer.poll(1000)).thenReturn(ConsumerRecords.empty()); } + @Test public void testPollGivenReturnsEmptyConsumerRecordShouldNotBeNull() { ConsumerRecords<Object, Object> consumerRecords = kafkaConsumer.poll(1000);
