comments
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2480aa36 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2480aa36 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2480aa36 Branch: refs/heads/NewKafkaSystemConsumer Commit: 2480aa36ac7afe10b931d3148ab6e41f70c778cb Parents: 053fe3b Author: Boris S <[email protected]> Authored: Tue Sep 11 14:08:02 2018 -0700 Committer: Boris S <[email protected]> Committed: Tue Sep 11 14:08:02 2018 -0700 ---------------------------------------------------------------------- .../clients/consumer/KafkaConsumerConfig.java | 6 +++--- .../samza/system/kafka/KafkaSystemConsumer.java | 17 ++--------------- .../samza/system/kafka/KafkaSystemFactory.scala | 11 ++++++++--- 3 files changed, 13 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/2480aa36/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java index 8ca5b93..1a97ec7 100644 --- a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java +++ b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java @@ -93,12 +93,12 @@ public class KafkaConsumerConfig extends ConsumerConfig { // make sure bootstrap configs are in ?? SHOULD WE FAIL IF THEY ARE NOT? if (!subConf.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) { // get it from the producer config - String bootstrapServer = + String bootstrapServers = config.get(String.format("systems.%s.producer.%s", systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); - if (StringUtils.isEmpty(bootstrapServer)) { + if (StringUtils.isEmpty(bootstrapServers)) { throw new SamzaException("Missing " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " config for " + systemName); } - consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); + consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); } // Always use default partition assignment strategy. Do not allow override. http://git-wip-us.apache.org/repos/asf/samza/blob/2480aa36/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java index 196fb85..9cdfce1 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java @@ -109,19 +109,6 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy clientId, metricName, this.kafkaConsumer.toString()); } - public static <K, V> KafkaSystemConsumer getNewKafkaSystemConsumer(String systemName, Config config, - String clientId, KafkaSystemConsumerMetrics metrics, Clock clock) { - - // extract consumer configs and create kafka consumer - KafkaConsumer<K, V> kafkaConsumer = getKafkaConsumerImpl(systemName, clientId, config); - LOG.info("Created kafka consumer for system {}, clientId {}: {}", systemName, clientId, kafkaConsumer); - - KafkaSystemConsumer kc = new KafkaSystemConsumer(kafkaConsumer, systemName, config, clientId, metrics, clock); - LOG.info("Created samza system consumer {}", kc.toString()); - - return kc; - } - /** * create kafka consumer * @param systemName system name for which we create the consumer @@ -129,7 +116,7 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy * @param config config * @return kafka consumer */ - public static <K, V> KafkaConsumer<K, V> getKafkaConsumerImpl(String systemName, String clientId, Config config) { + public static KafkaConsumer<byte[], byte[]> getKafkaConsumerImpl(String systemName, String clientId, Config config) { Map<String, String> injectProps = new HashMap<>(); @@ -263,7 +250,7 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy // stop the proxy (with 5 minutes timeout) if (proxy != null) { LOG.info("Stopping proxy " + proxy); - proxy.stop(TimeUnit.MINUTES.toMillis(5)); + proxy.stop(TimeUnit.SECONDS.toMillis(60)); } try { http://git-wip-us.apache.org/repos/asf/samza/blob/2480aa36/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala index e0e85be..9f92583 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala @@ -22,7 +22,7 @@ package org.apache.samza.system.kafka import java.util.Properties import kafka.utils.ZkUtils -import org.apache.kafka.clients.consumer.KafkaConsumerConfig +import org.apache.kafka.clients.consumer.{KafkaConsumer, KafkaConsumerConfig} import org.apache.kafka.clients.producer.KafkaProducer import org.apache.samza.SamzaException import org.apache.samza.config.ApplicationConfig.ApplicationMode @@ -50,8 +50,13 @@ class KafkaSystemFactory extends SystemFactory with Logging { val clientId = KafkaConsumerConfig.getConsumerClientId( config) val metrics = new KafkaSystemConsumerMetrics(systemName, registry) - KafkaSystemConsumer.getNewKafkaSystemConsumer( - systemName, config, clientId, metrics, new SystemClock) + val kafkaConsumer = KafkaSystemConsumer.getKafkaConsumerImpl(systemName, clientId, config) + info("Created kafka consumer for system %s, clientId %s: %s" format (systemName, clientId, kafkaConsumer)) + + val kc = new KafkaSystemConsumer(kafkaConsumer, systemName, config, clientId, metrics, new SystemClock) + info("Created samza system consumer %s" format (kc.toString)) + + kc } def getProducer(systemName: String, config: Config, registry: MetricsRegistry): SystemProducer = {
