cleanup
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/0b6768f8 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/0b6768f8 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/0b6768f8 Branch: refs/heads/NewKafkaSystemConsumer Commit: 0b6768f803db12bf433d96b832c95fa228f6e7ca Parents: f14d608 Author: Boris S <[email protected]> Authored: Wed Sep 5 14:39:08 2018 -0700 Committer: Boris S <[email protected]> Committed: Wed Sep 5 14:39:08 2018 -0700 ---------------------------------------------------------------------- .../org/apache/samza/coordinator/JobModelManager.scala | 2 +- .../kafka/clients/consumer/KafkaConsumerConfig.java | 10 +++++----- .../org/apache/samza/system/kafka/KafkaConsumerProxy.java | 10 +++++----- .../apache/samza/system/kafka/KafkaSystemFactory.scala | 4 ++-- 4 files changed, 13 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/0b6768f8/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala index f7ffd4e..f95a521 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala @@ -64,7 +64,7 @@ object JobModelManager extends Logging { * a) Reads the jobModel from coordinator stream using the job's configuration. * b) Recomputes changelog partition mapping based on jobModel and job's configuration. * c) Builds JobModelManager using the jobModel read from coordinator stream. - * @param config Coordinator stream manager config + * @param config Coordinator stream manager config. * @param changelogPartitionMapping The changelog partition-to-task mapping. * @return JobModelManager */ http://git-wip-us.apache.org/repos/asf/samza/blob/0b6768f8/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java index 843e03d..98792ab 100644 --- a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java +++ b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java @@ -129,17 +129,17 @@ public class KafkaConsumerConfig extends ConsumerConfig { } // client id should be unique per job - public static String getClientId(Config config) { - return getClientId(CONSUMER_CLIENT_ID_PREFIX, config); + public static String getConsumerClientId(Config config) { + return getConsumerClientId(CONSUMER_CLIENT_ID_PREFIX, config); } public static String getProducerClientId(Config config) { - return getClientId(PRODUCER_CLIENT_ID_PREFIX, config); + return getConsumerClientId(PRODUCER_CLIENT_ID_PREFIX, config); } public static String getAdminClientId(Config config) { - return getClientId(ADMIN_CLIENT_ID_PREFIX, config); + return getConsumerClientId(ADMIN_CLIENT_ID_PREFIX, config); } - private static String getClientId(String id, Config config) { + private static String getConsumerClientId(String id, Config config) { if (config.get(JobConfig.JOB_NAME()) == null) { throw new ConfigException("Missing job name"); } http://git-wip-us.apache.org/repos/asf/samza/blob/0b6768f8/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java index 5c79017..ae80d50 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java @@ -85,7 +85,6 @@ public class KafkaConsumerProxy<K, V> { this.metricName = metricName; this.clientId = clientId; - // TODO - see if we need new metrics (not host:port based) this.kafkaConsumerMetrics.registerClientProxy(metricName); consumerPollThread = new Thread(createProxyThreadRunnable()); @@ -133,18 +132,17 @@ public class KafkaConsumerProxy<K, V> { * creates a separate thread for pulling messages */ private Runnable createProxyThreadRunnable() { - return () -> { + Runnable runnable= () -> { isRunning = true; try { consumerPollThreadStartLatch.countDown(); - System.out.println("THREAD: runing " + consumerPollThread.getName()); + LOG.info("Starting runnable " + consumerPollThread.getName()); initializeLags(); while (isRunning) { fetchMessages(); } - System.out.println("THREAD: finished " + consumerPollThread.getName()); } catch (Throwable throwable) { LOG.error(String.format("Error in KafkaConsumerProxy poll thread for system: %s.", systemName), throwable); // SamzaKafkaSystemConsumer uses the failureCause to propagate the throwable to the container @@ -156,6 +154,8 @@ public class KafkaConsumerProxy<K, V> { LOG.info("Stopping the KafkaConsumerProxy poll thread for system: {}.", systemName); } }; + + return runnable; } private void initializeLags() { @@ -433,7 +433,7 @@ public class KafkaConsumerProxy<K, V> { } public void stop(long timeout) { - System.out.println("THREAD: Shutting down KafkaConsumerProxy poll thread:" + consumerPollThread.getName()); + LOG.info("Shutting down KafkaConsumerProxy poll thread:" + consumerPollThread.getName()); isRunning = false; try { http://git-wip-us.apache.org/repos/asf/samza/blob/0b6768f8/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala index 892d400..6f58bed 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala @@ -47,7 +47,7 @@ object KafkaSystemFactory extends Logging { class KafkaSystemFactory extends SystemFactory with Logging { def getConsumer(systemName: String, config: Config, registry: MetricsRegistry): SystemConsumer = { - val clientId = KafkaConsumerConfig.getClientId( config) + val clientId = KafkaConsumerConfig.getConsumerClientId( config) val metrics = new KafkaSystemConsumerMetrics(systemName, registry) NewKafkaSystemConsumer.getNewKafkaSystemConsumer( @@ -76,7 +76,7 @@ class KafkaSystemFactory extends SystemFactory with Logging { } def getAdmin(systemName: String, config: Config): SystemAdmin = { - val clientId = KafkaConsumerConfig.getClientId(config) + val clientId = KafkaConsumerConfig.getConsumerClientId(config) val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId) val bootstrapServers = producerConfig.bootsrapServers val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
