cleanup names

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f14d6081
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f14d6081
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f14d6081

Branch: refs/heads/NewKafkaSystemConsumer
Commit: f14d6081f25f1738d0a31c9d2798f8bdd52a7c75
Parents: ceb0f6a
Author: Boris S <[email protected]>
Authored: Wed Sep 5 14:26:28 2018 -0700
Committer: Boris S <[email protected]>
Committed: Wed Sep 5 14:26:28 2018 -0700

----------------------------------------------------------------------
 .../samza/system/kafka/KafkaConsumerProxy.java      | 16 ++++++++--------
 .../samza/system/kafka/NewKafkaSystemConsumer.java  |  6 +++---
 2 files changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/f14d6081/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index 7232a0a..5c79017 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -48,7 +48,7 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Separate thread that reads messages from kafka and puts them into the BlockingEnvelopeMap.
- * This class is not thread safe. There will be only one instance of this class per LiKafkaSystemConsumer object.
+ * This class is not thread safe. There will be only one instance of this class per KafkaSystemConsumer object.
  * We still need some synchronization around kafkaConsumer. See pollConsumer() method for details.
  */
 public class KafkaConsumerProxy<K, V> {
@@ -108,7 +108,7 @@ public class KafkaConsumerProxy<K, V> {
         }
       }
     } else {
-      LOG.debug("Tried to start an already started LiKafkaConsumerProxy (%s). Ignoring.", this.toString());
+      LOG.debug("Tried to start an already started KafkaConsumerProxy (%s). Ignoring.", this.toString());
     }
   }
 
@@ -146,14 +146,14 @@ public class KafkaConsumerProxy<K, V> {
         }
         System.out.println("THREAD: finished " + consumerPollThread.getName());
       } catch (Throwable throwable) {
-        LOG.error(String.format("Error in LiKafkaConsumerProxy poll thread for system: %s.", systemName), throwable);
-        // SamzaLiKafkaSystemConsumer uses the failureCause to propagate the throwable to the container
+        LOG.error(String.format("Error in KafkaConsumerProxy poll thread for system: %s.", systemName), throwable);
+        // SamzaKafkaSystemConsumer uses the failureCause to propagate the throwable to the container
         failureCause = throwable;
         isRunning = false;
       }
 
       if (!isRunning) {
-        LOG.info("Stopping the LiKafkaConsumerProxy poll thread for system: {}.", systemName);
+        LOG.info("Stopping the KafkaConsumerProxy poll thread for system: {}.", systemName);
       }
     };
   }
@@ -318,7 +318,7 @@ public class KafkaConsumerProxy<K, V> {
   }
 
   /*
-    The only way to figure out lag for the LiKafkaConsumer is to look at the metrics after each poll() call.
+    The only way to figure out lag for the KafkaConsumer is to look at the metrics after each poll() call.
     One of the metrics (records-lag) shows how far behind the HighWatermark the consumer is.
     This method populates the lag information for each SSP into latestLags member variable.
    */
@@ -335,7 +335,7 @@ public class KafkaConsumerProxy<K, V> {
       MetricName mn = ssp2MetricName.get(ssp);
       Metric currentLagM = consumerMetrics.get(mn);
 
-      // In linkedin-kafka-client 5.*, high watermark is fixed to be the offset of last available message,
+      // High watermark is fixed to be the offset of last available message,
       // so the lag is now at least 0, which is the same as Samza's definition.
       // If the lag is not 0, then isAtHead is not true, and kafkaClient keeps polling.
       long currentLag = (currentLagM != null) ? (long) currentLagM.value() : -1L;
@@ -433,7 +433,7 @@ public class KafkaConsumerProxy<K, V> {
   }
 
   public void stop(long timeout) {
-    System.out.println("THREAD: Shutting down LiKafkaConsumerProxy poll thread:" + consumerPollThread.getName());
+    System.out.println("THREAD: Shutting down KafkaConsumerProxy poll thread:" + consumerPollThread.getName());
 
     isRunning = false;
     try {

http://git-wip-us.apache.org/repos/asf/samza/blob/f14d6081/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
index 717b45d..afec8ad 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
@@ -194,7 +194,7 @@ public class NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements
       try {
         synchronized (kafkaConsumer) {
           // TODO in the future we may need to add special handling here for BEGIN/END_OFFSET
-          // this will call liKafkaConsumer.seekToBegin/End()
+          // this will call KafkaConsumer.seekToBegin/End()
           kafkaConsumer.seek(tp, startingOffset); // this value should already be the 'upcoming' value
         }
       } catch (Exception e) {
@@ -274,7 +274,7 @@ public class NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements
         kafkaConsumer.close();
       }
     } catch (Exception e) {
-      LOG.warn("failed to stop SamzaRawLiKafkaConsumer + " + this, e);
+      LOG.warn("failed to stop SamzaRawKafkaConsumer + " + this, e);
     }
   }
 
@@ -340,7 +340,7 @@ public class NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements
         throw new SamzaException(message, proxy.getFailureCause());
       } else {
         LOG.warn("Failure cause is not populated for KafkaConsumerProxy");
-        throw new SamzaException("LiKafkaConsumerProxy has stopped");
+        throw new SamzaException("KafkaConsumerProxy has stopped");
       }
     }
 

Reply via email to