debug
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ddada94d Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ddada94d Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ddada94d Branch: refs/heads/NewKafkaSystemConsumer Commit: ddada94d09a8ac78ec7a88eff9dc77cd39dba32d Parents: 2655221 Author: Boris S <[email protected]> Authored: Mon Sep 10 16:28:12 2018 -0700 Committer: Boris S <[email protected]> Committed: Mon Sep 10 16:28:12 2018 -0700 ---------------------------------------------------------------------- .../org/apache/samza/system/kafka/KafkaConsumerProxy.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/ddada94d/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java index 0825c90..92f9183 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java @@ -100,7 +100,7 @@ public class KafkaConsumerProxy<K, V> { consumerPollThread.start(); // we need to wait until the thread starts - while (!isRunning) { + while (!isRunning && failureCause == null) { try { consumerPollThreadStartLatch.await(3000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { @@ -378,9 +378,8 @@ public class KafkaConsumerProxy<K, V> { kafkaConsumerMetrics.incClientReads(metricName); Map<SystemStreamPartition, List<IncomingMessageEnvelope>> response; - if (LOG.isDebugEnabled()) { - LOG.debug("pollConsumer from following SSPs: {}; total#={}", SSPsToFetch, SSPsToFetch.size()); - } + LOG.debug("pollConsumer from following SSPs: {}; total#={}", SSPsToFetch, SSPsToFetch.size()); + response = pollConsumer(SSPsToFetch, 500); // TODO should be default value from ConsumerConfig // move the responses into the queue
