Rename local variable SSPsToFetch to sspsToFetch in KafkaConsumerProxy.fetchMessages()
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/74b6cfab Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/74b6cfab Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/74b6cfab Branch: refs/heads/NewKafkaSystemConsumer Commit: 74b6cfabdbb5112488965c2fc3629156e0ff8c4c Parents: ed0648d Author: Boris S <[email protected]> Authored: Tue Sep 18 14:17:58 2018 -0700 Committer: Boris S <[email protected]> Committed: Tue Sep 18 14:17:58 2018 -0700 ---------------------------------------------------------------------- .../apache/samza/system/kafka/KafkaConsumerProxy.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/74b6cfab/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java index 6fc6491..b67df0a 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java @@ -367,20 +367,20 @@ import org.slf4j.LoggerFactory; Using the consumer to poll the messages from the stream. 
*/ private void fetchMessages() { - Set<SystemStreamPartition> SSPsToFetch = new HashSet<>(); + Set<SystemStreamPartition> sspsToFetch = new HashSet<>(); for (SystemStreamPartition ssp : nextOffsets.keySet()) { if (sink.needsMoreMessages(ssp)) { - SSPsToFetch.add(ssp); + sspsToFetch.add(ssp); } } - LOG.debug("pollConsumer {}", SSPsToFetch.size()); - if (!SSPsToFetch.isEmpty()) { + LOG.debug("pollConsumer {}", sspsToFetch.size()); + if (!sspsToFetch.isEmpty()) { kafkaConsumerMetrics.incClientReads(metricName); Map<SystemStreamPartition, List<IncomingMessageEnvelope>> response; - LOG.debug("pollConsumer from following SSPs: {}; total#={}", SSPsToFetch, SSPsToFetch.size()); + LOG.debug("pollConsumer from following SSPs: {}; total#={}", sspsToFetch, sspsToFetch.size()); - response = pollConsumer(SSPsToFetch, 500); // TODO should be default value from ConsumerConfig + response = pollConsumer(sspsToFetch, 500); // TODO should be default value from ConsumerConfig // move the responses into the queue for (Map.Entry<SystemStreamPartition, List<IncomingMessageEnvelope>> e : response.entrySet()) { @@ -390,7 +390,7 @@ import org.slf4j.LoggerFactory; } } - populateCurrentLags(SSPsToFetch); // find current lags for for each SSP + populateCurrentLags(sspsToFetch); // find current lags for for each SSP } else { // nothing to read LOG.debug("No topic/partitions need to be fetched for consumer {} right now. Sleeping {}ms.", kafkaConsumer,
