(TWILL-151) Improve error logging when fetching messages after the Kafka server is stopped
This closes #65 on GitHub. Signed-off-by: Terence Yim <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/323e5af4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/323e5af4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/323e5af4 Branch: refs/heads/site Commit: 323e5af4e536f3b5f306a5c59cd6787bf3a99818 Parents: b740da4 Author: sanojkodikkara <[email protected]> Authored: Wed Sep 30 16:23:08 2015 +0530 Committer: Terence Yim <[email protected]> Committed: Wed Jan 6 10:55:36 2016 -0800 ---------------------------------------------------------------------- .../internal/kafka/client/SimpleKafkaConsumer.java | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/323e5af4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java index 8cfe889..0299e56 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java +++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java @@ -28,6 +28,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.util.concurrent.Futures; + import kafka.api.FetchRequest; import kafka.api.FetchRequestBuilder; import kafka.api.PartitionOffsetRequestInfo; @@ -39,6 +40,7 @@ import kafka.javaapi.OffsetResponse; import kafka.javaapi.consumer.SimpleConsumer; import kafka.javaapi.message.ByteBufferMessageSet; 
import kafka.message.MessageAndOffset; + import org.apache.twill.common.Cancellable; import org.apache.twill.common.Threads; import org.apache.twill.kafka.client.BrokerInfo; @@ -49,6 +51,7 @@ import org.apache.twill.kafka.client.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.ConnectException; import java.nio.channels.ClosedByInterruptException; import java.util.Iterator; import java.util.List; @@ -374,9 +377,15 @@ final class SimpleKafkaConsumer implements KafkaConsumer { invokeCallback(messages, offset); backoff.reset(); } catch (Throwable t) { - if (running || !(t instanceof ClosedByInterruptException)) { - // Only log if it is still running, otherwise, it just the interrupt caused by the stop. - LOG.info("Exception when fetching message on {}.", topicPart, t); + // Only log if it is still running, otherwise, it just the interrupt caused by the stop. + if (!running) { + LOG.debug("Unable to fetch messages on {}, kafka consumer service shutdown is in progress.", topicPart); + } else { + if (t instanceof ClosedByInterruptException || t instanceof ConnectException) { + LOG.debug("Unable to fetch messages on {}, kafka server shutdown is in progress.", topicPart); + } else { + LOG.info("Exception when fetching message on {}.", topicPart, t); + } backoff.backoff(); } consumers.refresh(consumerEntry.getKey());
