debug

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ddada94d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ddada94d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ddada94d

Branch: refs/heads/NewKafkaSystemConsumer
Commit: ddada94d09a8ac78ec7a88eff9dc77cd39dba32d
Parents: 2655221
Author: Boris S <[email protected]>
Authored: Mon Sep 10 16:28:12 2018 -0700
Committer: Boris S <[email protected]>
Committed: Mon Sep 10 16:28:12 2018 -0700

----------------------------------------------------------------------
 .../org/apache/samza/system/kafka/KafkaConsumerProxy.java     | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/ddada94d/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index 0825c90..92f9183 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -100,7 +100,7 @@ public class KafkaConsumerProxy<K, V> {
       consumerPollThread.start();
 
       // we need to wait until the thread starts
-      while (!isRunning) {
+      while (!isRunning && failureCause == null) {
         try {
           consumerPollThreadStartLatch.await(3000, TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
@@ -378,9 +378,8 @@ public class KafkaConsumerProxy<K, V> {
       kafkaConsumerMetrics.incClientReads(metricName);
 
       Map<SystemStreamPartition, List<IncomingMessageEnvelope>> response;
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("pollConsumer from following SSPs: {}; total#={}", SSPsToFetch, SSPsToFetch.size());
-      }
+      LOG.debug("pollConsumer from following SSPs: {}; total#={}", SSPsToFetch, SSPsToFetch.size());
+
       response = pollConsumer(SSPsToFetch, 500); // TODO should be default value from ConsumerConfig
 
       // move the responses into the queue

Reply via email to