addressed comments
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ed0648dc Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ed0648dc Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ed0648dc Branch: refs/heads/NewKafkaSystemConsumer Commit: ed0648dca2b2a902875073861a433238d84ce68f Parents: 5120740 Author: Boris S <[email protected]> Authored: Tue Sep 18 13:12:14 2018 -0700 Committer: Boris S <[email protected]> Committed: Tue Sep 18 13:12:14 2018 -0700 ---------------------------------------------------------------------- .../org/apache/samza/system/kafka/KafkaConsumerProxy.java | 2 +- .../org/apache/samza/system/kafka/KafkaSystemFactory.scala | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/ed0648dc/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java index 83e7a58..6fc6491 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; * This class is not thread safe. There will be only one instance of this class per KafkaSystemConsumer object. * We still need some synchronization around kafkaConsumer. See pollConsumer() method for details. */ -public class KafkaConsumerProxy<K, V> { +/*package private */class KafkaConsumerProxy<K, V> { private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerProxy.class); private static final int SLEEP_MS_WHILE_NO_TOPIC_PARTITION = 100; http://git-wip-us.apache.org/repos/asf/samza/blob/ed0648dc/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala index 9f92583..5342b08 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala @@ -53,10 +53,10 @@ class KafkaSystemFactory extends SystemFactory with Logging { val kafkaConsumer = KafkaSystemConsumer.getKafkaConsumerImpl(systemName, clientId, config) info("Created kafka consumer for system %s, clientId %s: %s" format (systemName, clientId, kafkaConsumer)) - val kc = new KafkaSystemConsumer(kafkaConsumer, systemName, config, clientId, metrics, new SystemClock) - info("Created samza system consumer %s" format (kc.toString)) + val kafkaSystemConsumer = new KafkaSystemConsumer(kafkaConsumer, systemName, config, clientId, metrics, new SystemClock) + info("Created samza system consumer %s" format (kafkaSystemConsumer.toString)) - kc + kafkaSystemConsumer } def getProducer(systemName: String, config: Config, registry: MetricsRegistry): SystemProducer = {
