Repository: incubator-twill
Updated Branches:
  refs/heads/master b740da438 -> 323e5af4e


(TWILL-151) Improve Logging error when fetching message after Kafka server is stopped

This closes #65 on GitHub.

Signed-off-by: Terence Yim <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/323e5af4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/323e5af4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/323e5af4

Branch: refs/heads/master
Commit: 323e5af4e536f3b5f306a5c59cd6787bf3a99818
Parents: b740da4
Author: sanojkodikkara <[email protected]>
Authored: Wed Sep 30 16:23:08 2015 +0530
Committer: Terence Yim <[email protected]>
Committed: Wed Jan 6 10:55:36 2016 -0800

----------------------------------------------------------------------
 .../internal/kafka/client/SimpleKafkaConsumer.java   | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/323e5af4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
index 8cfe889..0299e56 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.Futures;
+
 import kafka.api.FetchRequest;
 import kafka.api.FetchRequestBuilder;
 import kafka.api.PartitionOffsetRequestInfo;
@@ -39,6 +40,7 @@ import kafka.javaapi.OffsetResponse;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
 import kafka.message.MessageAndOffset;
+
 import org.apache.twill.common.Cancellable;
 import org.apache.twill.common.Threads;
 import org.apache.twill.kafka.client.BrokerInfo;
@@ -49,6 +51,7 @@ import org.apache.twill.kafka.client.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.ConnectException;
 import java.nio.channels.ClosedByInterruptException;
 import java.util.Iterator;
 import java.util.List;
@@ -374,9 +377,15 @@ final class SimpleKafkaConsumer implements KafkaConsumer {
           invokeCallback(messages, offset);
           backoff.reset();
         } catch (Throwable t) {
-          if (running || !(t instanceof ClosedByInterruptException)) {
-            // Only log if it is still running, otherwise, it just the interrupt caused by the stop.
-            LOG.info("Exception when fetching message on {}.", topicPart, t);
+          // Only log if it is still running, otherwise, it just the interrupt caused by the stop.
+          if (!running) {
+            LOG.debug("Unable to fetch messages on {}, kafka consumer service shutdown is in progress.", topicPart);
+          } else {
+            if (t instanceof ClosedByInterruptException || t instanceof ConnectException) {
+              LOG.debug("Unable to fetch messages on {}, kafka server shutdown is in progress.", topicPart);
+            } else {
+              LOG.info("Exception when fetching message on {}.", topicPart, t);
+            }
             backoff.backoff();
           }
           consumers.refresh(consumerEntry.getKey());

Reply via email to