Clean up names: replace leftover "LiKafka" references with "Kafka" in comments, log messages, and exception text
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f14d6081 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f14d6081 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f14d6081 Branch: refs/heads/NewKafkaSystemConsumer Commit: f14d6081f25f1738d0a31c9d2798f8bdd52a7c75 Parents: ceb0f6a Author: Boris S <[email protected]> Authored: Wed Sep 5 14:26:28 2018 -0700 Committer: Boris S <[email protected]> Committed: Wed Sep 5 14:26:28 2018 -0700 ---------------------------------------------------------------------- .../samza/system/kafka/KafkaConsumerProxy.java | 16 ++++++++-------- .../samza/system/kafka/NewKafkaSystemConsumer.java | 6 +++--- 2 files changed, 11 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/f14d6081/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java index 7232a0a..5c79017 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java @@ -48,7 +48,7 @@ import org.slf4j.LoggerFactory; /** * Separate thread that reads messages from kafka and puts them into the BlockingEnvelopeMap. - * This class is not thread safe. There will be only one instance of this class per LiKafkaSystemConsumer object. + * This class is not thread safe. There will be only one instance of this class per KafkaSystemConsumer object. * We still need some synchronization around kafkaConsumer. See pollConsumer() method for details. 
*/ public class KafkaConsumerProxy<K, V> { @@ -108,7 +108,7 @@ public class KafkaConsumerProxy<K, V> { } } } else { - LOG.debug("Tried to start an already started LiKafkaConsumerProxy (%s). Ignoring.", this.toString()); + LOG.debug("Tried to start an already started KafkaConsumerProxy (%s). Ignoring.", this.toString()); } } @@ -146,14 +146,14 @@ public class KafkaConsumerProxy<K, V> { } System.out.println("THREAD: finished " + consumerPollThread.getName()); } catch (Throwable throwable) { - LOG.error(String.format("Error in LiKafkaConsumerProxy poll thread for system: %s.", systemName), throwable); - // SamzaLiKafkaSystemConsumer uses the failureCause to propagate the throwable to the container + LOG.error(String.format("Error in KafkaConsumerProxy poll thread for system: %s.", systemName), throwable); + // SamzaKafkaSystemConsumer uses the failureCause to propagate the throwable to the container failureCause = throwable; isRunning = false; } if (!isRunning) { - LOG.info("Stopping the LiKafkaConsumerProxy poll thread for system: {}.", systemName); + LOG.info("Stopping the KafkaConsumerProxy poll thread for system: {}.", systemName); } }; } @@ -318,7 +318,7 @@ public class KafkaConsumerProxy<K, V> { } /* - The only way to figure out lag for the LiKafkaConsumer is to look at the metrics after each poll() call. + The only way to figure out lag for the KafkaConsumer is to look at the metrics after each poll() call. One of the metrics (records-lag) shows how far behind the HighWatermark the consumer is. This method populates the lag information for each SSP into latestLags member variable. 
*/ @@ -335,7 +335,7 @@ public class KafkaConsumerProxy<K, V> { MetricName mn = ssp2MetricName.get(ssp); Metric currentLagM = consumerMetrics.get(mn); - // In linkedin-kafka-client 5.*, high watermark is fixed to be the offset of last available message, + // High watermark is fixed to be the offset of last available message, // so the lag is now at least 0, which is the same as Samza's definition. // If the lag is not 0, then isAtHead is not true, and kafkaClient keeps polling. long currentLag = (currentLagM != null) ? (long) currentLagM.value() : -1L; @@ -433,7 +433,7 @@ public class KafkaConsumerProxy<K, V> { } public void stop(long timeout) { - System.out.println("THREAD: Shutting down LiKafkaConsumerProxy poll thread:" + consumerPollThread.getName()); + System.out.println("THREAD: Shutting down KafkaConsumerProxy poll thread:" + consumerPollThread.getName()); isRunning = false; try { http://git-wip-us.apache.org/repos/asf/samza/blob/f14d6081/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java index 717b45d..afec8ad 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java @@ -194,7 +194,7 @@ public class NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements try { synchronized (kafkaConsumer) { // TODO in the future we may need to add special handling here for BEGIN/END_OFFSET - // this will call liKafkaConsumer.seekToBegin/End() + // this will call KafkaConsumer.seekToBegin/End() kafkaConsumer.seek(tp, startingOffset); // this value should already be the 'upcoming' value } } catch (Exception e) { @@ -274,7 +274,7 @@ public class 
NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements kafkaConsumer.close(); } } catch (Exception e) { - LOG.warn("failed to stop SamzaRawLiKafkaConsumer + " + this, e); + LOG.warn("failed to stop SamzaRawKafkaConsumer + " + this, e); } } @@ -340,7 +340,7 @@ public class NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements throw new SamzaException(message, proxy.getFailureCause()); } else { LOG.warn("Failure cause is not populated for KafkaConsumerProxy"); - throw new SamzaException("LiKafkaConsumerProxy has stopped"); + throw new SamzaException("KafkaConsumerProxy has stopped"); } }
