comments

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2480aa36
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2480aa36
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2480aa36

Branch: refs/heads/NewKafkaSystemConsumer
Commit: 2480aa36ac7afe10b931d3148ab6e41f70c778cb
Parents: 053fe3b
Author: Boris S <[email protected]>
Authored: Tue Sep 11 14:08:02 2018 -0700
Committer: Boris S <[email protected]>
Committed: Tue Sep 11 14:08:02 2018 -0700

----------------------------------------------------------------------
 .../clients/consumer/KafkaConsumerConfig.java      |  6 +++---
 .../samza/system/kafka/KafkaSystemConsumer.java    | 17 ++---------------
 .../samza/system/kafka/KafkaSystemFactory.scala    | 11 ++++++++---
 3 files changed, 13 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/2480aa36/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
index 8ca5b93..1a97ec7 100644
--- a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
+++ b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
@@ -93,12 +93,12 @@ public class KafkaConsumerConfig extends ConsumerConfig {
     // make sure bootstrap configs are in ?? SHOULD WE FAIL IF THEY ARE NOT?
     if (!subConf.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
       // get it from the producer config
-      String bootstrapServer =
+      String bootstrapServers =
          config.get(String.format("systems.%s.producer.%s", systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
-      if (StringUtils.isEmpty(bootstrapServer)) {
+      if (StringUtils.isEmpty(bootstrapServers)) {
         throw new SamzaException("Missing " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " config  for " + systemName);
       }
-      consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
+      consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
     }
 
     // Always use default partition assignment strategy. Do not allow override.

http://git-wip-us.apache.org/repos/asf/samza/blob/2480aa36/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
index 196fb85..9cdfce1 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
@@ -109,19 +109,6 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
         clientId, metricName, this.kafkaConsumer.toString());
   }
 
-  public static <K, V> KafkaSystemConsumer getNewKafkaSystemConsumer(String systemName, Config config,
-      String clientId, KafkaSystemConsumerMetrics metrics, Clock clock) {
-
-    // extract consumer configs and create kafka consumer
-    KafkaConsumer<K, V> kafkaConsumer = getKafkaConsumerImpl(systemName, clientId, config);
-    LOG.info("Created kafka consumer for system {}, clientId {}: {}", systemName, clientId, kafkaConsumer);
-
-    KafkaSystemConsumer kc = new KafkaSystemConsumer(kafkaConsumer, systemName, config, clientId, metrics, clock);
-    LOG.info("Created samza system consumer {}", kc.toString());
-
-    return kc;
-  }
-
   /**
    * create kafka consumer
    * @param systemName system name for which we create the consumer
@@ -129,7 +116,7 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
    * @param config config
    * @return kafka consumer
    */
-  public static <K, V> KafkaConsumer<K, V> getKafkaConsumerImpl(String systemName, String clientId, Config config) {
+  public static KafkaConsumer<byte[], byte[]> getKafkaConsumerImpl(String systemName, String clientId, Config config) {
 
     Map<String, String> injectProps = new HashMap<>();
 
@@ -263,7 +250,7 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
     // stop the proxy (with 5 minutes timeout)
     if (proxy != null) {
       LOG.info("Stopping proxy " + proxy);
-      proxy.stop(TimeUnit.MINUTES.toMillis(5));
+      proxy.stop(TimeUnit.SECONDS.toMillis(60));
     }
 
     try {

http://git-wip-us.apache.org/repos/asf/samza/blob/2480aa36/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index e0e85be..9f92583 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -22,7 +22,7 @@ package org.apache.samza.system.kafka
 import java.util.Properties
 
 import kafka.utils.ZkUtils
-import org.apache.kafka.clients.consumer.KafkaConsumerConfig
+import org.apache.kafka.clients.consumer.{KafkaConsumer, KafkaConsumerConfig}
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.samza.SamzaException
 import org.apache.samza.config.ApplicationConfig.ApplicationMode
@@ -50,8 +50,13 @@ class KafkaSystemFactory extends SystemFactory with Logging {
     val clientId = KafkaConsumerConfig.getConsumerClientId( config)
     val metrics = new KafkaSystemConsumerMetrics(systemName, registry)
 
-    KafkaSystemConsumer.getNewKafkaSystemConsumer(
-      systemName, config, clientId, metrics, new SystemClock)
+    val kafkaConsumer = KafkaSystemConsumer.getKafkaConsumerImpl(systemName, clientId, config)
+    info("Created kafka consumer for system %s, clientId %s: %s" format (systemName, clientId, kafkaConsumer))
+
+    val kc = new KafkaSystemConsumer(kafkaConsumer, systemName, config, clientId, metrics, new SystemClock)
+    info("Created samza system consumer %s" format  (kc.toString))
+
+    kc
   }
 
   def getProducer(systemName: String, config: Config, registry: MetricsRegistry): SystemProducer = {

Reply via email to