addressed some review comments
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/26552213 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/26552213 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/26552213 Branch: refs/heads/NewKafkaSystemConsumer Commit: 2655221348304507e1a91e6fa93ef2dc79a4620d Parents: 9217644 Author: Boris S <[email protected]> Authored: Mon Sep 10 11:17:18 2018 -0700 Committer: Boris S <[email protected]> Committed: Mon Sep 10 11:17:18 2018 -0700 ---------------------------------------------------------------------- .../apache/samza/container/SamzaContainer.scala | 2 +- .../samza/coordinator/JobModelManager.scala | 3 +- .../clients/consumer/KafkaConsumerConfig.java | 43 +- .../samza/system/kafka/KafkaConsumerProxy.java | 50 +-- .../samza/system/kafka/KafkaSystemConsumer.java | 406 ++++++++++++++++++ .../samza/system/kafka/KafkaSystemFactory.scala | 4 +- .../system/kafka/NewKafkaSystemConsumer.java | 412 ------------------- .../system/kafka/TestKafkaSystemConsumer.java | 224 ++++++++++ .../kafka/TestNewKafkaSystemConsumer.java | 224 ---------- 9 files changed, 687 insertions(+), 681 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/26552213/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index b17788f..5ee9206 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -829,7 +829,7 @@ class SamzaContainer( } try { - info("Shutting down Samza.") + info("Shutting down SamzaContaier.") removeShutdownHook jmxServer.stop 
http://git-wip-us.apache.org/repos/asf/samza/blob/26552213/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala index f95a521..e626d9a 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala @@ -35,7 +35,6 @@ import org.apache.samza.container.LocalityManager import org.apache.samza.container.TaskName import org.apache.samza.coordinator.server.HttpServer import org.apache.samza.coordinator.server.JobServlet -import org.apache.samza.coordinator.stream.CoordinatorStreamManager import org.apache.samza.job.model.JobModel import org.apache.samza.job.model.TaskModel import org.apache.samza.metrics.MetricsRegistryMap @@ -64,7 +63,7 @@ object JobModelManager extends Logging { * a) Reads the jobModel from coordinator stream using the job's configuration. * b) Recomputes changelog partition mapping based on jobModel and job's configuration. * c) Builds JobModelManager using the jobModel read from coordinator stream. - * @param config Coordinator stream manager config. + * @param config Config from the coordinator stream. * @param changelogPartitionMapping The changelog partition-to-task mapping. 
* @return JobModelManager */ http://git-wip-us.apache.org/repos/asf/samza/blob/26552213/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java index 98792ab..8ca5b93 100644 --- a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java +++ b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java @@ -54,21 +54,28 @@ public class KafkaConsumerConfig extends ConsumerConfig { * By default, KafkaConsumer will fetch ALL available messages for all the partitions. * This may cause memory issues. That's why we will limit the number of messages per partition we get on EACH poll(). */ - private static final String KAFKA_CONSUMER_MAX_POLL_RECORDS_DEFAULT = "100"; + private static final String DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS = "100"; - - public KafkaConsumerConfig(Properties props) { + private KafkaConsumerConfig(Properties props) { super(props); } + /** + * Create kafka consumer configs, based on the subset of global configs. 
+ * @param config + * @param systemName + * @param clientId + * @param injectProps + * @return KafkaConsumerConfig + */ public static KafkaConsumerConfig getKafkaSystemConsumerConfig(Config config, String systemName, String clientId, Map<String, String> injectProps) { - Config subConf = config.subset(String.format("systems.%s.consumer.", systemName), true); + final Config subConf = config.subset(String.format("systems.%s.consumer.", systemName), true); - String groupId = getConsumerGroupId(config); + final String groupId = getConsumerGroupId(config); - Properties consumerProps = new Properties(); + final Properties consumerProps = new Properties(); consumerProps.putAll(subConf); consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); @@ -109,8 +116,8 @@ public class KafkaConsumerConfig extends ConsumerConfig { } // NOT SURE THIS IS NEEDED TODO - String maxPollRecords = - subConf.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, KAFKA_CONSUMER_MAX_POLL_RECORDS_DEFAULT); + final String maxPollRecords = + subConf.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS); consumerProps.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); // put overrides @@ -120,7 +127,7 @@ public class KafkaConsumerConfig extends ConsumerConfig { } // group id should be unique per job - static String getConsumerGroupId(Config config) { + private static String getConsumerGroupId(Config config) { JobConfig jobConfig = new JobConfig(config); Option<String> jobIdOption = jobConfig.getJobId(); Option<String> jobNameOption = jobConfig.getName(); @@ -151,11 +158,12 @@ public class KafkaConsumerConfig extends ConsumerConfig { } /** - * Settings for auto.reset in samza are different from settings in Kafka (auto.offset.reset) - need to convert + * If settings for auto.reset in samza are different from settings in Kafka (auto.offset.reset), + * then need to convert them (see kafka.apache.org/documentation): * "largest" -> "latest" * "smallest" -> 
"earliest" - * "none" -> "none" - * "none" - will fail the kafka consumer, if offset is out of range + * + * If no setting specified we return "latest" (same as Kafka). * @param properties All consumer related {@link Properties} parsed from samza config * @return String representing the config value for "auto.offset.reset" property */ @@ -168,13 +176,18 @@ public class KafkaConsumerConfig extends ConsumerConfig { return autoOffsetReset; } + String newAutoOffsetReset; switch (autoOffsetReset) { case SAMZA_OFFSET_LARGEST: - return KAFKA_OFFSET_LATEST; + newAutoOffsetReset = KAFKA_OFFSET_LATEST; + break; case SAMZA_OFFSET_SMALLEST: - return KAFKA_OFFSET_EARLIEST; + newAutoOffsetReset = KAFKA_OFFSET_EARLIEST; + break; default: - return KAFKA_OFFSET_LATEST; + newAutoOffsetReset = KAFKA_OFFSET_LATEST; } + LOG.info("AutoOffsetReset value converted from {} to {}", autoOffsetReset, newAutoOffsetReset); + return newAutoOffsetReset; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/26552213/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java index ae80d50..0825c90 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java @@ -22,7 +22,6 @@ package org.apache.samza.system.kafka; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -40,6 +39,7 @@ import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; +import org.apache.samza.Partition; 
import org.apache.samza.SamzaException; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.SystemStreamPartition; @@ -58,13 +58,13 @@ public class KafkaConsumerProxy<K, V> { /* package private */ final Thread consumerPollThread; private final Consumer<K, V> kafkaConsumer; - private final NewKafkaSystemConsumer.KafkaConsumerMessageSink sink; + private final KafkaSystemConsumer.KafkaConsumerMessageSink sink; private final KafkaSystemConsumerMetrics kafkaConsumerMetrics; private final String metricName; private final String systemName; private final String clientId; private final Map<TopicPartition, SystemStreamPartition> topicPartitions2SSP = new HashMap<>(); - private final Map<SystemStreamPartition, MetricName> ssp2MetricName = new HashMap<>(); + private final Map<SystemStreamPartition, MetricName> perPartitionMetrics = new HashMap<>(); // list of all the SSPs we poll from, with their next offsets correspondingly. private final Map<SystemStreamPartition, Long> nextOffsets = new ConcurrentHashMap<>(); // lags behind the high water mark, as reported by the Kafka consumer. 
@@ -75,7 +75,7 @@ public class KafkaConsumerProxy<K, V> { private final CountDownLatch consumerPollThreadStartLatch = new CountDownLatch(1); public KafkaConsumerProxy(Consumer<K, V> kafkaConsumer, String systemName, String clientId, - NewKafkaSystemConsumer.KafkaConsumerMessageSink messageSink, KafkaSystemConsumerMetrics samzaConsumerMetrics, + KafkaSystemConsumer.KafkaConsumerMessageSink messageSink, KafkaSystemConsumerMetrics samzaConsumerMetrics, String metricName) { this.kafkaConsumer = kafkaConsumer; @@ -88,14 +88,15 @@ public class KafkaConsumerProxy<K, V> { this.kafkaConsumerMetrics.registerClientProxy(metricName); consumerPollThread = new Thread(createProxyThreadRunnable()); + consumerPollThread.setDaemon(true); + consumerPollThread.setName( + "Samza KafkaConsumerProxy Poll " + consumerPollThread.getName() + " - " + systemName); } public void start() { if (!consumerPollThread.isAlive()) { LOG.info("Starting KafkaConsumerProxy polling thread for system " + systemName + " " + this.toString()); - consumerPollThread.setDaemon(true); - consumerPollThread.setName( - "Samza KafkaConsumerProxy Poll " + consumerPollThread.getName() + " - " + systemName); + consumerPollThread.start(); // we need to wait until the thread starts @@ -116,7 +117,7 @@ public class KafkaConsumerProxy<K, V> { public void addTopicPartition(SystemStreamPartition ssp, long nextOffset) { LOG.info(String.format("Adding new topic and partition %s, offset = %s to queue for consumer %s", ssp, nextOffset, this)); - topicPartitions2SSP.put(NewKafkaSystemConsumer.toTopicPartition(ssp), ssp); //registered SSPs + topicPartitions2SSP.put(KafkaSystemConsumer.toTopicPartition(ssp), ssp); //registered SSPs // this is already vetted offset so there is no need to validate it LOG.info(String.format("Got offset %s for new topic and partition %s.", nextOffset, ssp)); @@ -135,7 +136,6 @@ public class KafkaConsumerProxy<K, V> { Runnable runnable= () -> { isRunning = true; - try { 
consumerPollThreadStartLatch.countDown(); LOG.info("Starting runnable " + consumerPollThread.getName()); @@ -230,19 +230,19 @@ public class KafkaConsumerProxy<K, V> { private Map<SystemStreamPartition, List<IncomingMessageEnvelope>> processResults(ConsumerRecords<K, V> records) { if (records == null) { - return Collections.emptyMap(); + throw new SamzaException("processResults is called with null object for records"); } int capacity = (int) (records.count() / 0.75 + 1); // to avoid rehash, allocate more then 75% of expected capacity. Map<SystemStreamPartition, List<IncomingMessageEnvelope>> results = new HashMap<>(capacity); // Parse the returned records and convert them into the IncomingMessageEnvelope. // Note. They have been already de-serialized by the consumer. - for (ConsumerRecord<K, V> r : records) { - int partition = r.partition(); - String topic = r.topic(); + for (ConsumerRecord<K, V> record : records) { + int partition = record.partition(); + String topic = record.topic(); TopicPartition tp = new TopicPartition(topic, partition); - updateMetrics(r, tp); + updateMetrics(record, tp); SystemStreamPartition ssp = topicPartitions2SSP.get(tp); List<IncomingMessageEnvelope> listMsgs = results.get(ssp); @@ -251,10 +251,10 @@ public class KafkaConsumerProxy<K, V> { results.put(ssp, listMsgs); } - final K key = r.key(); - final Object value = r.value(); - IncomingMessageEnvelope imEnvelope = - new IncomingMessageEnvelope(ssp, String.valueOf(r.offset()), key, value, getRecordSize(r)); + final K key = record.key(); + final Object value = record.value(); + final IncomingMessageEnvelope imEnvelope = + new IncomingMessageEnvelope(ssp, String.valueOf(record.offset()), key, value, getRecordSize(record)); listMsgs.add(imEnvelope); } if (LOG.isDebugEnabled()) { @@ -274,8 +274,8 @@ public class KafkaConsumerProxy<K, V> { } private void updateMetrics(ConsumerRecord<K, V> r, TopicPartition tp) { - TopicAndPartition tap = NewKafkaSystemConsumer.toTopicAndPartition(tp); - 
SystemStreamPartition ssp = NewKafkaSystemConsumer.toSystemStreamPartition(systemName, tap); + TopicAndPartition tap = KafkaSystemConsumer.toTopicAndPartition(tp); + SystemStreamPartition ssp = new SystemStreamPartition(systemName, tp.topic(), new Partition(tp.partition())); long currentSSPLag = getLatestLag(ssp); // lag between the current offset and the highwatermark if (currentSSPLag < 0) { return; @@ -312,8 +312,8 @@ public class KafkaConsumerProxy<K, V> { tags.put("client-id", clientId);// this is required by the KafkaConsumer to get the metrics for (SystemStreamPartition ssp : ssps) { - TopicPartition tp = NewKafkaSystemConsumer.toTopicPartition(ssp); - ssp2MetricName.put(ssp, new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags)); + TopicPartition tp = KafkaSystemConsumer.toTopicPartition(ssp); + perPartitionMetrics.put(ssp, new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags)); } } @@ -327,12 +327,12 @@ public class KafkaConsumerProxy<K, V> { Map<MetricName, ? 
extends Metric> consumerMetrics = kafkaConsumer.metrics(); // populate the MetricNames first time - if (ssp2MetricName.isEmpty()) { + if (perPartitionMetrics.isEmpty()) { populateMetricNames(ssps); } for (SystemStreamPartition ssp : ssps) { - MetricName mn = ssp2MetricName.get(ssp); + MetricName mn = perPartitionMetrics.get(ssp); Metric currentLagM = consumerMetrics.get(mn); // High watermark is fixed to be the offset of last available message, @@ -412,7 +412,7 @@ public class KafkaConsumerProxy<K, V> { for (Map.Entry<SystemStreamPartition, Long> e : nextOffsets.entrySet()) { SystemStreamPartition ssp = e.getKey(); Long offset = e.getValue(); - TopicAndPartition tp = NewKafkaSystemConsumer.toTopicAndPartition(ssp); + TopicAndPartition tp = new TopicAndPartition(ssp.getStream(), ssp.getPartition().getPartitionId()); Long lag = latestLags.get(ssp); LOG.trace("Latest offset of {} is {}; lag = {}", ssp, offset, lag); if (lag != null && offset != null && lag >= 0) { http://git-wip-us.apache.org/repos/asf/samza/blob/26552213/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java new file mode 100644 index 0000000..196fb85 --- /dev/null +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java @@ -0,0 +1,406 @@ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.samza.system.kafka; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import kafka.common.TopicAndPartition; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.KafkaConsumerConfig; +import org.apache.kafka.common.TopicPartition; +import org.apache.samza.Partition; +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.config.KafkaConfig; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemConsumer; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.util.BlockingEnvelopeMap; +import org.apache.samza.util.Clock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; + + +public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements SystemConsumer { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaSystemConsumer.class); + + private static final long FETCH_THRESHOLD = 50000; + private static final long FETCH_THRESHOLD_BYTES = -1L; + + private final Consumer<K, V> kafkaConsumer; + private final String systemName; + private final KafkaSystemConsumerMetrics samzaConsumerMetrics; + private final String clientId; + private final String metricName; + private final AtomicBoolean stopped = new AtomicBoolean(false); 
+ private final AtomicBoolean started = new AtomicBoolean(false); + private final Config config; + private final boolean fetchThresholdBytesEnabled; + + // This sink is used to transfer the messages from the proxy/consumer to the BlockingEnvelopeMap. + /* package private */final KafkaConsumerMessageSink messageSink; + + // proxy is doing the actual reading + final private KafkaConsumerProxy proxy; + + /* package private */final Map<TopicPartition, String> topicPartitions2Offset = new HashMap<>(); + /* package private */final Map<TopicPartition, SystemStreamPartition> topicPartitions2SSP = new HashMap<>(); + + /* package private */ long perPartitionFetchThreshold; + /* package private */ long perPartitionFetchThresholdBytes; + + /** + * Constructor + * @param systemName system name for which we create the consumer + * @param config config + * @param metrics metrics + * @param clock - system clock + */ + public KafkaSystemConsumer(Consumer<K, V> kafkaConsumer, String systemName, Config config, String clientId, + KafkaSystemConsumerMetrics metrics, Clock clock) { + + super(metrics.registry(), clock, metrics.getClass().getName()); + + this.kafkaConsumer = kafkaConsumer; + this.samzaConsumerMetrics = metrics; + this.clientId = clientId; + this.systemName = systemName; + this.config = config; + this.metricName = String.format("%s %s", systemName, clientId); + + this.fetchThresholdBytesEnabled = new KafkaConfig(config).isConsumerFetchThresholdBytesEnabled(systemName); + + // create a sink for passing the messages between the proxy and the consumer + messageSink = new KafkaConsumerMessageSink(); + + // Create the proxy to do the actual message reading. It is a separate thread that reads the messages from the stream + // and puts them into the sink. 
+ proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId, messageSink, samzaConsumerMetrics, metricName); + LOG.info("Created consumer proxy: " + proxy); + + LOG.info("Created SamzaKafkaSystemConsumer for system={}, clientId={}, metricName={}, KafkaConsumer={}", systemName, + clientId, metricName, this.kafkaConsumer.toString()); + } + + public static <K, V> KafkaSystemConsumer getNewKafkaSystemConsumer(String systemName, Config config, + String clientId, KafkaSystemConsumerMetrics metrics, Clock clock) { + + // extract consumer configs and create kafka consumer + KafkaConsumer<K, V> kafkaConsumer = getKafkaConsumerImpl(systemName, clientId, config); + LOG.info("Created kafka consumer for system {}, clientId {}: {}", systemName, clientId, kafkaConsumer); + + KafkaSystemConsumer kc = new KafkaSystemConsumer(kafkaConsumer, systemName, config, clientId, metrics, clock); + LOG.info("Created samza system consumer {}", kc.toString()); + + return kc; + } + + /** + * create kafka consumer + * @param systemName system name for which we create the consumer + * @param clientId client id to use int the kafka client + * @param config config + * @return kafka consumer + */ + public static <K, V> KafkaConsumer<K, V> getKafkaConsumerImpl(String systemName, String clientId, Config config) { + + Map<String, String> injectProps = new HashMap<>(); + + // extract kafka client configs + KafkaConsumerConfig consumerConfig = + KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, clientId, injectProps); + + LOG.info("KafkaClient properties for systemName {}: {}", systemName, consumerConfig.originals()); + + return new KafkaConsumer<>(consumerConfig.originals()); + } + + @Override + public void start() { + if (!started.compareAndSet(false, true)) { + LOG.warn("attempting to start the consumer for the second (or more) time."); + return; + } + if (stopped.get()) { + LOG.warn("attempting to start a stopped consumer"); + return; + } + // initialize the subscriptions 
for all the registered TopicPartitions + startSubscription(); + // needs to be called after all the registrations are completed + setFetchThresholds(); + + startConsumer(); + LOG.info("consumer {} started", this); + } + + private void startSubscription() { + //subscribe to all the registered TopicPartitions + LOG.info("consumer {}, subscribes to {} ", this, topicPartitions2SSP.keySet()); + try { + synchronized (kafkaConsumer) { + // we are using assign (and not subscribe), so we need to specify both topic and partition + kafkaConsumer.assign(topicPartitions2SSP.keySet()); + } + } catch (Exception e) { + LOG.warn("startSubscription failed.", e); + throw new SamzaException(e); + } + } + + /* + Set the offsets to start from. + Add the TopicPartitions to the proxy. + Start the proxy thread. + */ + void startConsumer() { + //set the offset for each TopicPartition + if (topicPartitions2Offset.size() <= 0) { + LOG.warn("Consumer {} is not subscribed to any SSPs", this); + } + + topicPartitions2Offset.forEach((tp, startingOffsetString) -> { + long startingOffset = Long.valueOf(startingOffsetString); + + try { + synchronized (kafkaConsumer) { + // TODO in the future we may need to add special handling here for BEGIN/END_OFFSET + // this will call KafkaConsumer.seekToBegin/End() + kafkaConsumer.seek(tp, startingOffset); // this value should already be the 'upcoming' value + } + } catch (Exception e) { + // all other exceptions - non recoverable + LOG.error("Got Exception while seeking to " + startingOffsetString + " for " + tp, e); + throw new SamzaException(e); + } + + LOG.info("Changing consumer's starting offset for tp = " + tp + " to " + startingOffsetString); + + // add the partition to the proxy + proxy.addTopicPartition(topicPartitions2SSP.get(tp), startingOffset); + }); + + // start the proxy thread + if (proxy != null && !proxy.isRunning()) { + LOG.info("Starting proxy: " + proxy); + proxy.start(); + } + } + + private void setFetchThresholds() { + // get the 
thresholds, and set defaults if not defined. + KafkaConfig kafkaConfig = new KafkaConfig(config); + + Option<String> fetchThresholdOption = kafkaConfig.getConsumerFetchThreshold(systemName); + long fetchThreshold = FETCH_THRESHOLD; + if (fetchThresholdOption.isDefined()) { + fetchThreshold = Long.valueOf(fetchThresholdOption.get()); + LOG.info("fetchThresholdOption is configured. fetchThreshold=" + fetchThreshold); + } + + Option<String> fetchThresholdBytesOption = kafkaConfig.getConsumerFetchThresholdBytes(systemName); + long fetchThresholdBytes = FETCH_THRESHOLD_BYTES; + if (fetchThresholdBytesOption.isDefined()) { + fetchThresholdBytes = Long.valueOf(fetchThresholdBytesOption.get()); + LOG.info("fetchThresholdBytesOption is configured. fetchThresholdBytes=" + fetchThresholdBytes); + } + + int numTPs = topicPartitions2SSP.size(); + assert (numTPs == topicPartitions2Offset.size()); + + LOG.info("fetchThresholdBytes = " + fetchThresholdBytes + "; fetchThreshold=" + fetchThreshold); + LOG.info("number of topicPartitions " + numTPs); + + if (numTPs > 0) { + perPartitionFetchThreshold = fetchThreshold / numTPs; + LOG.info("perPartitionFetchThreshold=" + perPartitionFetchThreshold); + if (fetchThresholdBytesEnabled) { + // currently this feature cannot be enabled, because we do not have the size of the messages available. + // messages get double buffered, hence divide by 2 + perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / numTPs; + LOG.info("perPartitionFetchThresholdBytes is enabled. 
perPartitionFetchThresholdBytes=" + + perPartitionFetchThresholdBytes); + } + } + } + + @Override + public void stop() { + LOG.info("Stopping Samza kafkaConsumer " + this); + + if (!stopped.compareAndSet(false, true)) { + LOG.warn("attempting to stop stopped consumer."); + return; + } + + // stop the proxy (with 5 minutes timeout) + if (proxy != null) { + LOG.info("Stopping proxy " + proxy); + proxy.stop(TimeUnit.MINUTES.toMillis(5)); + } + + try { + synchronized (kafkaConsumer) { + LOG.info("Closing kafka consumer " + kafkaConsumer); + kafkaConsumer.close(); + } + } catch (Exception e) { + LOG.warn("failed to stop SamzaRawKafkaConsumer + " + this, e); + } + } + + /* + record the ssp and the offset. Do not submit it to the consumer yet. + */ + @Override + public void register(SystemStreamPartition systemStreamPartition, String offset) { + if (started.get()) { + String msg = + String.format("Trying to register partition after consumer has been started. sn=%s, ssp=%s", systemName, + systemStreamPartition); + LOG.error(msg); + throw new SamzaException(msg); + } + + if (!systemStreamPartition.getSystem().equals(systemName)) { + LOG.warn("ignoring SSP " + systemStreamPartition + ", because this consumer's system is " + systemName); + return; + } + super.register(systemStreamPartition, offset); + + TopicPartition tp = toTopicPartition(systemStreamPartition); + + topicPartitions2SSP.put(tp, systemStreamPartition); + + LOG.info("Registering ssp = " + systemStreamPartition + " with offset " + offset); + + String existingOffset = topicPartitions2Offset.get(tp); + // register the older (of the two) offset in the consumer, to guarantee we do not miss any messages. + if (existingOffset == null || compareOffsets(existingOffset, offset) > 0) { + topicPartitions2Offset.put(tp, offset); + } + + samzaConsumerMetrics.registerTopicAndPartition(toTopicAndPartition(tp)); + } + + /** + * Compare two String offsets. + * Note. 
There is a method in KafkaAdmin that does that, but that would require instantiation of systemadmin for each consumer. + * @return see {@link Long#compareTo(Long)} + */ + public static int compareOffsets(String offset1, String offset2) { + return Long.valueOf(offset1).compareTo(Long.valueOf(offset2)); + } + + @Override + public String toString() { + return systemName + "/" + clientId + "/" + super.toString(); + } + + @Override + public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll( + Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException { + + // check if the proxy is running + if (!proxy.isRunning()) { + stop(); + if (proxy.getFailureCause() != null) { + String message = "KafkaConsumerProxy has stopped"; + throw new SamzaException(message, proxy.getFailureCause()); + } else { + LOG.warn("Failure cause is not populated for KafkaConsumerProxy"); + throw new SamzaException("KafkaConsumerProxy has stopped"); + } + } + + Map<SystemStreamPartition, List<IncomingMessageEnvelope>> res = super.poll(systemStreamPartitions, timeout); + return res; + } + + /** + * convert from TopicPartition to TopicAndPartition + */ + public static TopicAndPartition toTopicAndPartition(TopicPartition tp) { + return new TopicAndPartition(tp.topic(), tp.partition()); + } + + /** + * convert to TopicPartition from SystemStreamPartition + */ + public static TopicPartition toTopicPartition(SystemStreamPartition ssp) { + return new TopicPartition(ssp.getStream(), ssp.getPartition().getPartitionId()); + } + + /** + * return system name for this consumer + * @return system name + */ + public String getSystemName() { + return systemName; + } + + //////////////////////////////////// + // inner class for the message sink + //////////////////////////////////// + public class KafkaConsumerMessageSink { + + public void setIsAtHighWatermark(SystemStreamPartition ssp, boolean isAtHighWatermark) { + setIsAtHead(ssp, isAtHighWatermark); + } + + boolean 
needsMoreMessages(SystemStreamPartition ssp) { + if (LOG.isDebugEnabled()) { + LOG.debug("needsMoreMessages from following SSP: {}. fetchLimitByBytes enabled={}; messagesSizeInQueue={};" + + "(limit={}); messagesNumInQueue={}(limit={};", ssp, fetchThresholdBytesEnabled, + getMessagesSizeInQueue(ssp), perPartitionFetchThresholdBytes, getNumMessagesInQueue(ssp), + perPartitionFetchThreshold); + } + + if (fetchThresholdBytesEnabled) { + return getMessagesSizeInQueue(ssp) < perPartitionFetchThresholdBytes; + } else { + return getNumMessagesInQueue(ssp) < perPartitionFetchThreshold; + } + } + + void addMessage(SystemStreamPartition ssp, IncomingMessageEnvelope envelope) { + LOG.trace("Incoming message ssp = {}: envelope = {}.", ssp, envelope); + + try { + put(ssp, envelope); + } catch (InterruptedException e) { + throw new SamzaException( + String.format("Interrupted while trying to add message with offset %s for ssp %s", envelope.getOffset(), + ssp)); + } + } + } // end of KafkaMessageSink class + /////////////////////////////////////////////////////////////////////////// +} http://git-wip-us.apache.org/repos/asf/samza/blob/26552213/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala index 6f58bed..e0e85be 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala @@ -50,7 +50,7 @@ class KafkaSystemFactory extends SystemFactory with Logging { val clientId = KafkaConsumerConfig.getConsumerClientId( config) val metrics = new KafkaSystemConsumerMetrics(systemName, registry) - NewKafkaSystemConsumer.getNewKafkaSystemConsumer( + KafkaSystemConsumer.getNewKafkaSystemConsumer( 
systemName, config, clientId, metrics, new SystemClock) } @@ -76,7 +76,7 @@ class KafkaSystemFactory extends SystemFactory with Logging { } def getAdmin(systemName: String, config: Config): SystemAdmin = { - val clientId = KafkaConsumerConfig.getConsumerClientId(config) + val clientId = KafkaConsumerConfig.getAdminClientId(config) val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId) val bootstrapServers = producerConfig.bootsrapServers val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId) http://git-wip-us.apache.org/repos/asf/samza/blob/26552213/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java deleted file mode 100644 index afec8ad..0000000 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java +++ /dev/null @@ -1,412 +0,0 @@ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ - -package org.apache.samza.system.kafka; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import kafka.common.TopicAndPartition; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.KafkaConsumerConfig; -import org.apache.kafka.common.TopicPartition; -import org.apache.samza.Partition; -import org.apache.samza.SamzaException; -import org.apache.samza.config.Config; -import org.apache.samza.config.KafkaConfig; -import org.apache.samza.system.IncomingMessageEnvelope; -import org.apache.samza.system.SystemConsumer; -import org.apache.samza.system.SystemStreamPartition; -import org.apache.samza.util.BlockingEnvelopeMap; -import org.apache.samza.util.Clock; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.Option; - - -public class NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements SystemConsumer { - - private static final Logger LOG = LoggerFactory.getLogger(NewKafkaSystemConsumer.class); - - private static final long FETCH_THRESHOLD = 50000; - private static final long FETCH_THRESHOLD_BYTES = -1L; - - private final Consumer<K, V> kafkaConsumer; - private final String systemName; - private final KafkaSystemConsumerMetrics samzaConsumerMetrics; - private final String clientId; - private final String metricName; - private final AtomicBoolean stopped = new AtomicBoolean(false); - private final AtomicBoolean started = new AtomicBoolean(false); - private final Config config; - private final boolean fetchThresholdBytesEnabled; - - // This sink is used to transfer the messages from the proxy/consumer to the BlockingEnvelopeMap. 
- /* package private */ KafkaConsumerMessageSink messageSink; - - // proxy is doing the actual reading - private KafkaConsumerProxy proxy; - - /* package private */final Map<TopicPartition, String> topicPartitions2Offset = new HashMap<>(); - /* package private */final Map<TopicPartition, SystemStreamPartition> topicPartitions2SSP = new HashMap<>(); - - /* package private */ long perPartitionFetchThreshold; - /* package private */ long perPartitionFetchThresholdBytes; - - /** - * @param systemName - * @param config - * @param metrics - */ - protected NewKafkaSystemConsumer(Consumer<K, V> kafkaConsumer, String systemName, Config config, String clientId, - KafkaSystemConsumerMetrics metrics, Clock clock) { - - super(metrics.registry(), clock, metrics.getClass().getName()); - - this.kafkaConsumer = kafkaConsumer; - this.samzaConsumerMetrics = metrics; - this.clientId = clientId; - this.systemName = systemName; - this.config = config; - this.metricName = systemName + " " + clientId; - - this.fetchThresholdBytesEnabled = new KafkaConfig(config).isConsumerFetchThresholdBytesEnabled(systemName); - - LOG.info("Created SamzaKafkaSystemConsumer for system={}, clientId={}, metricName={}, KafkaConsumer={}", systemName, - clientId, metricName, this.kafkaConsumer.toString()); - } - - public static <K, V> NewKafkaSystemConsumer getNewKafkaSystemConsumer(String systemName, Config config, - String clientId, KafkaSystemConsumerMetrics metrics, Clock clock) { - - // extract consumer configs and create kafka consumer - KafkaConsumer<K, V> kafkaConsumer = getKafkaConsumerImpl(systemName, clientId, config); - LOG.info("Created kafka consumer for system {}, clientId {}: {}", systemName, clientId, kafkaConsumer); - - NewKafkaSystemConsumer kc = new NewKafkaSystemConsumer(kafkaConsumer, systemName, config, clientId, metrics, clock); - LOG.info("Created samza system consumer {}", kc.toString()); - - return kc; - } - - /** - * create kafka consumer - * @param systemName - * @param clientId - 
* @param config - * @return kafka consumer - */ - private static <K, V> KafkaConsumer<K, V> getKafkaConsumerImpl(String systemName, String clientId, Config config) { - - Map<String, String> injectProps = new HashMap<>(); - - // extract kafka client configs - KafkaConsumerConfig consumerConfig = - KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, clientId, injectProps); - - LOG.info("KafkaClient properties for systemName {}: {}", systemName, consumerConfig.originals()); - - return new KafkaConsumer<>(consumerConfig.originals()); - } - - @Override - public void start() { - if (!started.compareAndSet(false, true)) { - LOG.warn("attempting to start the consumer for the second (or more) time."); - return; - } - if (stopped.get()) { - LOG.warn("attempting to start a stopped consumer"); - return; - } - // initialize the subscriptions for all the registered TopicPartitions - startSubscription(); - // needs to be called after all the registrations are completed - setFetchThresholds(); - // Create the proxy to do the actual message reading. It is a separate thread that reads the messages from the stream - // and puts them into the sink. 
- createConsumerProxy(); - startConsumer(); - LOG.info("consumer {} started", this); - } - - private void startSubscription() { - //subscribe to all the registered TopicPartitions - LOG.info("consumer {}, subscribes to {} ", this, topicPartitions2SSP.keySet()); - try { - synchronized (kafkaConsumer) { - // we are using assign (and not subscribe), so we need to specify both topic and partition - kafkaConsumer.assign(topicPartitions2SSP.keySet()); - } - } catch (Exception e) { - LOG.warn("startSubscription failed.", e); - throw new SamzaException(e); - } - } - - void createConsumerProxy() { - // create a sink for passing the messages between the proxy and the consumer - messageSink = new KafkaConsumerMessageSink(); - - // create the thread with the consumer - proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId, messageSink, samzaConsumerMetrics, metricName); - - LOG.info("Created consumer proxy: " + proxy); - } - - /* - Set the offsets to start from. - Add the TopicPartitions to the proxy. - Start the proxy thread. 
- */ - void startConsumer() { - //set the offset for each TopicPartition - if (topicPartitions2Offset.size() <= 0) { - LOG.warn("Consumer {} is not subscribed to any SSPs", this); - } - - topicPartitions2Offset.forEach((tp, startingOffsetString) -> { - long startingOffset = Long.valueOf(startingOffsetString); - - try { - synchronized (kafkaConsumer) { - // TODO in the future we may need to add special handling here for BEGIN/END_OFFSET - // this will call KafkaConsumer.seekToBegin/End() - kafkaConsumer.seek(tp, startingOffset); // this value should already be the 'upcoming' value - } - } catch (Exception e) { - // all other exceptions - non recoverable - LOG.error("Got Exception while seeking to " + startingOffsetString + " for " + tp, e); - throw new SamzaException(e); - } - - LOG.info("Changing consumer's starting offset for tp = " + tp + " to " + startingOffsetString); - - // add the partition to the proxy - proxy.addTopicPartition(topicPartitions2SSP.get(tp), startingOffset); - }); - - // start the proxy thread - if (proxy != null && !proxy.isRunning()) { - LOG.info("Starting proxy: " + proxy); - proxy.start(); - } - } - - private void setFetchThresholds() { - // get the thresholds, and set defaults if not defined. - KafkaConfig kafkaConfig = new KafkaConfig(config); - - Option<String> fetchThresholdOption = kafkaConfig.getConsumerFetchThreshold(systemName); - long fetchThreshold = FETCH_THRESHOLD; - if (fetchThresholdOption.isDefined()) { - fetchThreshold = Long.valueOf(fetchThresholdOption.get()); - LOG.info("fetchThresholdOption is configured. fetchThreshold=" + fetchThreshold); - } - - Option<String> fetchThresholdBytesOption = kafkaConfig.getConsumerFetchThresholdBytes(systemName); - long fetchThresholdBytes = FETCH_THRESHOLD_BYTES; - if (fetchThresholdBytesOption.isDefined()) { - fetchThresholdBytes = Long.valueOf(fetchThresholdBytesOption.get()); - LOG.info("fetchThresholdBytesOption is configured. 
fetchThresholdBytes=" + fetchThresholdBytes); - } - - int numTPs = topicPartitions2SSP.size(); - assert (numTPs == topicPartitions2Offset.size()); - - LOG.info("fetchThresholdBytes = " + fetchThresholdBytes + "; fetchThreshold=" + fetchThreshold); - LOG.info("number of topicPartitions " + numTPs); - - if (numTPs > 0) { - perPartitionFetchThreshold = fetchThreshold / numTPs; - LOG.info("perPartitionFetchThreshold=" + perPartitionFetchThreshold); - if (fetchThresholdBytesEnabled) { - // currently this feature cannot be enabled, because we do not have the size of the messages available. - // messages get double buffered, hence divide by 2 - perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / numTPs; - LOG.info("perPartitionFetchThresholdBytes is enabled. perPartitionFetchThresholdBytes=" - + perPartitionFetchThresholdBytes); - } - } - } - - @Override - public void stop() { - LOG.info("Stopping Samza kafkaConsumer " + this); - - if (!stopped.compareAndSet(false, true)) { - LOG.warn("attempting to stop stopped consumer."); - return; - } - - // stop the proxy (with 5 minutes timeout) - if (proxy != null) { - LOG.info("Stopping proxy " + proxy); - proxy.stop(TimeUnit.MINUTES.toMillis(5)); - } - - try { - synchronized (kafkaConsumer) { - LOG.info("Closing kafka consumer " + kafkaConsumer); - kafkaConsumer.close(); - } - } catch (Exception e) { - LOG.warn("failed to stop SamzaRawKafkaConsumer + " + this, e); - } - } - - /* - record the ssp and the offset. Do not submit it to the consumer yet. - */ - @Override - public void register(SystemStreamPartition systemStreamPartition, String offset) { - if (started.get()) { - String msg = - String.format("Trying to register partition after consumer has been started. 
sn=%s, ssp=%s", systemName, - systemStreamPartition); - LOG.error(msg); - throw new SamzaException(msg); - } - - if (!systemStreamPartition.getSystem().equals(systemName)) { - LOG.warn("ignoring SSP " + systemStreamPartition + ", because this consumer's system is " + systemName); - return; - } - super.register(systemStreamPartition, offset); - - TopicPartition tp = toTopicPartition(systemStreamPartition); - - topicPartitions2SSP.put(tp, systemStreamPartition); - - LOG.info("Registering ssp = " + systemStreamPartition + " with offset " + offset); - - String existingOffset = topicPartitions2Offset.get(tp); - // register the older (of the two) offset in the consumer, to guarantee we do not miss any messages. - if (existingOffset == null || compareOffsets(existingOffset, offset) > 0) { - topicPartitions2Offset.put(tp, offset); - } - - samzaConsumerMetrics.registerTopicAndPartition(toTopicAndPartition(tp)); - } - - /** - * Compare two String offsets. - * Note. There is a method in KafkaAdmin that does that, but that would require instantiation of systemadmin for each consumer. 
- * @param off1 - * @param off2 - * @return see {@link Long#compareTo(Long)} - */ - public static int compareOffsets(String off1, String off2) { - return Long.valueOf(off1).compareTo(Long.valueOf(off2)); - } - - @Override - public String toString() { - return systemName + "/" + clientId + "/" + super.toString(); - } - - @Override - public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll( - Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException { - - // check if the proxy is running - if (!proxy.isRunning()) { - stop(); - if (proxy.getFailureCause() != null) { - String message = "KafkaConsumerProxy has stopped"; - throw new SamzaException(message, proxy.getFailureCause()); - } else { - LOG.warn("Failure cause is not populated for KafkaConsumerProxy"); - throw new SamzaException("KafkaConsumerProxy has stopped"); - } - } - - Map<SystemStreamPartition, List<IncomingMessageEnvelope>> res = super.poll(systemStreamPartitions, timeout); - return res; - } - - public static TopicAndPartition toTopicAndPartition(TopicPartition tp) { - return new TopicAndPartition(tp.topic(), tp.partition()); - } - - public static TopicAndPartition toTopicAndPartition(SystemStreamPartition ssp) { - return new TopicAndPartition(ssp.getStream(), ssp.getPartition().getPartitionId()); - } - - public static TopicPartition toTopicPartition(SystemStreamPartition ssp) { - return new TopicPartition(ssp.getStream(), ssp.getPartition().getPartitionId()); - } - - public static SystemStreamPartition toSystemStreamPartition(String systemName, TopicAndPartition tp) { - return new SystemStreamPartition(systemName, tp.topic(), new Partition(tp.partition())); - } - - /** - * return system name for this consumer - * @return system name - */ - public String getSystemName() { - return systemName; - } - - //////////////////////////////////// - // inner class for the message sink - //////////////////////////////////// - public class KafkaConsumerMessageSink { - - 
public void setIsAtHighWatermark(SystemStreamPartition ssp, boolean isAtHighWatermark) { - setIsAtHead(ssp, isAtHighWatermark); - } - - boolean needsMoreMessages(SystemStreamPartition ssp) { - if (LOG.isDebugEnabled()) { - LOG.debug("needsMoreMessages from following SSP: {}. fetchLimitByBytes enabled={}; messagesSizeInQueue={};" - + "(limit={}); messagesNumInQueue={}(limit={};", ssp, fetchThresholdBytesEnabled, - getMessagesSizeInQueue(ssp), perPartitionFetchThresholdBytes, getNumMessagesInQueue(ssp), - perPartitionFetchThreshold); - } - - if (fetchThresholdBytesEnabled) { - return getMessagesSizeInQueue(ssp) < perPartitionFetchThresholdBytes; - } else { - return getNumMessagesInQueue(ssp) < perPartitionFetchThreshold; - } - } - - void addMessage(SystemStreamPartition ssp, IncomingMessageEnvelope envelope) { - LOG.trace("Incoming message ssp = {}: envelope = {}.", ssp, envelope); - - try { - put(ssp, envelope); - } catch (InterruptedException e) { - throw new SamzaException( - String.format("Interrupted while trying to add message with offset %s for ssp %s", envelope.getOffset(), - ssp)); - } - } - } // end of KafkaMessageSink class - /////////////////////////////////////////////////////////////////////////// -} http://git-wip-us.apache.org/repos/asf/samza/blob/26552213/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java new file mode 100644 index 0000000..d90bc35 --- /dev/null +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java @@ -0,0 +1,224 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.samza.system.kafka; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.KafkaConsumerConfig; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.samza.Partition; +import org.apache.samza.config.Config; +import org.apache.samza.config.KafkaConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.util.Clock; +import org.apache.samza.util.NoOpMetricsRegistry; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; + + +public class TestKafkaSystemConsumer { + public final String TEST_SYSTEM = "test-system"; + public final String TEST_STREAM = "test-stream"; + public final String TEST_CLIENT_ID = "testClientId"; + public final String BOOTSTRAP_SERVER = "127.0.0.1:8888"; + public final String FETCH_THRESHOLD_MSGS = "50000"; + public final String FETCH_THRESHOLD_BYTES = "100000"; + + @Before + 
public void setUp() { + + } + + private KafkaSystemConsumer setupConsumer(String fetchMsg, String fetchBytes) { + final Map<String, String> map = new HashMap<>(); + + map.put(String.format(KafkaConfig.CONSUMER_FETCH_THRESHOLD(), TEST_SYSTEM), fetchMsg); + map.put(String.format(KafkaConfig.CONSUMER_FETCH_THRESHOLD_BYTES(), TEST_SYSTEM), fetchBytes); + map.put(String.format("systems.%s.consumer.%s", TEST_SYSTEM, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), + BOOTSTRAP_SERVER); + + Config config = new MapConfig(map); + KafkaConsumerConfig consumerConfig = + KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, TEST_SYSTEM, TEST_CLIENT_ID, Collections.emptyMap()); + final KafkaConsumer<byte[], byte[]> kafkaConsumer = new MockKafkaConsumer(consumerConfig.originals()); + + MockKafkaSystmeCosumer newKafkaSystemConsumer = + new MockKafkaSystmeCosumer(kafkaConsumer, TEST_SYSTEM, config, TEST_CLIENT_ID, + new KafkaSystemConsumerMetrics(TEST_SYSTEM, new NoOpMetricsRegistry()), System::currentTimeMillis); + + return newKafkaSystemConsumer; + } + + @Test + public void testConfigValidations() { + + final KafkaSystemConsumer consumer = setupConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES); + + consumer.start(); + // should be no failures + } + + @Test + public void testFetchThresholdShouldDivideEvenlyAmongPartitions() { + final KafkaSystemConsumer consumer = setupConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES); + final int partitionsNum = 50; + for (int i = 0; i < partitionsNum; i++) { + consumer.register(new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(i)), "0"); + } + + consumer.start(); + + Assert.assertEquals(Long.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum, consumer.perPartitionFetchThreshold); + Assert.assertEquals(Long.valueOf(FETCH_THRESHOLD_BYTES) / 2 / partitionsNum, + consumer.perPartitionFetchThresholdBytes); + } + + @Test + public void testConsumerRegisterOlderOffsetOfTheSamzaSSP() { + + KafkaSystemConsumer consumer = 
setupConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES); + + SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0)); + SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1)); + SystemStreamPartition ssp2 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(2)); + + consumer.register(ssp0, "0"); + consumer.register(ssp0, "5"); + consumer.register(ssp1, "2"); + consumer.register(ssp1, "3"); + consumer.register(ssp2, "0"); + + assertEquals("0", consumer.topicPartitions2Offset.get(KafkaSystemConsumer.toTopicPartition(ssp0))); + assertEquals("2", consumer.topicPartitions2Offset.get(KafkaSystemConsumer.toTopicPartition(ssp1))); + assertEquals("0", consumer.topicPartitions2Offset.get(KafkaSystemConsumer.toTopicPartition(ssp2))); + } + + @Test + public void testFetchThresholdBytes() { + + SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0)); + SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1)); + int partitionsNum = 2; + int ime0Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum; // fake size + int ime1Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum - 1; // fake size + int ime11Size = 20; + ByteArraySerializer bytesSerde = new ByteArraySerializer(); + IncomingMessageEnvelope ime0 = new IncomingMessageEnvelope(ssp0, "0", bytesSerde.serialize("", "key0".getBytes()), + bytesSerde.serialize("", "value0".getBytes()), ime0Size); + IncomingMessageEnvelope ime1 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key1".getBytes()), + bytesSerde.serialize("", "value1".getBytes()), ime1Size); + IncomingMessageEnvelope ime11 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key11".getBytes()), + bytesSerde.serialize("", "value11".getBytes()), ime11Size); + KafkaSystemConsumer consumer = setupConsumer(FETCH_THRESHOLD_MSGS, 
FETCH_THRESHOLD_BYTES); + + consumer.register(ssp0, "0"); + consumer.register(ssp1, "0"); + consumer.start(); + consumer.messageSink.addMessage(ssp0, ime0); + // queue for ssp0 should be full now, because we added message of size FETCH_THRESHOLD_MSGS/partitionsNum + Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp0)); + consumer.messageSink.addMessage(ssp1, ime1); + // queue for ssp1 should be less then full now, because we added message of size (FETCH_THRESHOLD_MSGS/partitionsNum - 1) + Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp1)); + consumer.messageSink.addMessage(ssp1, ime11); + // queue for ssp1 should full now, because we added message of size 20 on top + Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp1)); + + Assert.assertEquals(1, consumer.getNumMessagesInQueue(ssp0)); + Assert.assertEquals(2, consumer.getNumMessagesInQueue(ssp1)); + Assert.assertEquals(ime0Size, consumer.getMessagesSizeInQueue(ssp0)); + Assert.assertEquals(ime1Size + ime11Size, consumer.getMessagesSizeInQueue(ssp1)); + } + + @Test + public void testFetchThresholdBytesDiabled() { + // Pass 0 as fetchThresholdByBytes, which disables checking for limit by size + + SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0)); + SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1)); + int partitionsNum = 2; + int ime0Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum; // fake size, upto the limit + int ime1Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum - 100; // fake size, below the limit + int ime11Size = 20;// event with the second message still below the size limit + ByteArraySerializer bytesSerde = new ByteArraySerializer(); + IncomingMessageEnvelope ime0 = new IncomingMessageEnvelope(ssp0, "0", bytesSerde.serialize("", "key0".getBytes()), + bytesSerde.serialize("", "value0".getBytes()), ime0Size); + 
IncomingMessageEnvelope ime1 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key1".getBytes()), + bytesSerde.serialize("", "value1".getBytes()), ime1Size); + IncomingMessageEnvelope ime11 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key11".getBytes()), + bytesSerde.serialize("", "value11".getBytes()), ime11Size); + + // limit by number of messages 4/2 = 2 per partition + // limit by number of bytes - disabled + KafkaSystemConsumer consumer = setupConsumer("4", "0"); // should disable + + consumer.register(ssp0, "0"); + consumer.register(ssp1, "0"); + consumer.start(); + consumer.messageSink.addMessage(ssp0, ime0); + // should be full by size, but not full by number of messages (1 of 2) + Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp0)); + consumer.messageSink.addMessage(ssp1, ime1); + // not full neither by size nor by messages + Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp1)); + consumer.messageSink.addMessage(ssp1, ime11); + // not full by size, but should be full by messages + Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp1)); + + Assert.assertEquals(1, consumer.getNumMessagesInQueue(ssp0)); + Assert.assertEquals(2, consumer.getNumMessagesInQueue(ssp1)); + Assert.assertEquals(ime0Size, consumer.getMessagesSizeInQueue(ssp0)); + Assert.assertEquals(ime1Size + ime11Size, consumer.getMessagesSizeInQueue(ssp1)); + } + + // mock kafkaConsumer and SystemConsumer + static class MockKafkaConsumer extends KafkaConsumer { + public MockKafkaConsumer(Map<String, Object> configs) { + super(configs); + } + } + + static class MockKafkaSystmeCosumer extends KafkaSystemConsumer { + public MockKafkaSystmeCosumer(Consumer kafkaConsumer, String systemName, Config config, String clientId, + KafkaSystemConsumerMetrics metrics, Clock clock) { + super(kafkaConsumer, systemName, config, clientId, metrics, clock); + } + + //@Override + //void createConsumerProxy() { + // 
this.messageSink = new KafkaConsumerMessageSink(); + //} + + @Override + void startConsumer() { + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/26552213/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestNewKafkaSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestNewKafkaSystemConsumer.java b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestNewKafkaSystemConsumer.java deleted file mode 100644 index fb7533b..0000000 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestNewKafkaSystemConsumer.java +++ /dev/null @@ -1,224 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
*
*/

package org.apache.samza.system.kafka;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.KafkaConsumerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.samza.Partition;
import org.apache.samza.config.Config;
import org.apache.samza.config.KafkaConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.Clock;
import org.apache.samza.util.NoOpMetricsRegistry;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.*;


/**
 * Unit tests for {@link NewKafkaSystemConsumer}.
 *
 * <p>The tests cover how the configured fetch thresholds are split across registered partitions,
 * which offset is kept when the same partition is registered more than once, and how the message
 * sink reports back-pressure when limiting by bytes or (with the byte limit disabled) by message
 * count. The consumer under test is a subclass whose proxy creation and start-up hooks are
 * overridden, so only the in-memory message sink is exercised.
 */
public class TestNewKafkaSystemConsumer {
  public static final String TEST_SYSTEM = "test-system";
  public static final String TEST_STREAM = "test-stream";
  public static final String TEST_CLIENT_ID = "testClientId";
  public static final String BOOTSTRAP_SERVER = "127.0.0.1:8888";
  public static final String FETCH_THRESHOLD_MSGS = "50000";
  public static final String FETCH_THRESHOLD_BYTES = "100000";

  @Before
  public void setUp() {
    // Nothing to prepare: every test builds its own consumer through setupConsumer().
  }

  /**
   * Builds a consumer for {@link #TEST_SYSTEM} with the given fetch thresholds.
   *
   * @param fetchMsg value for the per-system message-count fetch threshold
   * @param fetchBytes value for the per-system byte fetch threshold
   * @return a {@link MockNewKafkaSystmeCosumer} wrapping a {@link MockKafkaConsumer}
   */
  private NewKafkaSystemConsumer setupConsumer(String fetchMsg, String fetchBytes) {
    final Map<String, String> map = new HashMap<>();

    map.put(String.format(KafkaConfig.CONSUMER_FETCH_THRESHOLD(), TEST_SYSTEM), fetchMsg);
    map.put(String.format(KafkaConfig.CONSUMER_FETCH_THRESHOLD_BYTES(), TEST_SYSTEM), fetchBytes);
    map.put(String.format("systems.%s.consumer.%s", TEST_SYSTEM, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
        BOOTSTRAP_SERVER);

    final Config config = new MapConfig(map);
    final KafkaConsumerConfig consumerConfig =
        KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, TEST_SYSTEM, TEST_CLIENT_ID, Collections.emptyMap());
    final KafkaConsumer<byte[], byte[]> kafkaConsumer = new MockKafkaConsumer(consumerConfig.originals());

    return new MockNewKafkaSystmeCosumer(kafkaConsumer, TEST_SYSTEM, config, TEST_CLIENT_ID,
        new KafkaSystemConsumerMetrics(TEST_SYSTEM, new NoOpMetricsRegistry()), System::currentTimeMillis);
  }

  /** Returns the SSP of {@link #TEST_STREAM} on {@link #TEST_SYSTEM} for the given partition id. */
  private static SystemStreamPartition ssp(int partitionId) {
    return new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(partitionId));
  }

  /**
   * Returns the per-partition byte limit expected for {@link #FETCH_THRESHOLD_BYTES}; this is the
   * same formula asserted in {@link #testFetchThresholdShouldDivideEvenlyAmongPartitions()}.
   */
  private static int perPartitionByteLimit(int partitionsNum) {
    return Integer.parseInt(FETCH_THRESHOLD_BYTES) / 2 / partitionsNum;
  }

  /** Creates an envelope at offset "0" whose reported size is {@code size} (a fake size). */
  private static IncomingMessageEnvelope envelope(SystemStreamPartition ssp, String key, String value, int size) {
    final ByteArraySerializer bytesSerde = new ByteArraySerializer();
    return new IncomingMessageEnvelope(ssp, "0", bytesSerde.serialize("", key.getBytes()),
        bytesSerde.serialize("", value.getBytes()), size);
  }

  @Test
  public void testConfigValidations() {
    final NewKafkaSystemConsumer consumer = setupConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);

    // Passes as long as start() does not throw.
    consumer.start();
  }

  @Test
  public void testFetchThresholdShouldDivideEvenlyAmongPartitions() {
    final NewKafkaSystemConsumer consumer = setupConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
    final int partitionsNum = 50;
    for (int i = 0; i < partitionsNum; i++) {
      consumer.register(ssp(i), "0");
    }

    consumer.start();

    Assert.assertEquals(Long.parseLong(FETCH_THRESHOLD_MSGS) / partitionsNum, consumer.perPartitionFetchThreshold);
    Assert.assertEquals(Long.parseLong(FETCH_THRESHOLD_BYTES) / 2 / partitionsNum,
        consumer.perPartitionFetchThresholdBytes);
  }

  @Test
  public void testConsumerRegisterOlderOffsetOfTheSamzaSSP() {
    final NewKafkaSystemConsumer consumer = setupConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);

    final SystemStreamPartition ssp0 = ssp(0);
    final SystemStreamPartition ssp1 = ssp(1);
    final SystemStreamPartition ssp2 = ssp(2);

    consumer.register(ssp0, "0");
    consumer.register(ssp0, "5");
    consumer.register(ssp1, "2");
    consumer.register(ssp1, "3");
    consumer.register(ssp2, "0");

    // For an SSP registered twice, the smaller of the two offsets is expected to be retained.
    Assert.assertEquals("0", consumer.topicPartitions2Offset.get(NewKafkaSystemConsumer.toTopicPartition(ssp0)));
    Assert.assertEquals("2", consumer.topicPartitions2Offset.get(NewKafkaSystemConsumer.toTopicPartition(ssp1)));
    Assert.assertEquals("0", consumer.topicPartitions2Offset.get(NewKafkaSystemConsumer.toTopicPartition(ssp2)));
  }

  @Test
  public void testFetchThresholdBytes() {
    final SystemStreamPartition ssp0 = ssp(0);
    final SystemStreamPartition ssp1 = ssp(1);
    final int partitionsNum = 2;
    // Fake sizes, expressed relative to the per-partition BYTE limit (not the message-count limit).
    final int ime0Size = perPartitionByteLimit(partitionsNum); // exactly at the limit
    final int ime1Size = perPartitionByteLimit(partitionsNum) - 1; // one byte below the limit
    final int ime11Size = 20; // pushes ssp1 over the limit
    final IncomingMessageEnvelope ime0 = envelope(ssp0, "key0", "value0", ime0Size);
    final IncomingMessageEnvelope ime1 = envelope(ssp1, "key1", "value1", ime1Size);
    final IncomingMessageEnvelope ime11 = envelope(ssp1, "key11", "value11", ime11Size);
    final NewKafkaSystemConsumer consumer = setupConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);

    consumer.register(ssp0, "0");
    consumer.register(ssp1, "0");
    consumer.start();

    consumer.messageSink.addMessage(ssp0, ime0);
    // ssp0 holds a message whose size equals the per-partition byte limit, so it should be full.
    Assert.assertFalse(consumer.messageSink.needsMoreMessages(ssp0));
    consumer.messageSink.addMessage(ssp1, ime1);
    // ssp1 holds one byte less than the limit, so it should still accept messages.
    Assert.assertTrue(consumer.messageSink.needsMoreMessages(ssp1));
    consumer.messageSink.addMessage(ssp1, ime11);
    // Another 20 bytes on top takes ssp1 past the limit, so it should be full now.
    Assert.assertFalse(consumer.messageSink.needsMoreMessages(ssp1));

    Assert.assertEquals(1, consumer.getNumMessagesInQueue(ssp0));
    Assert.assertEquals(2, consumer.getNumMessagesInQueue(ssp1));
    Assert.assertEquals(ime0Size, consumer.getMessagesSizeInQueue(ssp0));
    Assert.assertEquals(ime1Size + ime11Size, consumer.getMessagesSizeInQueue(ssp1));
  }

  @Test
  public void testFetchThresholdBytesDiabled() {
    // Pass 0 as the byte fetch threshold, which disables limiting by size.
    final SystemStreamPartition ssp0 = ssp(0);
    final SystemStreamPartition ssp1 = ssp(1);
    final int partitionsNum = 2;
    // Fake sizes, relative to the byte limit that WOULD apply with FETCH_THRESHOLD_BYTES.
    final int ime0Size = perPartitionByteLimit(partitionsNum); // up to that limit
    final int ime1Size = perPartitionByteLimit(partitionsNum) - 100; // below that limit
    final int ime11Size = 20; // even with this second message ssp1 stays below that limit
    final IncomingMessageEnvelope ime0 = envelope(ssp0, "key0", "value0", ime0Size);
    final IncomingMessageEnvelope ime1 = envelope(ssp1, "key1", "value1", ime1Size);
    final IncomingMessageEnvelope ime11 = envelope(ssp1, "key11", "value11", ime11Size);

    // Limit by number of messages: 4 / 2 = 2 per partition. Limit by bytes: disabled.
    final NewKafkaSystemConsumer consumer = setupConsumer("4", "0");

    consumer.register(ssp0, "0");
    consumer.register(ssp1, "0");
    consumer.start();

    consumer.messageSink.addMessage(ssp0, ime0);
    // Would be full by size, but size is ignored and only 1 of 2 messages is queued.
    Assert.assertTrue(consumer.messageSink.needsMoreMessages(ssp0));
    consumer.messageSink.addMessage(ssp1, ime1);
    // Full neither by size nor by message count.
    Assert.assertTrue(consumer.messageSink.needsMoreMessages(ssp1));
    consumer.messageSink.addMessage(ssp1, ime11);
    // Still below the size limit, but 2 of 2 messages are queued, so it should be full.
    Assert.assertFalse(consumer.messageSink.needsMoreMessages(ssp1));

    Assert.assertEquals(1, consumer.getNumMessagesInQueue(ssp0));
    Assert.assertEquals(2, consumer.getNumMessagesInQueue(ssp1));
    Assert.assertEquals(ime0Size, consumer.getMessagesSizeInQueue(ssp0));
    Assert.assertEquals(ime1Size + ime11Size, consumer.getMessagesSizeInQueue(ssp1));
  }

  /** {@link KafkaConsumer} subclass constructed directly from a config map; adds no behavior. */
  static class MockKafkaConsumer extends KafkaConsumer<byte[], byte[]> {
    public MockKafkaConsumer(Map<String, Object> configs) {
      super(configs);
    }
  }

  /**
   * {@link NewKafkaSystemConsumer} whose proxy creation only installs a message sink and whose
   * consumer start-up is a no-op.
   */
  static class MockNewKafkaSystmeCosumer extends NewKafkaSystemConsumer {
    public MockNewKafkaSystmeCosumer(Consumer kafkaConsumer, String systemName, Config config, String clientId,
        KafkaSystemConsumerMetrics metrics, Clock clock) {
      super(kafkaConsumer, systemName, config, clientId, metrics, clock);
    }

    @Override
    void createConsumerProxy() {
      // Only the sink is needed by the tests; no proxy is created here.
      this.messageSink = new KafkaConsumerMessageSink();
    }

    @Override
    void startConsumer() {
      // Intentionally empty.
    }
  }
}
