addressed some review comments

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/26552213
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/26552213
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/26552213

Branch: refs/heads/NewKafkaSystemConsumer
Commit: 2655221348304507e1a91e6fa93ef2dc79a4620d
Parents: 9217644
Author: Boris S <[email protected]>
Authored: Mon Sep 10 11:17:18 2018 -0700
Committer: Boris S <[email protected]>
Committed: Mon Sep 10 11:17:18 2018 -0700

----------------------------------------------------------------------
 .../apache/samza/container/SamzaContainer.scala |   2 +-
 .../samza/coordinator/JobModelManager.scala     |   3 +-
 .../clients/consumer/KafkaConsumerConfig.java   |  43 +-
 .../samza/system/kafka/KafkaConsumerProxy.java  |  50 +--
 .../samza/system/kafka/KafkaSystemConsumer.java | 406 ++++++++++++++++++
 .../samza/system/kafka/KafkaSystemFactory.scala |   4 +-
 .../system/kafka/NewKafkaSystemConsumer.java    | 412 -------------------
 .../system/kafka/TestKafkaSystemConsumer.java   | 224 ++++++++++
 .../kafka/TestNewKafkaSystemConsumer.java       | 224 ----------
 9 files changed, 687 insertions(+), 681 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/26552213/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index b17788f..5ee9206 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -829,7 +829,7 @@ class SamzaContainer(
     }
 
     try {
-      info("Shutting down Samza.")
+      info("Shutting down SamzaContainer.")
       removeShutdownHook
 
       jmxServer.stop

http://git-wip-us.apache.org/repos/asf/samza/blob/26552213/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala 
b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index f95a521..e626d9a 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -35,7 +35,6 @@ import org.apache.samza.container.LocalityManager
 import org.apache.samza.container.TaskName
 import org.apache.samza.coordinator.server.HttpServer
 import org.apache.samza.coordinator.server.JobServlet
-import org.apache.samza.coordinator.stream.CoordinatorStreamManager
 import org.apache.samza.job.model.JobModel
 import org.apache.samza.job.model.TaskModel
 import org.apache.samza.metrics.MetricsRegistryMap
@@ -64,7 +63,7 @@ object JobModelManager extends Logging {
    * a) Reads the jobModel from coordinator stream using the job's 
configuration.
    * b) Recomputes changelog partition mapping based on jobModel and job's 
configuration.
    * c) Builds JobModelManager using the jobModel read from coordinator stream.
-   * @param config Coordinator stream manager config.
+   * @param config Config from the coordinator stream.
    * @param changelogPartitionMapping The changelog partition-to-task mapping.
    * @return JobModelManager
    */

http://git-wip-us.apache.org/repos/asf/samza/blob/26552213/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
 
b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
index 98792ab..8ca5b93 100644
--- 
a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
+++ 
b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
@@ -54,21 +54,28 @@ public class KafkaConsumerConfig extends ConsumerConfig {
    * By default, KafkaConsumer will fetch ALL available messages for all the 
partitions.
    * This may cause memory issues. That's why we will limit the number of 
messages per partition we get on EACH poll().
    */
-  private static final String KAFKA_CONSUMER_MAX_POLL_RECORDS_DEFAULT = "100";
+  private static final String DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS = "100";
 
-
-  public KafkaConsumerConfig(Properties props) {
+  private KafkaConsumerConfig(Properties props) {
     super(props);
   }
 
+  /**
+   * Create kafka consumer configs, based on the subset of global configs.
+   * @param config
+   * @param systemName
+   * @param clientId
+   * @param injectProps
+   * @return KafkaConsumerConfig
+   */
   public static KafkaConsumerConfig getKafkaSystemConsumerConfig(Config 
config, String systemName, String clientId,
       Map<String, String> injectProps) {
 
-    Config subConf = config.subset(String.format("systems.%s.consumer.", 
systemName), true);
+    final Config subConf = config.subset(String.format("systems.%s.consumer.", 
systemName), true);
 
-    String groupId = getConsumerGroupId(config);
+    final String groupId = getConsumerGroupId(config);
 
-    Properties consumerProps = new Properties();
+    final Properties consumerProps = new Properties();
     consumerProps.putAll(subConf);
 
     consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
@@ -109,8 +116,8 @@ public class KafkaConsumerConfig extends ConsumerConfig {
     }
 
     // NOT SURE THIS IS NEEDED TODO
-    String maxPollRecords =
-        subConf.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
KAFKA_CONSUMER_MAX_POLL_RECORDS_DEFAULT);
+    final String maxPollRecords =
+        subConf.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS);
     consumerProps.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
maxPollRecords);
 
     // put overrides
@@ -120,7 +127,7 @@ public class KafkaConsumerConfig extends ConsumerConfig {
   }
 
   // group id should be unique per job
-  static String getConsumerGroupId(Config config) {
+  private static String getConsumerGroupId(Config config) {
     JobConfig jobConfig = new JobConfig(config);
     Option<String> jobIdOption = jobConfig.getJobId();
     Option<String> jobNameOption = jobConfig.getName();
@@ -151,11 +158,12 @@ public class KafkaConsumerConfig extends ConsumerConfig {
   }
 
   /**
-   * Settings for auto.reset in samza are different from settings in Kafka 
(auto.offset.reset) - need to convert
+   * If settings for auto.reset in samza are different from settings in Kafka 
(auto.offset.reset),
+   * then need to convert them (see kafka.apache.org/documentation):
    * "largest" -> "latest"
    * "smallest" -> "earliest"
-   * "none" -> "none"
-   * "none" - will fail the kafka consumer, if offset is out of range
+   *
+   * If no setting specified we return "latest" (same as Kafka).
    * @param properties All consumer related {@link Properties} parsed from 
samza config
    * @return String representing the config value for "auto.offset.reset" 
property
    */
@@ -168,13 +176,18 @@ public class KafkaConsumerConfig extends ConsumerConfig {
       return autoOffsetReset;
     }
 
+    String newAutoOffsetReset;
     switch (autoOffsetReset) {
       case SAMZA_OFFSET_LARGEST:
-        return KAFKA_OFFSET_LATEST;
+        newAutoOffsetReset =  KAFKA_OFFSET_LATEST;
+        break;
       case SAMZA_OFFSET_SMALLEST:
-        return KAFKA_OFFSET_EARLIEST;
+        newAutoOffsetReset =  KAFKA_OFFSET_EARLIEST;
+        break;
       default:
-        return KAFKA_OFFSET_LATEST;
+        newAutoOffsetReset =  KAFKA_OFFSET_LATEST;
     }
+    LOG.info("AutoOffsetReset value converted from {} to {}", autoOffsetReset, 
 newAutoOffsetReset);
+    return newAutoOffsetReset;
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/26552213/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index ae80d50..0825c90 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -22,7 +22,6 @@
 package org.apache.samza.system.kafka;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -40,6 +39,7 @@ import 
org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemStreamPartition;
@@ -58,13 +58,13 @@ public class KafkaConsumerProxy<K, V> {
 
   /* package private */ final Thread consumerPollThread;
   private final Consumer<K, V> kafkaConsumer;
-  private final NewKafkaSystemConsumer.KafkaConsumerMessageSink sink;
+  private final KafkaSystemConsumer.KafkaConsumerMessageSink sink;
   private final KafkaSystemConsumerMetrics kafkaConsumerMetrics;
   private final String metricName;
   private final String systemName;
   private final String clientId;
   private final Map<TopicPartition, SystemStreamPartition> topicPartitions2SSP 
= new HashMap<>();
-  private final Map<SystemStreamPartition, MetricName> ssp2MetricName = new 
HashMap<>();
+  private final Map<SystemStreamPartition, MetricName> perPartitionMetrics = 
new HashMap<>();
   // list of all the SSPs we poll from, with their next offsets 
correspondingly.
   private final Map<SystemStreamPartition, Long> nextOffsets = new 
ConcurrentHashMap<>();
   // lags behind the high water mark, as reported by the Kafka consumer.
@@ -75,7 +75,7 @@ public class KafkaConsumerProxy<K, V> {
   private final CountDownLatch consumerPollThreadStartLatch = new 
CountDownLatch(1);
 
   public KafkaConsumerProxy(Consumer<K, V> kafkaConsumer, String systemName, 
String clientId,
-      NewKafkaSystemConsumer.KafkaConsumerMessageSink messageSink, 
KafkaSystemConsumerMetrics samzaConsumerMetrics,
+      KafkaSystemConsumer.KafkaConsumerMessageSink messageSink, 
KafkaSystemConsumerMetrics samzaConsumerMetrics,
       String metricName) {
 
     this.kafkaConsumer = kafkaConsumer;
@@ -88,14 +88,15 @@ public class KafkaConsumerProxy<K, V> {
     this.kafkaConsumerMetrics.registerClientProxy(metricName);
 
     consumerPollThread = new Thread(createProxyThreadRunnable());
+    consumerPollThread.setDaemon(true);
+    consumerPollThread.setName(
+        "Samza KafkaConsumerProxy Poll " + consumerPollThread.getName() + " - 
" + systemName);
   }
 
   public void start() {
     if (!consumerPollThread.isAlive()) {
       LOG.info("Starting KafkaConsumerProxy polling thread for system " + 
systemName + " " + this.toString());
-      consumerPollThread.setDaemon(true);
-      consumerPollThread.setName(
-          "Samza KafkaConsumerProxy Poll " + consumerPollThread.getName() + " 
- " + systemName);
+
       consumerPollThread.start();
 
       // we need to wait until the thread starts
@@ -116,7 +117,7 @@ public class KafkaConsumerProxy<K, V> {
   public void addTopicPartition(SystemStreamPartition ssp, long nextOffset) {
     LOG.info(String.format("Adding new topic and partition %s, offset = %s to 
queue for consumer %s", ssp, nextOffset,
         this));
-    topicPartitions2SSP.put(NewKafkaSystemConsumer.toTopicPartition(ssp), 
ssp); //registered SSPs
+    topicPartitions2SSP.put(KafkaSystemConsumer.toTopicPartition(ssp), ssp); 
//registered SSPs
 
     // this is already vetted offset so there is no need to validate it
     LOG.info(String.format("Got offset %s for new topic and partition %s.", 
nextOffset, ssp));
@@ -135,7 +136,6 @@ public class KafkaConsumerProxy<K, V> {
     Runnable runnable=  () -> {
       isRunning = true;
 
-
       try {
         consumerPollThreadStartLatch.countDown();
         LOG.info("Starting runnable " + consumerPollThread.getName());
@@ -230,19 +230,19 @@ public class KafkaConsumerProxy<K, V> {
 
   private Map<SystemStreamPartition, List<IncomingMessageEnvelope>> 
processResults(ConsumerRecords<K, V> records) {
     if (records == null) {
-      return Collections.emptyMap();
+      throw new SamzaException("processResults is called with null object for 
records");
     }
 
     int capacity = (int) (records.count() / 0.75 + 1); // to avoid rehash, 
allocate more then 75% of expected capacity.
     Map<SystemStreamPartition, List<IncomingMessageEnvelope>> results = new 
HashMap<>(capacity);
     // Parse the returned records and convert them into the 
IncomingMessageEnvelope.
     // Note. They have been already de-serialized by the consumer.
-    for (ConsumerRecord<K, V> r : records) {
-      int partition = r.partition();
-      String topic = r.topic();
+    for (ConsumerRecord<K, V> record : records) {
+      int partition = record.partition();
+      String topic = record.topic();
       TopicPartition tp = new TopicPartition(topic, partition);
 
-      updateMetrics(r, tp);
+      updateMetrics(record, tp);
 
       SystemStreamPartition ssp = topicPartitions2SSP.get(tp);
       List<IncomingMessageEnvelope> listMsgs = results.get(ssp);
@@ -251,10 +251,10 @@ public class KafkaConsumerProxy<K, V> {
         results.put(ssp, listMsgs);
       }
 
-      final K key = r.key();
-      final Object value = r.value();
-      IncomingMessageEnvelope imEnvelope =
-          new IncomingMessageEnvelope(ssp, String.valueOf(r.offset()), key, 
value, getRecordSize(r));
+      final K key = record.key();
+      final Object value = record.value();
+      final IncomingMessageEnvelope imEnvelope =
+          new IncomingMessageEnvelope(ssp, String.valueOf(record.offset()), 
key, value, getRecordSize(record));
       listMsgs.add(imEnvelope);
     }
     if (LOG.isDebugEnabled()) {
@@ -274,8 +274,8 @@ public class KafkaConsumerProxy<K, V> {
   }
 
   private void updateMetrics(ConsumerRecord<K, V> r, TopicPartition tp) {
-    TopicAndPartition tap = NewKafkaSystemConsumer.toTopicAndPartition(tp);
-    SystemStreamPartition ssp = 
NewKafkaSystemConsumer.toSystemStreamPartition(systemName, tap);
+    TopicAndPartition tap = KafkaSystemConsumer.toTopicAndPartition(tp);
+    SystemStreamPartition ssp = new SystemStreamPartition(systemName, 
tp.topic(), new Partition(tp.partition()));
     long currentSSPLag = getLatestLag(ssp); // lag between the current offset 
and the highwatermark
     if (currentSSPLag < 0) {
       return;
@@ -312,8 +312,8 @@ public class KafkaConsumerProxy<K, V> {
     tags.put("client-id", clientId);// this is required by the KafkaConsumer 
to get the metrics
 
     for (SystemStreamPartition ssp : ssps) {
-      TopicPartition tp = NewKafkaSystemConsumer.toTopicPartition(ssp);
-      ssp2MetricName.put(ssp, new MetricName(tp + ".records-lag", 
"consumer-fetch-manager-metrics", "", tags));
+      TopicPartition tp = KafkaSystemConsumer.toTopicPartition(ssp);
+      perPartitionMetrics.put(ssp, new MetricName(tp + ".records-lag", 
"consumer-fetch-manager-metrics", "", tags));
     }
   }
 
@@ -327,12 +327,12 @@ public class KafkaConsumerProxy<K, V> {
     Map<MetricName, ? extends Metric> consumerMetrics = 
kafkaConsumer.metrics();
 
     // populate the MetricNames first time
-    if (ssp2MetricName.isEmpty()) {
+    if (perPartitionMetrics.isEmpty()) {
       populateMetricNames(ssps);
     }
 
     for (SystemStreamPartition ssp : ssps) {
-      MetricName mn = ssp2MetricName.get(ssp);
+      MetricName mn = perPartitionMetrics.get(ssp);
       Metric currentLagM = consumerMetrics.get(mn);
 
       // High watermark is fixed to be the offset of last available message,
@@ -412,7 +412,7 @@ public class KafkaConsumerProxy<K, V> {
     for (Map.Entry<SystemStreamPartition, Long> e : nextOffsets.entrySet()) {
       SystemStreamPartition ssp = e.getKey();
       Long offset = e.getValue();
-      TopicAndPartition tp = NewKafkaSystemConsumer.toTopicAndPartition(ssp);
+      TopicAndPartition tp = new TopicAndPartition(ssp.getStream(), 
ssp.getPartition().getPartitionId());
       Long lag = latestLags.get(ssp);
       LOG.trace("Latest offset of {} is  {}; lag = {}", ssp, offset, lag);
       if (lag != null && offset != null && lag >= 0) {

http://git-wip-us.apache.org/repos/asf/samza/blob/26552213/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
new file mode 100644
index 0000000..196fb85
--- /dev/null
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
@@ -0,0 +1,406 @@
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.system.kafka;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import kafka.common.TopicAndPartition;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.KafkaConsumerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.BlockingEnvelopeMap;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+
+public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements 
SystemConsumer {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(KafkaSystemConsumer.class);
+
+  private static final long FETCH_THRESHOLD = 50000;
+  private static final long FETCH_THRESHOLD_BYTES = -1L;
+
+  private final Consumer<K, V> kafkaConsumer;
+  private final String systemName;
+  private final KafkaSystemConsumerMetrics samzaConsumerMetrics;
+  private final String clientId;
+  private final String metricName;
+  private final AtomicBoolean stopped = new AtomicBoolean(false);
+  private final AtomicBoolean started = new AtomicBoolean(false);
+  private final Config config;
+  private final boolean fetchThresholdBytesEnabled;
+
+  // This sink is used to transfer the messages from the proxy/consumer to the 
BlockingEnvelopeMap.
+  /* package private */final KafkaConsumerMessageSink messageSink;
+
+  // proxy is doing the actual reading
+  final private KafkaConsumerProxy proxy;
+
+  /* package private */final Map<TopicPartition, String> 
topicPartitions2Offset = new HashMap<>();
+  /* package private */final Map<TopicPartition, SystemStreamPartition> 
topicPartitions2SSP = new HashMap<>();
+
+  /* package private */ long perPartitionFetchThreshold;
+  /* package private */ long perPartitionFetchThresholdBytes;
+
+  /**
+   * Constructor
+   * @param systemName system name for which we create the consumer
+   * @param config config
+   * @param metrics metrics
+   * @param clock - system clock
+   */
+  public KafkaSystemConsumer(Consumer<K, V> kafkaConsumer, String systemName, 
Config config, String clientId,
+      KafkaSystemConsumerMetrics metrics, Clock clock) {
+
+    super(metrics.registry(), clock, metrics.getClass().getName());
+
+    this.kafkaConsumer = kafkaConsumer;
+    this.samzaConsumerMetrics = metrics;
+    this.clientId = clientId;
+    this.systemName = systemName;
+    this.config = config;
+    this.metricName = String.format("%s %s", systemName, clientId);
+
+    this.fetchThresholdBytesEnabled = new 
KafkaConfig(config).isConsumerFetchThresholdBytesEnabled(systemName);
+
+    // create a sink for passing the messages between the proxy and the 
consumer
+    messageSink = new KafkaConsumerMessageSink();
+
+    // Create the proxy to do the actual message reading. It is a separate 
thread that reads the messages from the stream
+    // and puts them into the sink.
+    proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId, 
messageSink, samzaConsumerMetrics, metricName);
+    LOG.info("Created consumer proxy: " + proxy);
+
+    LOG.info("Created SamzaKafkaSystemConsumer for system={}, clientId={}, 
metricName={}, KafkaConsumer={}", systemName,
+        clientId, metricName, this.kafkaConsumer.toString());
+  }
+
+  public static <K, V> KafkaSystemConsumer getNewKafkaSystemConsumer(String 
systemName, Config config,
+      String clientId, KafkaSystemConsumerMetrics metrics, Clock clock) {
+
+    // extract consumer configs and create kafka consumer
+    KafkaConsumer<K, V> kafkaConsumer = getKafkaConsumerImpl(systemName, 
clientId, config);
+    LOG.info("Created kafka consumer for system {}, clientId {}: {}", 
systemName, clientId, kafkaConsumer);
+
+    KafkaSystemConsumer kc = new KafkaSystemConsumer(kafkaConsumer, 
systemName, config, clientId, metrics, clock);
+    LOG.info("Created samza system consumer {}", kc.toString());
+
+    return kc;
+  }
+
+  /**
+   * create kafka consumer
+   * @param systemName system name for which we create the consumer
+   * @param clientId client id to use in the kafka client
+   * @param config config
+   * @return kafka consumer
+   */
+  public static <K, V> KafkaConsumer<K, V> getKafkaConsumerImpl(String 
systemName, String clientId, Config config) {
+
+    Map<String, String> injectProps = new HashMap<>();
+
+    // extract kafka client configs
+    KafkaConsumerConfig consumerConfig =
+        KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, 
clientId, injectProps);
+
+    LOG.info("KafkaClient properties for systemName {}: {}", systemName, 
consumerConfig.originals());
+
+    return new KafkaConsumer<>(consumerConfig.originals());
+  }
+
+  @Override
+  public void start() {
+    if (!started.compareAndSet(false, true)) {
+      LOG.warn("attempting to start the consumer for the second (or more) 
time.");
+      return;
+    }
+    if (stopped.get()) {
+      LOG.warn("attempting to start a stopped consumer");
+      return;
+    }
+    // initialize the subscriptions for all the registered TopicPartitions
+    startSubscription();
+    // needs to be called after all the registrations are completed
+    setFetchThresholds();
+
+    startConsumer();
+    LOG.info("consumer {} started", this);
+  }
+
+  private void startSubscription() {
+    //subscribe to all the registered TopicPartitions
+    LOG.info("consumer {}, subscribes to {} ", this, 
topicPartitions2SSP.keySet());
+    try {
+      synchronized (kafkaConsumer) {
+        // we are using assign (and not subscribe), so we need to specify both 
topic and partition
+        kafkaConsumer.assign(topicPartitions2SSP.keySet());
+      }
+    } catch (Exception e) {
+      LOG.warn("startSubscription failed.", e);
+      throw new SamzaException(e);
+    }
+  }
+
+  /*
+   Set the offsets to start from.
+   Add the TopicPartitions to the proxy.
+   Start the proxy thread.
+   */
+  void startConsumer() {
+    //set the offset for each TopicPartition
+    if (topicPartitions2Offset.size() <= 0) {
+      LOG.warn("Consumer {} is not subscribed to any SSPs", this);
+    }
+
+    topicPartitions2Offset.forEach((tp, startingOffsetString) -> {
+      long startingOffset = Long.valueOf(startingOffsetString);
+
+      try {
+        synchronized (kafkaConsumer) {
+          // TODO in the future we may need to add special handling here for 
BEGIN/END_OFFSET
+          // this will call KafkaConsumer.seekToBegin/End()
+          kafkaConsumer.seek(tp, startingOffset); // this value should already 
be the 'upcoming' value
+        }
+      } catch (Exception e) {
+        // all other exceptions - non recoverable
+        LOG.error("Got Exception while seeking to " + startingOffsetString + " 
for " + tp, e);
+        throw new SamzaException(e);
+      }
+
+      LOG.info("Changing consumer's starting offset for tp = " + tp + " to " + 
startingOffsetString);
+
+      // add the partition to the proxy
+      proxy.addTopicPartition(topicPartitions2SSP.get(tp), startingOffset);
+    });
+
+    // start the proxy thread
+    if (proxy != null && !proxy.isRunning()) {
+      LOG.info("Starting proxy: " + proxy);
+      proxy.start();
+    }
+  }
+
+  private void setFetchThresholds() {
+    // get the thresholds, and set defaults if not defined.
+    KafkaConfig kafkaConfig = new KafkaConfig(config);
+
+    Option<String> fetchThresholdOption = 
kafkaConfig.getConsumerFetchThreshold(systemName);
+    long fetchThreshold = FETCH_THRESHOLD;
+    if (fetchThresholdOption.isDefined()) {
+      fetchThreshold = Long.valueOf(fetchThresholdOption.get());
+      LOG.info("fetchThresholdOption is configured. fetchThreshold=" + 
fetchThreshold);
+    }
+
+    Option<String> fetchThresholdBytesOption = 
kafkaConfig.getConsumerFetchThresholdBytes(systemName);
+    long fetchThresholdBytes = FETCH_THRESHOLD_BYTES;
+    if (fetchThresholdBytesOption.isDefined()) {
+      fetchThresholdBytes = Long.valueOf(fetchThresholdBytesOption.get());
+      LOG.info("fetchThresholdBytesOption is configured. fetchThresholdBytes=" 
+ fetchThresholdBytes);
+    }
+
+    int numTPs = topicPartitions2SSP.size();
+    assert (numTPs == topicPartitions2Offset.size());
+
+    LOG.info("fetchThresholdBytes = " + fetchThresholdBytes + "; 
fetchThreshold=" + fetchThreshold);
+    LOG.info("number of topicPartitions " + numTPs);
+
+    if (numTPs > 0) {
+      perPartitionFetchThreshold = fetchThreshold / numTPs;
+      LOG.info("perPartitionFetchThreshold=" + perPartitionFetchThreshold);
+      if (fetchThresholdBytesEnabled) {
+        // currently this feature cannot be enabled, because we do not have 
the size of the messages available.
+        // messages get double buffered, hence divide by 2
+        perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / numTPs;
+        LOG.info("perPartitionFetchThresholdBytes is enabled. 
perPartitionFetchThresholdBytes="
+            + perPartitionFetchThresholdBytes);
+      }
+    }
+  }
+
+  @Override
+  public void stop() {
+    LOG.info("Stopping Samza kafkaConsumer " + this);
+
+    if (!stopped.compareAndSet(false, true)) {
+      LOG.warn("attempting to stop stopped consumer.");
+      return;
+    }
+
+    // stop the proxy (with 5 minutes timeout)
+    if (proxy != null) {
+      LOG.info("Stopping proxy " + proxy);
+      proxy.stop(TimeUnit.MINUTES.toMillis(5));
+    }
+
+    try {
+      synchronized (kafkaConsumer) {
+        LOG.info("Closing kafka consumer " + kafkaConsumer);
+        kafkaConsumer.close();
+      }
+    } catch (Exception e) {
+      LOG.warn("failed to stop SamzaRawKafkaConsumer + " + this, e);
+    }
+  }
+
+  /*
+   record the ssp and the offset. Do not submit it to the consumer yet.
+   */
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, String 
offset) {
+    if (started.get()) {
+      String msg =
+          String.format("Trying to register partition after consumer has been 
started. sn=%s, ssp=%s", systemName,
+              systemStreamPartition);
+      LOG.error(msg);
+      throw new SamzaException(msg);
+    }
+
+    if (!systemStreamPartition.getSystem().equals(systemName)) {
+      LOG.warn("ignoring SSP " + systemStreamPartition + ", because this 
consumer's system is " + systemName);
+      return;
+    }
+    super.register(systemStreamPartition, offset);
+
+    TopicPartition tp = toTopicPartition(systemStreamPartition);
+
+    topicPartitions2SSP.put(tp, systemStreamPartition);
+
+    LOG.info("Registering ssp = " + systemStreamPartition + " with offset " + 
offset);
+
+    String existingOffset = topicPartitions2Offset.get(tp);
+    // register the older (of the two) offset in the consumer, to guarantee we 
do not miss any messages.
+    if (existingOffset == null || compareOffsets(existingOffset, offset) > 0) {
+      topicPartitions2Offset.put(tp, offset);
+    }
+
+    samzaConsumerMetrics.registerTopicAndPartition(toTopicAndPartition(tp));
+  }
+
+  /**
+   * Compare two String offsets.
+   * Note. There is a method in KafkaAdmin that does that, but that would 
require instantiation of systemadmin for each consumer.
+   * @return see {@link Long#compareTo(Long)}
+   */
+  public static int compareOffsets(String offset1, String offset2) {
+    return Long.valueOf(offset1).compareTo(Long.valueOf(offset2));
+  }
+
+  @Override
+  public String toString() {
+    return systemName + "/" + clientId + "/" + super.toString();
+  }
+
+  @Override
+  public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
+      Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws 
InterruptedException {
+
+    // check if the proxy is running
+    if (!proxy.isRunning()) {
+      stop();
+      if (proxy.getFailureCause() != null) {
+        String message = "KafkaConsumerProxy has stopped";
+        throw new SamzaException(message, proxy.getFailureCause());
+      } else {
+        LOG.warn("Failure cause is not populated for KafkaConsumerProxy");
+        throw new SamzaException("KafkaConsumerProxy has stopped");
+      }
+    }
+
+    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> res = 
super.poll(systemStreamPartitions, timeout);
+    return res;
+  }
+
+  /**
+   * convert from TopicPartition to TopicAndPartition
+   */
+  public static TopicAndPartition toTopicAndPartition(TopicPartition tp) {
+    return new TopicAndPartition(tp.topic(), tp.partition());
+  }
+
+  /**
+   * convert to TopicPartition from SystemStreamPartition
+   */
+  public static TopicPartition toTopicPartition(SystemStreamPartition ssp) {
+    return new TopicPartition(ssp.getStream(), 
ssp.getPartition().getPartitionId());
+  }
+
+  /**
+   * return system name for this consumer
+   * @return system name
+   */
+  public String getSystemName() {
+    return systemName;
+  }
+
+  ////////////////////////////////////
+  // inner class for the message sink
+  ////////////////////////////////////
+  public class KafkaConsumerMessageSink {
+
+    public void setIsAtHighWatermark(SystemStreamPartition ssp, boolean 
isAtHighWatermark) {
+      setIsAtHead(ssp, isAtHighWatermark);
+    }
+
+    boolean needsMoreMessages(SystemStreamPartition ssp) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("needsMoreMessages from following SSP: {}. fetchLimitByBytes 
enabled={}; messagesSizeInQueue={};"
+                + "(limit={}); messagesNumInQueue={}(limit={};", ssp, 
fetchThresholdBytesEnabled,
+            getMessagesSizeInQueue(ssp), perPartitionFetchThresholdBytes, 
getNumMessagesInQueue(ssp),
+            perPartitionFetchThreshold);
+      }
+
+      if (fetchThresholdBytesEnabled) {
+        return getMessagesSizeInQueue(ssp) < perPartitionFetchThresholdBytes;
+      } else {
+        return getNumMessagesInQueue(ssp) < perPartitionFetchThreshold;
+      }
+    }
+
+    void addMessage(SystemStreamPartition ssp, IncomingMessageEnvelope 
envelope) {
+      LOG.trace("Incoming message ssp = {}: envelope = {}.", ssp, envelope);
+
+      try {
+        put(ssp, envelope);
+      } catch (InterruptedException e) {
+        throw new SamzaException(
+            String.format("Interrupted while trying to add message with offset 
%s for ssp %s", envelope.getOffset(),
+                ssp));
+      }
+    }
+  }  // end of KafkaMessageSink class
+  ///////////////////////////////////////////////////////////////////////////
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/26552213/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index 6f58bed..e0e85be 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -50,7 +50,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
     val clientId = KafkaConsumerConfig.getConsumerClientId( config)
     val metrics = new KafkaSystemConsumerMetrics(systemName, registry)
 
-    NewKafkaSystemConsumer.getNewKafkaSystemConsumer(
+    KafkaSystemConsumer.getNewKafkaSystemConsumer(
       systemName, config, clientId, metrics, new SystemClock)
   }
 
@@ -76,7 +76,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
   }
 
   def getAdmin(systemName: String, config: Config): SystemAdmin = {
-    val clientId = KafkaConsumerConfig.getConsumerClientId(config)
+    val clientId = KafkaConsumerConfig.getAdminClientId(config)
     val producerConfig = config.getKafkaSystemProducerConfig(systemName, 
clientId)
     val bootstrapServers = producerConfig.bootsrapServers
     val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, 
clientId)

http://git-wip-us.apache.org/repos/asf/samza/blob/26552213/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
deleted file mode 100644
index afec8ad..0000000
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
+++ /dev/null
@@ -1,412 +0,0 @@
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.samza.system.kafka;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import kafka.common.TopicAndPartition;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.KafkaConsumerConfig;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.samza.Partition;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.KafkaConfig;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemConsumer;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.util.BlockingEnvelopeMap;
-import org.apache.samza.util.Clock;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Option;
-
-
-public class NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap 
implements SystemConsumer {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(NewKafkaSystemConsumer.class);
-
-  private static final long FETCH_THRESHOLD = 50000;
-  private static final long FETCH_THRESHOLD_BYTES = -1L;
-
-  private final Consumer<K, V> kafkaConsumer;
-  private final String systemName;
-  private final KafkaSystemConsumerMetrics samzaConsumerMetrics;
-  private final String clientId;
-  private final String metricName;
-  private final AtomicBoolean stopped = new AtomicBoolean(false);
-  private final AtomicBoolean started = new AtomicBoolean(false);
-  private final Config config;
-  private final boolean fetchThresholdBytesEnabled;
-
-  // This sink is used to transfer the messages from the proxy/consumer to the 
BlockingEnvelopeMap.
-  /* package private */ KafkaConsumerMessageSink messageSink;
-
-  // proxy is doing the actual reading
-  private KafkaConsumerProxy proxy;
-
-  /* package private */final Map<TopicPartition, String> 
topicPartitions2Offset = new HashMap<>();
-  /* package private */final Map<TopicPartition, SystemStreamPartition> 
topicPartitions2SSP = new HashMap<>();
-
-  /* package private */ long perPartitionFetchThreshold;
-  /* package private */ long perPartitionFetchThresholdBytes;
-
-  /**
-   * @param systemName
-   * @param config
-   * @param metrics
-   */
-  protected NewKafkaSystemConsumer(Consumer<K, V> kafkaConsumer, String 
systemName, Config config, String clientId,
-      KafkaSystemConsumerMetrics metrics, Clock clock) {
-
-    super(metrics.registry(), clock, metrics.getClass().getName());
-
-    this.kafkaConsumer = kafkaConsumer;
-    this.samzaConsumerMetrics = metrics;
-    this.clientId = clientId;
-    this.systemName = systemName;
-    this.config = config;
-    this.metricName = systemName + " " + clientId;
-
-    this.fetchThresholdBytesEnabled = new 
KafkaConfig(config).isConsumerFetchThresholdBytesEnabled(systemName);
-
-    LOG.info("Created SamzaKafkaSystemConsumer for system={}, clientId={}, 
metricName={}, KafkaConsumer={}", systemName,
-        clientId, metricName, this.kafkaConsumer.toString());
-  }
-
-  public static <K, V> NewKafkaSystemConsumer getNewKafkaSystemConsumer(String 
systemName, Config config,
-      String clientId, KafkaSystemConsumerMetrics metrics, Clock clock) {
-
-    // extract consumer configs and create kafka consumer
-    KafkaConsumer<K, V> kafkaConsumer = getKafkaConsumerImpl(systemName, 
clientId, config);
-    LOG.info("Created kafka consumer for system {}, clientId {}: {}", 
systemName, clientId, kafkaConsumer);
-
-    NewKafkaSystemConsumer kc = new NewKafkaSystemConsumer(kafkaConsumer, 
systemName, config, clientId, metrics, clock);
-    LOG.info("Created samza system consumer {}", kc.toString());
-
-    return kc;
-  }
-
-  /**
-   * create kafka consumer
-   * @param systemName
-   * @param clientId
-   * @param config
-   * @return kafka consumer
-   */
-  private static <K, V> KafkaConsumer<K, V> getKafkaConsumerImpl(String 
systemName, String clientId, Config config) {
-
-    Map<String, String> injectProps = new HashMap<>();
-
-    // extract kafka client configs
-    KafkaConsumerConfig consumerConfig =
-        KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, 
clientId, injectProps);
-
-    LOG.info("KafkaClient properties for systemName {}: {}", systemName, 
consumerConfig.originals());
-
-    return new KafkaConsumer<>(consumerConfig.originals());
-  }
-
-  @Override
-  public void start() {
-    if (!started.compareAndSet(false, true)) {
-      LOG.warn("attempting to start the consumer for the second (or more) 
time.");
-      return;
-    }
-    if (stopped.get()) {
-      LOG.warn("attempting to start a stopped consumer");
-      return;
-    }
-    // initialize the subscriptions for all the registered TopicPartitions
-    startSubscription();
-    // needs to be called after all the registrations are completed
-    setFetchThresholds();
-    // Create the proxy to do the actual message reading. It is a separate 
thread that reads the messages from the stream
-    // and puts them into the sink.
-    createConsumerProxy();
-    startConsumer();
-    LOG.info("consumer {} started", this);
-  }
-
-  private void startSubscription() {
-    //subscribe to all the registered TopicPartitions
-    LOG.info("consumer {}, subscribes to {} ", this, 
topicPartitions2SSP.keySet());
-    try {
-      synchronized (kafkaConsumer) {
-        // we are using assign (and not subscribe), so we need to specify both 
topic and partition
-        kafkaConsumer.assign(topicPartitions2SSP.keySet());
-      }
-    } catch (Exception e) {
-      LOG.warn("startSubscription failed.", e);
-      throw new SamzaException(e);
-    }
-  }
-
-  void createConsumerProxy() {
-    // create a sink for passing the messages between the proxy and the 
consumer
-    messageSink = new KafkaConsumerMessageSink();
-
-    // create the thread with the consumer
-    proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId, 
messageSink, samzaConsumerMetrics, metricName);
-
-    LOG.info("Created consumer proxy: " + proxy);
-  }
-
-  /*
-   Set the offsets to start from.
-   Add the TopicPartitions to the proxy.
-   Start the proxy thread.
-   */
-  void startConsumer() {
-    //set the offset for each TopicPartition
-    if (topicPartitions2Offset.size() <= 0) {
-      LOG.warn("Consumer {} is not subscribed to any SSPs", this);
-    }
-
-    topicPartitions2Offset.forEach((tp, startingOffsetString) -> {
-      long startingOffset = Long.valueOf(startingOffsetString);
-
-      try {
-        synchronized (kafkaConsumer) {
-          // TODO in the future we may need to add special handling here for 
BEGIN/END_OFFSET
-          // this will call KafkaConsumer.seekToBegin/End()
-          kafkaConsumer.seek(tp, startingOffset); // this value should already 
be the 'upcoming' value
-        }
-      } catch (Exception e) {
-        // all other exceptions - non recoverable
-        LOG.error("Got Exception while seeking to " + startingOffsetString + " 
for " + tp, e);
-        throw new SamzaException(e);
-      }
-
-      LOG.info("Changing consumer's starting offset for tp = " + tp + " to " + 
startingOffsetString);
-
-      // add the partition to the proxy
-      proxy.addTopicPartition(topicPartitions2SSP.get(tp), startingOffset);
-    });
-
-    // start the proxy thread
-    if (proxy != null && !proxy.isRunning()) {
-      LOG.info("Starting proxy: " + proxy);
-      proxy.start();
-    }
-  }
-
-  private void setFetchThresholds() {
-    // get the thresholds, and set defaults if not defined.
-    KafkaConfig kafkaConfig = new KafkaConfig(config);
-
-    Option<String> fetchThresholdOption = 
kafkaConfig.getConsumerFetchThreshold(systemName);
-    long fetchThreshold = FETCH_THRESHOLD;
-    if (fetchThresholdOption.isDefined()) {
-      fetchThreshold = Long.valueOf(fetchThresholdOption.get());
-      LOG.info("fetchThresholdOption is configured. fetchThreshold=" + 
fetchThreshold);
-    }
-
-    Option<String> fetchThresholdBytesOption = 
kafkaConfig.getConsumerFetchThresholdBytes(systemName);
-    long fetchThresholdBytes = FETCH_THRESHOLD_BYTES;
-    if (fetchThresholdBytesOption.isDefined()) {
-      fetchThresholdBytes = Long.valueOf(fetchThresholdBytesOption.get());
-      LOG.info("fetchThresholdBytesOption is configured. fetchThresholdBytes=" 
+ fetchThresholdBytes);
-    }
-
-    int numTPs = topicPartitions2SSP.size();
-    assert (numTPs == topicPartitions2Offset.size());
-
-    LOG.info("fetchThresholdBytes = " + fetchThresholdBytes + "; 
fetchThreshold=" + fetchThreshold);
-    LOG.info("number of topicPartitions " + numTPs);
-
-    if (numTPs > 0) {
-      perPartitionFetchThreshold = fetchThreshold / numTPs;
-      LOG.info("perPartitionFetchThreshold=" + perPartitionFetchThreshold);
-      if (fetchThresholdBytesEnabled) {
-        // currently this feature cannot be enabled, because we do not have 
the size of the messages available.
-        // messages get double buffered, hence divide by 2
-        perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / numTPs;
-        LOG.info("perPartitionFetchThresholdBytes is enabled. 
perPartitionFetchThresholdBytes="
-            + perPartitionFetchThresholdBytes);
-      }
-    }
-  }
-
-  @Override
-  public void stop() {
-    LOG.info("Stopping Samza kafkaConsumer " + this);
-
-    if (!stopped.compareAndSet(false, true)) {
-      LOG.warn("attempting to stop stopped consumer.");
-      return;
-    }
-
-    // stop the proxy (with 5 minutes timeout)
-    if (proxy != null) {
-      LOG.info("Stopping proxy " + proxy);
-      proxy.stop(TimeUnit.MINUTES.toMillis(5));
-    }
-
-    try {
-      synchronized (kafkaConsumer) {
-        LOG.info("Closing kafka consumer " + kafkaConsumer);
-        kafkaConsumer.close();
-      }
-    } catch (Exception e) {
-      LOG.warn("failed to stop SamzaRawKafkaConsumer + " + this, e);
-    }
-  }
-
-  /*
-   record the ssp and the offset. Do not submit it to the consumer yet.
-   */
-  @Override
-  public void register(SystemStreamPartition systemStreamPartition, String 
offset) {
-    if (started.get()) {
-      String msg =
-          String.format("Trying to register partition after consumer has been 
started. sn=%s, ssp=%s", systemName,
-              systemStreamPartition);
-      LOG.error(msg);
-      throw new SamzaException(msg);
-    }
-
-    if (!systemStreamPartition.getSystem().equals(systemName)) {
-      LOG.warn("ignoring SSP " + systemStreamPartition + ", because this 
consumer's system is " + systemName);
-      return;
-    }
-    super.register(systemStreamPartition, offset);
-
-    TopicPartition tp = toTopicPartition(systemStreamPartition);
-
-    topicPartitions2SSP.put(tp, systemStreamPartition);
-
-    LOG.info("Registering ssp = " + systemStreamPartition + " with offset " + 
offset);
-
-    String existingOffset = topicPartitions2Offset.get(tp);
-    // register the older (of the two) offset in the consumer, to guarantee we 
do not miss any messages.
-    if (existingOffset == null || compareOffsets(existingOffset, offset) > 0) {
-      topicPartitions2Offset.put(tp, offset);
-    }
-
-    samzaConsumerMetrics.registerTopicAndPartition(toTopicAndPartition(tp));
-  }
-
-  /**
-   * Compare two String offsets.
-   * Note. There is a method in KafkaAdmin that does that, but that would 
require instantiation of systemadmin for each consumer.
-   * @param off1
-   * @param off2
-   * @return see {@link Long#compareTo(Long)}
-   */
-  public static int compareOffsets(String off1, String off2) {
-    return Long.valueOf(off1).compareTo(Long.valueOf(off2));
-  }
-
-  @Override
-  public String toString() {
-    return systemName + "/" + clientId + "/" + super.toString();
-  }
-
-  @Override
-  public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
-      Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws 
InterruptedException {
-
-    // check if the proxy is running
-    if (!proxy.isRunning()) {
-      stop();
-      if (proxy.getFailureCause() != null) {
-        String message = "KafkaConsumerProxy has stopped";
-        throw new SamzaException(message, proxy.getFailureCause());
-      } else {
-        LOG.warn("Failure cause is not populated for KafkaConsumerProxy");
-        throw new SamzaException("KafkaConsumerProxy has stopped");
-      }
-    }
-
-    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> res = 
super.poll(systemStreamPartitions, timeout);
-    return res;
-  }
-
-  public static TopicAndPartition toTopicAndPartition(TopicPartition tp) {
-    return new TopicAndPartition(tp.topic(), tp.partition());
-  }
-
-  public static TopicAndPartition toTopicAndPartition(SystemStreamPartition 
ssp) {
-    return new TopicAndPartition(ssp.getStream(), 
ssp.getPartition().getPartitionId());
-  }
-
-  public static TopicPartition toTopicPartition(SystemStreamPartition ssp) {
-    return new TopicPartition(ssp.getStream(), 
ssp.getPartition().getPartitionId());
-  }
-
-  public static SystemStreamPartition toSystemStreamPartition(String 
systemName, TopicAndPartition tp) {
-    return new SystemStreamPartition(systemName, tp.topic(), new 
Partition(tp.partition()));
-  }
-
-  /**
-   * return system name for this consumer
-   * @return system name
-   */
-  public String getSystemName() {
-    return systemName;
-  }
-
-  ////////////////////////////////////
-  // inner class for the message sink
-  ////////////////////////////////////
-  public class KafkaConsumerMessageSink {
-
-    public void setIsAtHighWatermark(SystemStreamPartition ssp, boolean 
isAtHighWatermark) {
-      setIsAtHead(ssp, isAtHighWatermark);
-    }
-
-    boolean needsMoreMessages(SystemStreamPartition ssp) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("needsMoreMessages from following SSP: {}. fetchLimitByBytes 
enabled={}; messagesSizeInQueue={};"
-                + "(limit={}); messagesNumInQueue={}(limit={};", ssp, 
fetchThresholdBytesEnabled,
-            getMessagesSizeInQueue(ssp), perPartitionFetchThresholdBytes, 
getNumMessagesInQueue(ssp),
-            perPartitionFetchThreshold);
-      }
-
-      if (fetchThresholdBytesEnabled) {
-        return getMessagesSizeInQueue(ssp) < perPartitionFetchThresholdBytes;
-      } else {
-        return getNumMessagesInQueue(ssp) < perPartitionFetchThreshold;
-      }
-    }
-
-    void addMessage(SystemStreamPartition ssp, IncomingMessageEnvelope 
envelope) {
-      LOG.trace("Incoming message ssp = {}: envelope = {}.", ssp, envelope);
-
-      try {
-        put(ssp, envelope);
-      } catch (InterruptedException e) {
-        throw new SamzaException(
-            String.format("Interrupted while trying to add message with offset 
%s for ssp %s", envelope.getOffset(),
-                ssp));
-      }
-    }
-  }  // end of KafkaMessageSink class
-  ///////////////////////////////////////////////////////////////////////////
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/26552213/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
new file mode 100644
index 0000000..d90bc35
--- /dev/null
+++ 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
@@ -0,0 +1,224 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.system.kafka;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.KafkaConsumerConfig;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.NoOpMetricsRegistry;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+public class TestKafkaSystemConsumer {
+  public final String TEST_SYSTEM = "test-system";
+  public final String TEST_STREAM = "test-stream";
+  public final String TEST_CLIENT_ID = "testClientId";
+  public final String BOOTSTRAP_SERVER = "127.0.0.1:8888";
+  public final String FETCH_THRESHOLD_MSGS = "50000";
+  public final String FETCH_THRESHOLD_BYTES = "100000";
+
+  @Before
+  public void setUp() {
+
+  }
+
+  private KafkaSystemConsumer setupConsumer(String fetchMsg, String 
fetchBytes) {
+    final Map<String, String> map = new HashMap<>();
+
+    map.put(String.format(KafkaConfig.CONSUMER_FETCH_THRESHOLD(), 
TEST_SYSTEM), fetchMsg);
+    map.put(String.format(KafkaConfig.CONSUMER_FETCH_THRESHOLD_BYTES(), 
TEST_SYSTEM), fetchBytes);
+    map.put(String.format("systems.%s.consumer.%s", TEST_SYSTEM, 
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
+        BOOTSTRAP_SERVER);
+
+    Config config = new MapConfig(map);
+    KafkaConsumerConfig consumerConfig =
+        KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, TEST_SYSTEM, 
TEST_CLIENT_ID, Collections.emptyMap());
+    final KafkaConsumer<byte[], byte[]> kafkaConsumer = new 
MockKafkaConsumer(consumerConfig.originals());
+
+    MockKafkaSystmeCosumer newKafkaSystemConsumer =
+        new MockKafkaSystmeCosumer(kafkaConsumer, TEST_SYSTEM, config, 
TEST_CLIENT_ID,
+            new KafkaSystemConsumerMetrics(TEST_SYSTEM, new 
NoOpMetricsRegistry()), System::currentTimeMillis);
+
+    return newKafkaSystemConsumer;
+  }
+
+  @Test
+  public void testConfigValidations() {
+
+    final KafkaSystemConsumer consumer = setupConsumer(FETCH_THRESHOLD_MSGS, 
FETCH_THRESHOLD_BYTES);
+
+    consumer.start();
+    // should be no failures
+  }
+
+  @Test
+  public void testFetchThresholdShouldDivideEvenlyAmongPartitions() {
+    final KafkaSystemConsumer consumer = setupConsumer(FETCH_THRESHOLD_MSGS, 
FETCH_THRESHOLD_BYTES);
+    final int partitionsNum = 50;
+    for (int i = 0; i < partitionsNum; i++) {
+      consumer.register(new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, 
new Partition(i)), "0");
+    }
+
+    consumer.start();
+
+    Assert.assertEquals(Long.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum, 
consumer.perPartitionFetchThreshold);
+    Assert.assertEquals(Long.valueOf(FETCH_THRESHOLD_BYTES) / 2 / 
partitionsNum,
+        consumer.perPartitionFetchThresholdBytes);
+  }
+
+  @Test
+  public void testConsumerRegisterOlderOffsetOfTheSamzaSSP() {
+
+    KafkaSystemConsumer consumer = setupConsumer(FETCH_THRESHOLD_MSGS, 
FETCH_THRESHOLD_BYTES);
+
+    SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, 
TEST_STREAM, new Partition(0));
+    SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, 
TEST_STREAM, new Partition(1));
+    SystemStreamPartition ssp2 = new SystemStreamPartition(TEST_SYSTEM, 
TEST_STREAM, new Partition(2));
+
+    consumer.register(ssp0, "0");
+    consumer.register(ssp0, "5");
+    consumer.register(ssp1, "2");
+    consumer.register(ssp1, "3");
+    consumer.register(ssp2, "0");
+
+    assertEquals("0", 
consumer.topicPartitions2Offset.get(KafkaSystemConsumer.toTopicPartition(ssp0)));
+    assertEquals("2", 
consumer.topicPartitions2Offset.get(KafkaSystemConsumer.toTopicPartition(ssp1)));
+    assertEquals("0", 
consumer.topicPartitions2Offset.get(KafkaSystemConsumer.toTopicPartition(ssp2)));
+  }
+
+  @Test
+  public void testFetchThresholdBytes() {
+
+    SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, 
TEST_STREAM, new Partition(0));
+    SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, 
TEST_STREAM, new Partition(1));
+    int partitionsNum = 2;
+    int ime0Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum; // 
fake size
+    int ime1Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum - 1; 
// fake size
+    int ime11Size = 20;
+    ByteArraySerializer bytesSerde = new ByteArraySerializer();
+    IncomingMessageEnvelope ime0 = new IncomingMessageEnvelope(ssp0, "0", 
bytesSerde.serialize("", "key0".getBytes()),
+        bytesSerde.serialize("", "value0".getBytes()), ime0Size);
+    IncomingMessageEnvelope ime1 = new IncomingMessageEnvelope(ssp1, "0", 
bytesSerde.serialize("", "key1".getBytes()),
+        bytesSerde.serialize("", "value1".getBytes()), ime1Size);
+    IncomingMessageEnvelope ime11 = new IncomingMessageEnvelope(ssp1, "0", 
bytesSerde.serialize("", "key11".getBytes()),
+        bytesSerde.serialize("", "value11".getBytes()), ime11Size);
+    KafkaSystemConsumer consumer = setupConsumer(FETCH_THRESHOLD_MSGS, 
FETCH_THRESHOLD_BYTES);
+
+    consumer.register(ssp0, "0");
+    consumer.register(ssp1, "0");
+    consumer.start();
+    consumer.messageSink.addMessage(ssp0, ime0);
+    // queue for ssp0 should be full now, because we added message of size 
FETCH_THRESHOLD_MSGS/partitionsNum
+    Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp0));
+    consumer.messageSink.addMessage(ssp1, ime1);
+    // queue for ssp1 should be less than full now, because we added message 
of size (FETCH_THRESHOLD_MSGS/partitionsNum - 1)
+    Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp1));
+    consumer.messageSink.addMessage(ssp1, ime11);
+    // queue for ssp1 should be full now, because we added message of size 20 on 
top
+    Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp1));
+
+    Assert.assertEquals(1, consumer.getNumMessagesInQueue(ssp0));
+    Assert.assertEquals(2, consumer.getNumMessagesInQueue(ssp1));
+    Assert.assertEquals(ime0Size, consumer.getMessagesSizeInQueue(ssp0));
+    Assert.assertEquals(ime1Size + ime11Size, 
consumer.getMessagesSizeInQueue(ssp1));
+  }
+
+  @Test
+  public void testFetchThresholdBytesDiabled() {
+    // Pass 0 as fetchThresholdByBytes, which disables checking for limit by 
size
+
+    SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, 
TEST_STREAM, new Partition(0));
+    SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, 
TEST_STREAM, new Partition(1));
+    int partitionsNum = 2;
+    int ime0Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum; // 
fake size, up to the limit
+    int ime1Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum - 
100; // fake size, below the limit
+    int ime11Size = 20;// even with the second message still below the size 
limit
+    ByteArraySerializer bytesSerde = new ByteArraySerializer();
+    IncomingMessageEnvelope ime0 = new IncomingMessageEnvelope(ssp0, "0", 
bytesSerde.serialize("", "key0".getBytes()),
+        bytesSerde.serialize("", "value0".getBytes()), ime0Size);
+    IncomingMessageEnvelope ime1 = new IncomingMessageEnvelope(ssp1, "0", 
bytesSerde.serialize("", "key1".getBytes()),
+        bytesSerde.serialize("", "value1".getBytes()), ime1Size);
+    IncomingMessageEnvelope ime11 = new IncomingMessageEnvelope(ssp1, "0", 
bytesSerde.serialize("", "key11".getBytes()),
+        bytesSerde.serialize("", "value11".getBytes()), ime11Size);
+
+    // limit by number of messages 4/2 = 2 per partition
+    // limit by number of bytes - disabled
+    KafkaSystemConsumer consumer = setupConsumer("4", "0"); // should disable
+
+    consumer.register(ssp0, "0");
+    consumer.register(ssp1, "0");
+    consumer.start();
+    consumer.messageSink.addMessage(ssp0, ime0);
+    // should be full by size, but not full by number of messages (1 of 2)
+    Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp0));
+    consumer.messageSink.addMessage(ssp1, ime1);
+    // not full neither by size nor by messages
+    Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp1));
+    consumer.messageSink.addMessage(ssp1, ime11);
+    // not full by size, but should be full by messages
+    Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp1));
+
+    Assert.assertEquals(1, consumer.getNumMessagesInQueue(ssp0));
+    Assert.assertEquals(2, consumer.getNumMessagesInQueue(ssp1));
+    Assert.assertEquals(ime0Size, consumer.getMessagesSizeInQueue(ssp0));
+    Assert.assertEquals(ime1Size + ime11Size, 
consumer.getMessagesSizeInQueue(ssp1));
+  }
+
+  // mock kafkaConsumer and SystemConsumer
+  static class MockKafkaConsumer extends KafkaConsumer {
+    public MockKafkaConsumer(Map<String, Object> configs) {
+      super(configs);
+    }
+  }
+
+  static class MockKafkaSystmeCosumer extends KafkaSystemConsumer {
+    public MockKafkaSystmeCosumer(Consumer kafkaConsumer, String systemName, 
Config config, String clientId,
+        KafkaSystemConsumerMetrics metrics, Clock clock) {
+      super(kafkaConsumer, systemName, config, clientId, metrics, clock);
+    }
+
+    //@Override
+    //void createConsumerProxy() {
+    //  this.messageSink = new KafkaConsumerMessageSink();
+    //}
+
+    @Override
+    void startConsumer() {
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/26552213/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestNewKafkaSystemConsumer.java
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestNewKafkaSystemConsumer.java
 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestNewKafkaSystemConsumer.java
deleted file mode 100644
index fb7533b..0000000
--- 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestNewKafkaSystemConsumer.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.samza.system.kafka;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.KafkaConsumerConfig;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.samza.Partition;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.KafkaConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.util.Clock;
-import org.apache.samza.util.NoOpMetricsRegistry;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-
-public class TestNewKafkaSystemConsumer {
-  public final String TEST_SYSTEM = "test-system";
-  public final String TEST_STREAM = "test-stream";
-  public final String TEST_CLIENT_ID = "testClientId";
-  public final String BOOTSTRAP_SERVER = "127.0.0.1:8888";
-  public final String FETCH_THRESHOLD_MSGS = "50000";
-  public final String FETCH_THRESHOLD_BYTES = "100000";
-
-  @Before
-  public void setUp() {
-
-  }
-
-  private NewKafkaSystemConsumer setupConsumer(String fetchMsg, String 
fetchBytes) {
-    final Map<String, String> map = new HashMap<>();
-
-    map.put(String.format(KafkaConfig.CONSUMER_FETCH_THRESHOLD(), 
TEST_SYSTEM), fetchMsg);
-    map.put(String.format(KafkaConfig.CONSUMER_FETCH_THRESHOLD_BYTES(), 
TEST_SYSTEM), fetchBytes);
-    map.put(String.format("systems.%s.consumer.%s", TEST_SYSTEM, 
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
-        BOOTSTRAP_SERVER);
-
-    Config config = new MapConfig(map);
-    KafkaConsumerConfig consumerConfig =
-        KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, TEST_SYSTEM, 
TEST_CLIENT_ID, Collections.emptyMap());
-    final KafkaConsumer<byte[], byte[]> kafkaConsumer = new 
MockKafkaConsumer(consumerConfig.originals());
-
-    MockNewKafkaSystmeCosumer newKafkaSystemConsumer =
-        new MockNewKafkaSystmeCosumer(kafkaConsumer, TEST_SYSTEM, config, 
TEST_CLIENT_ID,
-            new KafkaSystemConsumerMetrics(TEST_SYSTEM, new 
NoOpMetricsRegistry()), System::currentTimeMillis);
-
-    return newKafkaSystemConsumer;
-  }
-
-  @Test
-  public void testConfigValidations() {
-
-    final NewKafkaSystemConsumer consumer = 
setupConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
-
-    consumer.start();
-    // should be no failures
-  }
-
-  @Test
-  public void testFetchThresholdShouldDivideEvenlyAmongPartitions() {
-    final NewKafkaSystemConsumer consumer = 
setupConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
-    final int partitionsNum = 50;
-    for (int i = 0; i < partitionsNum; i++) {
-      consumer.register(new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, 
new Partition(i)), "0");
-    }
-
-    consumer.start();
-
-    Assert.assertEquals(Long.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum, 
consumer.perPartitionFetchThreshold);
-    Assert.assertEquals(Long.valueOf(FETCH_THRESHOLD_BYTES) / 2 / 
partitionsNum,
-        consumer.perPartitionFetchThresholdBytes);
-  }
-
-  @Test
-  public void testConsumerRegisterOlderOffsetOfTheSamzaSSP() {
-
-    NewKafkaSystemConsumer consumer = setupConsumer(FETCH_THRESHOLD_MSGS, 
FETCH_THRESHOLD_BYTES);
-
-    SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, 
TEST_STREAM, new Partition(0));
-    SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, 
TEST_STREAM, new Partition(1));
-    SystemStreamPartition ssp2 = new SystemStreamPartition(TEST_SYSTEM, 
TEST_STREAM, new Partition(2));
-
-    consumer.register(ssp0, "0");
-    consumer.register(ssp0, "5");
-    consumer.register(ssp1, "2");
-    consumer.register(ssp1, "3");
-    consumer.register(ssp2, "0");
-
-    assertEquals("0", 
consumer.topicPartitions2Offset.get(NewKafkaSystemConsumer.toTopicPartition(ssp0)));
-    assertEquals("2", 
consumer.topicPartitions2Offset.get(NewKafkaSystemConsumer.toTopicPartition(ssp1)));
-    assertEquals("0", 
consumer.topicPartitions2Offset.get(NewKafkaSystemConsumer.toTopicPartition(ssp2)));
-  }
-
-  @Test
-  public void testFetchThresholdBytes() {
-
-    SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, 
TEST_STREAM, new Partition(0));
-    SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, 
TEST_STREAM, new Partition(1));
-    int partitionsNum = 2;
-    int ime0Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum; // 
fake size
-    int ime1Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum - 1; 
// fake size
-    int ime11Size = 20;
-    ByteArraySerializer bytesSerde = new ByteArraySerializer();
-    IncomingMessageEnvelope ime0 = new IncomingMessageEnvelope(ssp0, "0", 
bytesSerde.serialize("", "key0".getBytes()),
-        bytesSerde.serialize("", "value0".getBytes()), ime0Size);
-    IncomingMessageEnvelope ime1 = new IncomingMessageEnvelope(ssp1, "0", 
bytesSerde.serialize("", "key1".getBytes()),
-        bytesSerde.serialize("", "value1".getBytes()), ime1Size);
-    IncomingMessageEnvelope ime11 = new IncomingMessageEnvelope(ssp1, "0", 
bytesSerde.serialize("", "key11".getBytes()),
-        bytesSerde.serialize("", "value11".getBytes()), ime11Size);
-    NewKafkaSystemConsumer consumer = setupConsumer(FETCH_THRESHOLD_MSGS, 
FETCH_THRESHOLD_BYTES);
-
-    consumer.register(ssp0, "0");
-    consumer.register(ssp1, "0");
-    consumer.start();
-    consumer.messageSink.addMessage(ssp0, ime0);
-    // queue for ssp0 should be full now, because we added message of size 
FETCH_THRESHOLD_MSGS/partitionsNum
-    Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp0));
-    consumer.messageSink.addMessage(ssp1, ime1);
-    // queue for ssp1 should be less then full now, because we added message 
of size (FETCH_THRESHOLD_MSGS/partitionsNum - 1)
-    Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp1));
-    consumer.messageSink.addMessage(ssp1, ime11);
-    // queue for ssp1 should full now, because we added message of size 20 on 
top
-    Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp1));
-
-    Assert.assertEquals(1, consumer.getNumMessagesInQueue(ssp0));
-    Assert.assertEquals(2, consumer.getNumMessagesInQueue(ssp1));
-    Assert.assertEquals(ime0Size, consumer.getMessagesSizeInQueue(ssp0));
-    Assert.assertEquals(ime1Size + ime11Size, 
consumer.getMessagesSizeInQueue(ssp1));
-  }
-
-  @Test
-  public void testFetchThresholdBytesDiabled() {
-    // Pass 0 as fetchThresholdByBytes, which disables checking for limit by 
size
-
-    SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, 
TEST_STREAM, new Partition(0));
-    SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, 
TEST_STREAM, new Partition(1));
-    int partitionsNum = 2;
-    int ime0Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum; // 
fake size, upto the limit
-    int ime1Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum - 
100; // fake size, below the limit
-    int ime11Size = 20;// event with the second message still below the size 
limit
-    ByteArraySerializer bytesSerde = new ByteArraySerializer();
-    IncomingMessageEnvelope ime0 = new IncomingMessageEnvelope(ssp0, "0", 
bytesSerde.serialize("", "key0".getBytes()),
-        bytesSerde.serialize("", "value0".getBytes()), ime0Size);
-    IncomingMessageEnvelope ime1 = new IncomingMessageEnvelope(ssp1, "0", 
bytesSerde.serialize("", "key1".getBytes()),
-        bytesSerde.serialize("", "value1".getBytes()), ime1Size);
-    IncomingMessageEnvelope ime11 = new IncomingMessageEnvelope(ssp1, "0", 
bytesSerde.serialize("", "key11".getBytes()),
-        bytesSerde.serialize("", "value11".getBytes()), ime11Size);
-
-    // limit by number of messages 4/2 = 2 per partition
-    // limit by number of bytes - disabled
-    NewKafkaSystemConsumer consumer = setupConsumer("4", "0"); // should 
disable
-
-    consumer.register(ssp0, "0");
-    consumer.register(ssp1, "0");
-    consumer.start();
-    consumer.messageSink.addMessage(ssp0, ime0);
-    // should be full by size, but not full by number of messages (1 of 2)
-    Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp0));
-    consumer.messageSink.addMessage(ssp1, ime1);
-    // not full neither by size nor by messages
-    Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp1));
-    consumer.messageSink.addMessage(ssp1, ime11);
-    // not full by size, but should be full by messages
-    Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp1));
-
-    Assert.assertEquals(1, consumer.getNumMessagesInQueue(ssp0));
-    Assert.assertEquals(2, consumer.getNumMessagesInQueue(ssp1));
-    Assert.assertEquals(ime0Size, consumer.getMessagesSizeInQueue(ssp0));
-    Assert.assertEquals(ime1Size + ime11Size, 
consumer.getMessagesSizeInQueue(ssp1));
-  }
-
-  // mock kafkaConsumer and SystemConsumer
-  static class MockKafkaConsumer extends KafkaConsumer {
-    public MockKafkaConsumer(Map<String, Object> configs) {
-      super(configs);
-    }
-  }
-
-  static class MockNewKafkaSystmeCosumer extends NewKafkaSystemConsumer {
-    public MockNewKafkaSystmeCosumer(Consumer kafkaConsumer, String 
systemName, Config config, String clientId,
-        KafkaSystemConsumerMetrics metrics, Clock clock) {
-      super(kafkaConsumer, systemName, config, clientId, metrics, clock);
-    }
-
-    @Override
-    void createConsumerProxy() {
-      this.messageSink = new KafkaConsumerMessageSink();
-    }
-
-    @Override
-    void startConsumer() {
-    }
-  }
-}

Reply via email to