cleanup

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/0b6768f8
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/0b6768f8
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/0b6768f8

Branch: refs/heads/NewKafkaSystemConsumer
Commit: 0b6768f803db12bf433d96b832c95fa228f6e7ca
Parents: f14d608
Author: Boris S <[email protected]>
Authored: Wed Sep 5 14:39:08 2018 -0700
Committer: Boris S <[email protected]>
Committed: Wed Sep 5 14:39:08 2018 -0700

----------------------------------------------------------------------
 .../org/apache/samza/coordinator/JobModelManager.scala    |  2 +-
 .../kafka/clients/consumer/KafkaConsumerConfig.java       | 10 +++++-----
 .../org/apache/samza/system/kafka/KafkaConsumerProxy.java | 10 +++++-----
 .../apache/samza/system/kafka/KafkaSystemFactory.scala    |  4 ++--
 4 files changed, 13 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/0b6768f8/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index f7ffd4e..f95a521 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -64,7 +64,7 @@ object JobModelManager extends Logging {
    * a) Reads the jobModel from coordinator stream using the job's configuration.
    * b) Recomputes changelog partition mapping based on jobModel and job's configuration.
    * c) Builds JobModelManager using the jobModel read from coordinator stream.
-   * @param config Coordinator stream manager config
+   * @param config Coordinator stream manager config.
    * @param changelogPartitionMapping The changelog partition-to-task mapping.
    * @return JobModelManager
    */

http://git-wip-us.apache.org/repos/asf/samza/blob/0b6768f8/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
index 843e03d..98792ab 100644
--- a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
+++ b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
@@ -129,17 +129,17 @@ public class KafkaConsumerConfig extends ConsumerConfig {
   }
 
   // client id should be unique per job
-  public static String getClientId(Config config) {
-    return getClientId(CONSUMER_CLIENT_ID_PREFIX, config);
+  public static String getConsumerClientId(Config config) {
+    return getConsumerClientId(CONSUMER_CLIENT_ID_PREFIX, config);
   }
   public static String getProducerClientId(Config config) {
-    return getClientId(PRODUCER_CLIENT_ID_PREFIX, config);
+    return getConsumerClientId(PRODUCER_CLIENT_ID_PREFIX, config);
   }
   public static String getAdminClientId(Config config) {
-    return getClientId(ADMIN_CLIENT_ID_PREFIX, config);
+    return getConsumerClientId(ADMIN_CLIENT_ID_PREFIX, config);
   }
 
-  private static String getClientId(String id, Config config) {
+  private static String getConsumerClientId(String id, Config config) {
     if (config.get(JobConfig.JOB_NAME()) == null) {
       throw new ConfigException("Missing job name");
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/0b6768f8/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index 5c79017..ae80d50 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -85,7 +85,6 @@ public class KafkaConsumerProxy<K, V> {
     this.metricName = metricName;
     this.clientId = clientId;
 
-    // TODO - see if we need new metrics (not host:port based)
     this.kafkaConsumerMetrics.registerClientProxy(metricName);
 
     consumerPollThread = new Thread(createProxyThreadRunnable());
@@ -133,18 +132,17 @@ public class KafkaConsumerProxy<K, V> {
    * creates a separate thread for pulling messages
    */
   private Runnable createProxyThreadRunnable() {
-    return () -> {
+    Runnable runnable=  () -> {
       isRunning = true;
 
 
       try {
         consumerPollThreadStartLatch.countDown();
-        System.out.println("THREAD: runing " + consumerPollThread.getName());
+        LOG.info("Starting runnable " + consumerPollThread.getName());
         initializeLags();
         while (isRunning) {
           fetchMessages();
         }
-        System.out.println("THREAD: finished " + consumerPollThread.getName());
       } catch (Throwable throwable) {
         LOG.error(String.format("Error in KafkaConsumerProxy poll thread for system: %s.", systemName), throwable);
         // SamzaKafkaSystemConsumer uses the failureCause to propagate the throwable to the container
@@ -156,6 +154,8 @@ public class KafkaConsumerProxy<K, V> {
         LOG.info("Stopping the KafkaConsumerProxy poll thread for system: {}.", systemName);
       }
     };
+
+    return runnable;
   }
 
   private void initializeLags() {
@@ -433,7 +433,7 @@ public class KafkaConsumerProxy<K, V> {
   }
 
   public void stop(long timeout) {
-    System.out.println("THREAD: Shutting down KafkaConsumerProxy poll thread:" + consumerPollThread.getName());
+    LOG.info("Shutting down KafkaConsumerProxy poll thread:" + consumerPollThread.getName());
 
     isRunning = false;
     try {

http://git-wip-us.apache.org/repos/asf/samza/blob/0b6768f8/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index 892d400..6f58bed 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -47,7 +47,7 @@ object KafkaSystemFactory extends Logging {
 class KafkaSystemFactory extends SystemFactory with Logging {
 
   def getConsumer(systemName: String, config: Config, registry: MetricsRegistry): SystemConsumer = {
-    val clientId = KafkaConsumerConfig.getClientId( config)
+    val clientId = KafkaConsumerConfig.getConsumerClientId( config)
     val metrics = new KafkaSystemConsumerMetrics(systemName, registry)
 
     NewKafkaSystemConsumer.getNewKafkaSystemConsumer(
@@ -76,7 +76,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
   }
 
   def getAdmin(systemName: String, config: Config): SystemAdmin = {
-    val clientId = KafkaConsumerConfig.getClientId(config)
+    val clientId = KafkaConsumerConfig.getConsumerClientId(config)
     val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
     val bootstrapServers = producerConfig.bootsrapServers
     val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)

Reply via email to