Added new KafkaProxy and KafkaConsumer for default KafkaSystem

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/72544606
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/72544606
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/72544606

Branch: refs/heads/NewKafkaSystemConsumer
Commit: 72544606bfffc67aeaa7f509ca54cfd6db52e2b4
Parents: 4801709
Author: Boris S <[email protected]>
Authored: Fri Aug 17 18:08:52 2018 -0700
Committer: Boris S <[email protected]>
Committed: Fri Aug 17 18:08:52 2018 -0700

----------------------------------------------------------------------
 .../clients/consumer/KafkaConsumerConfig.java   | 152 ++++++
 .../samza/system/kafka/KafkaConsumerProxy.java  | 463 +++++++++++++++++++
 .../samza/system/kafka/KafkaSystemFactory.scala |  54 ++-
 .../system/kafka/NewKafkaSystemConsumer.java    | 403 ++++++++++++++++
 .../kafka/TestKafkaCheckpointManager.scala      |   8 +-
 5 files changed, 1064 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/72544606/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
 
b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
new file mode 100644
index 0000000..97360e2
--- /dev/null
+++ 
b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
@@ -0,0 +1,152 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.kafka.clients.consumer;
+
+import java.util.Map;
+import java.util.Properties;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.JobConfig;
+import scala.Option;
+
+
+/**
+ * The configuration class for KafkaConsumer
+ */
+public class KafkaConsumerConfig extends ConsumerConfig {
+
+  private static final String PRODUCER_CLIENT_ID_PREFIX = "kafka-producer";
+  private static final String CONSUMER_CLIENT_ID_PREFIX = "kafka-consumer";
+  private static final String SAMZA_OFFSET_LARGEST = "largest";
+  private static final String SAMZA_OFFSET_SMALLEST = "smallest";
+  private static final String KAFKA_OFFSET_LATEST = "latest";
+  private static final String KAFKA_OFFSET_EARLIEST = "earliest";
+  /*
+   * By default, KafkaConsumer will fetch ALL available messages for all the 
partitions.
+   * This may cause memory issues. That's why we will limit the number of 
messages per partition we get on EACH poll().
+   */
+  private static final String KAFKA_CONSUMER_MAX_POLL_RECORDS_DEFAULT = "100";
+
+
+  public KafkaConsumerConfig(Properties props) {
+    super(props);
+  }
+
+  public static KafkaConsumerConfig getKafkaSystemConsumerConfig(Config config,
+      String systemName, String clientId, Map<String, String> injectProps) {
+
+    Config subConf = config.subset(String.format("systems.%s.consumer.", 
systemName), true);
+
+    String groupId = getConsumerGroupId(config);
+
+    Properties consumerProps = new Properties();
+    consumerProps.putAll(subConf);
+
+    consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+    consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
+
+    /********************************************
+     * Open-source Kafka Consumer configuration *
+     *******************************************/
+    consumerProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false"); // Disable consumer auto-commit
+
+    consumerProps.setProperty(
+        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+        getAutoOffsetResetValue(consumerProps));  // Translate samza config 
value to kafka config value
+
+    // Make sure the bootstrap servers are configured. TODO: should we fail if they are not?
+    if (! subConf.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
+      // get it from the producer config
+      String bootstrapServer = 
config.get(String.format("systems.%s.producer.%s", systemName, 
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+      consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServer);
+    }
+
+    // Always use default partition assignment strategy. Do not allow override.
+    consumerProps.setProperty(
+        ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
+        RangeAssignor.class.getName());
+
+
+    // TODO: verify whether this max.poll.records override is still needed
+    String maxPollRecords = 
subConf.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
KAFKA_CONSUMER_MAX_POLL_RECORDS_DEFAULT);
+    consumerProps.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
maxPollRecords);
+
+    // put overrides
+    consumerProps.putAll(injectProps);
+
+    return new KafkaConsumerConfig(consumerProps);
+  }
+
+  // group id should be unique per job
+  static String getConsumerGroupId(Config config) {
+    JobConfig jobConfig = new JobConfig(config);
+    Option<String> jobIdOption = jobConfig.getJobId();
+    Option<String> jobNameOption = jobConfig.getName();
+    return (jobNameOption.isDefined()? jobNameOption.get() : 
"undefined_job_name") + "-"
+        + (jobIdOption.isDefined()? jobIdOption.get() : "undefined_job_id");
+  }
+  // client id should be unique per job
+  public static String getClientId(String id, Config config) {
+    if (config.get(JobConfig.JOB_NAME()) == null) {
+      throw new ConfigException("Missing job name");
+    }
+    String jobName = config.get(JobConfig.JOB_NAME());
+    String jobId = "1";
+    if (config.get(JobConfig.JOB_ID()) != null) {
+      jobId = config.get(JobConfig.JOB_ID());
+    }
+    return getClientId(id, jobName, jobId);
+  }
+
+  private static String getClientId(String id, String jobName, String jobId) {
+    return String.format(
+        "%s-%s-%s",
+        id.replaceAll("[^A-Za-z0-9]", "_"),
+        jobName.replaceAll("[^A-Za-z0-9]", "_"),
+        jobId.replaceAll("[^A-Za-z0-9]", "_"));
+  }
+
+  public static String getProducerClientId(Config config) {
+    return getClientId(PRODUCER_CLIENT_ID_PREFIX, config);
+  }
+
+  /**
+   * Settings for auto.reset in samza are different from settings in Kafka 
(auto.offset.reset) - need to convert
+   * "largest" -> "latest"
+   * "smallest" -> "earliest"
+   * "none" - will fail the kafka consumer, if offset is out of range
+   * @param properties All consumer related {@link Properties} parsed from 
samza config
+   * @return String representing the config value for "auto.offset.reset" 
property
+   */
+  static String getAutoOffsetResetValue(Properties properties) {
+    String autoOffsetReset = 
properties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
KAFKA_OFFSET_LATEST);
+    switch (autoOffsetReset) {
+      case SAMZA_OFFSET_LARGEST:
+        return KAFKA_OFFSET_LATEST;
+      case SAMZA_OFFSET_SMALLEST:
+        return KAFKA_OFFSET_EARLIEST;
+      default:
+        return KAFKA_OFFSET_LATEST;
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/72544606/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
new file mode 100644
index 0000000..66971af
--- /dev/null
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -0,0 +1,463 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.system.kafka;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import kafka.common.KafkaException;
+import kafka.common.TopicAndPartition;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.InvalidOffsetException;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Separate thread that reads messages from kafka and puts them into the 
BlockingEnvelopeMap
+ * This class is not thread safe. There will be only one instance of this 
class per LiKafkaSystemConsumer object
+ * We still need some synchronization around kafkaConsumer. See pollConsumer() 
method for details.
+ */
+public class KafkaConsumerProxy<K, V> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(KafkaConsumerProxy.class);
+
+  private static final int SLEEP_MS_WHILE_NO_TOPIC_PARTITION = 100;
+
+  /* package private */ final Thread consumerPollThread;
+  private final Consumer<K, V> kafkaConsumer;
+  private final NewKafkaSystemConsumer.KafkaConsumerMessageSink sink;
+  private final KafkaSystemConsumerMetrics kafkaConsumerMetrics;
+  private final String metricName;
+  private final String systemName;
+  private final String clientId;
+  private final Map<TopicPartition, SystemStreamPartition> topicPartitions2SSP 
= new HashMap<>();
+  private final Map<SystemStreamPartition, MetricName> ssp2MetricName = new 
HashMap<>();
+  // list of all the SSPs we poll from with their next offsets correspondingly.
+  private final Map<SystemStreamPartition, Long> nextOffsets = new 
ConcurrentHashMap<>();
+  // lags behind the high water mark, as reported by the Kafka consumer.
+  private final Map<SystemStreamPartition, Long> latestLags = new HashMap<>();
+  private final NewKafkaSystemConsumer.ValueUnwrapper<V> valueUnwrapper;
+
+  private volatile boolean isRunning = false;
+  private volatile Throwable failureCause = null;
+  private CountDownLatch consumerPollThreadStartLatch = new CountDownLatch(1);
+
+  public KafkaConsumerProxy(Consumer<K, V> kafkaConsumer, String systemName, 
String clientId,
+      NewKafkaSystemConsumer.KafkaConsumerMessageSink messageSink, 
KafkaSystemConsumerMetrics samzaConsumerMetrics,
+      String metricName, NewKafkaSystemConsumer.ValueUnwrapper<V> 
valueUnwrapper) {
+
+    this.kafkaConsumer = kafkaConsumer;
+    this.systemName = systemName;
+    this.sink = messageSink;
+    this.kafkaConsumerMetrics = samzaConsumerMetrics;
+    this.metricName = metricName;
+    this.clientId = clientId;
+    this.valueUnwrapper = valueUnwrapper;
+
+    // TODO - see if we need new metrics (not host:port based)
+    this.kafkaConsumerMetrics.registerBrokerProxy(metricName, 0);
+
+    consumerPollThread = new Thread(createProxyThreadRunnable());
+  }
+
+  public void start() {
+    if (!consumerPollThread.isAlive()) {
+      LOG.info("Starting LiKafkaConsumerProxy polling thread for system " + 
systemName + " " + this.toString());
+      consumerPollThread.setDaemon(true);
+      consumerPollThread.setName(
+          "Samza LiKafkaConsumerProxy Poll " + consumerPollThread.getName() + 
" - " + systemName);
+      consumerPollThread.start();
+
+      // we need to wait until the thread starts
+      while (!isRunning) {
+        try {
+          consumerPollThreadStartLatch.await(3000, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+        }
+      }
+    } else {
+      LOG.debug("Tried to start an already started LiKafkaConsumerProxy ({}). 
Ignoring.", this.toString());
+    }
+  }
+
+  // add new partition to the list of polled partitions
+  // this method is called only at the beginning, before the thread is started
+  public void addTopicPartition(SystemStreamPartition ssp, long nextOffset) {
+    LOG.info(String.format("Adding new topic and partition %s, offset = %s to 
queue for consumer %s", ssp, nextOffset,
+        this));
+    topicPartitions2SSP.put(NewKafkaSystemConsumer.toTopicPartition(ssp), 
ssp); //registered SSPs
+
+    // this is already vetted offset so there is no need to validate it
+    LOG.info(String.format("Got offset %s for new topic and partition %s.", 
nextOffset, ssp));
+
+    nextOffsets.put(ssp, nextOffset);
+
+    // we reuse existing metrics. They assume host and port for the broker
+    // for now fake the port with the consumer name
+    kafkaConsumerMetrics.setTopicPartitionValue(metricName, 0, 
nextOffsets.size());
+  }
+
+  /**
+   * creates a separate thread for pulling messages
+   */
+  private Runnable createProxyThreadRunnable() {
+    return () -> {
+      isRunning = true;
+
+      try {
+        consumerPollThreadStartLatch.countDown();
+        initializeLags();
+        while (isRunning) {
+          fetchMessages();
+        }
+      } catch (Throwable throwable) {
+        LOG.error(String.format("Error in LiKafkaConsumerProxy poll thread for 
system: %s.", systemName), throwable);
+        // SamzaLiKafkaSystemConsumer uses the failureCause to propagate the 
throwable to the container
+        failureCause = throwable;
+        isRunning = false;
+      }
+
+      if (!isRunning) {
+        LOG.info("Stopping the LiKafkaConsumerProxy poll thread for system: 
{}.", systemName);
+      }
+    };
+  }
+
+  private void initializeLags() {
+    // This is expensive, so only do it once at the beginning. After the first 
poll, we can rely on metrics for lag.
+    Map<TopicPartition, Long> endOffsets = 
kafkaConsumer.endOffsets(topicPartitions2SSP.keySet());
+    endOffsets.forEach((tp, offset) -> {
+      SystemStreamPartition ssp = topicPartitions2SSP.get(tp);
+      long startingOffset = nextOffsets.get(ssp);
+      // End offsets are the offset of the newest message + 1
+      // If the message we are about to consume is < end offset, we are 
starting with a lag.
+      long initialLag = endOffsets.get(tp) - startingOffset;
+
+      LOG.info("Initial lag is {} for SSP {}", initialLag, ssp);
+      latestLags.put(ssp, initialLag);
+      sink.setIsAtHighWatermark(ssp, initialLag == 0);
+    });
+
+    // initialize lag metrics
+    refreshLatencyMetrics();
+  }
+
+  // the actual polling of the messages from kafka
+  public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> 
pollConsumer(
+      Set<SystemStreamPartition> systemStreamPartitions, long timeout) {
+
+    if (topicPartitions2SSP.size() == 0) {
+      throw new SamzaException("cannot poll empty set of TopicPartitions");
+    }
+
+    // Since we need to poll only from some subset of TopicPartitions (passed 
as the argument),
+    // we need to pause the rest.
+    List<TopicPartition> topicPartitionsToPause = new ArrayList<>();
+    List<TopicPartition> topicPartitionsToPoll = new ArrayList<>();
+
+    for (Map.Entry<TopicPartition, SystemStreamPartition> e : 
topicPartitions2SSP.entrySet()) {
+      TopicPartition tp = e.getKey();
+      SystemStreamPartition ssp = e.getValue();
+      if (systemStreamPartitions.contains(ssp)) {
+        topicPartitionsToPoll.add(tp);  // consume
+      } else {
+        topicPartitionsToPause.add(tp); // ignore
+      }
+    }
+
+    ConsumerRecords<K, V> records;
+    // make a call on the client
+    try {
+      // Currently, when doing a checkpoint we make a safeOffset request 
through this client, thus we need to synchronize
+      // them. In the future we may use this client for the actual 
checkpointing.
+      synchronized (kafkaConsumer) {
+        // Since we are not polling from ALL the subscribed topics, we need 
to "change" the subscription temporarily
+        kafkaConsumer.pause(topicPartitionsToPause);
+        kafkaConsumer.resume(topicPartitionsToPoll);
+        records = kafkaConsumer.poll(timeout);
+        // resume original set of subscription - may be required for 
checkpointing
+        kafkaConsumer.resume(topicPartitionsToPause);
+      }
+    } catch (InvalidOffsetException e) {
+      LOG.error("LiKafkaConsumer with invalidOffsetException", e);
+      // If the consumer has thrown this exception it means that auto reset is 
not set for this consumer.
+      // So we just rethrow.
+      LOG.error("Caught InvalidOffsetException in pollConsumer", e);
+      throw e;
+    } catch (KafkaException e) {
+      // we may get InvalidOffsetException | AuthorizationException | 
KafkaException exceptions,
+      // but we still just rethrow, and log it up the stack.
+      LOG.error("Caught a Kafka exception in pollConsumer", e);
+      throw e;
+    }
+
+    return processResults(records);
+  }
+
+  private Map<SystemStreamPartition, List<IncomingMessageEnvelope>> 
processResults(ConsumerRecords<K, V> records) {
+    if (records == null) {
+      return Collections.emptyMap();
+    }
+
+    int capacity = (int) (records.count() / 0.75 + 1); // size above the 
expected count so entries stay under the 0.75 load factor (avoids rehash).
+    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> results = new 
HashMap<>(capacity);
+    // Parse the returned records and convert them into the 
IncomingMessageEnvelope.
+    // Note. They have been already de-serialized by the consumer.
+    for (ConsumerRecord<K, V> r : records) {
+      int partition = r.partition();
+      String topic = r.topic();
+      TopicPartition tp = new TopicPartition(topic, partition);
+
+      updateMetrics(r, tp);
+
+      SystemStreamPartition ssp = topicPartitions2SSP.get(tp);
+      List<IncomingMessageEnvelope> listMsgs = results.get(ssp);
+      if (listMsgs == null) {
+        listMsgs = new ArrayList<>();
+        results.put(ssp, listMsgs);
+      }
+
+      // TODO - add calculation of the size of the message, when available 
from Kafka
+      int msgSize = 0;
+      // if (fetchLimitByBytesEnabled) {
+      msgSize = getRecordSize(r);
+      //}
+
+      final K key = r.key();
+      final Object value =
+          valueUnwrapper == null ? r.value() : 
valueUnwrapper.unwrapValue(ssp.getSystemStream(), r.value());
+      IncomingMessageEnvelope imEnvelope =
+          new IncomingMessageEnvelope(ssp, String.valueOf(r.offset()), key, 
value, msgSize);
+      listMsgs.add(imEnvelope);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("# records per SSP:");
+      for (Map.Entry<SystemStreamPartition, List<IncomingMessageEnvelope>> e : 
results.entrySet()) {
+        List<IncomingMessageEnvelope> list = e.getValue();
+        LOG.debug(e.getKey() + " = " + ((list == null) ? 0 : list.size()));
+      }
+    }
+
+    return results;
+  }
+
+  private int getRecordSize(ConsumerRecord<K, V> r) {
+    int keySize = 0; //(r.key() == null) ? 0 : r.key().getSerializedKeySize();
+    return keySize;  // + r.getSerializedMsgSize();  // TODO -enable when 
functionality available from Kafka
+
+    //int getMessageSize (Message message) {
+    // Approximate additional shallow heap overhead per message in addition to 
the raw bytes
+    // received from Kafka  4 + 64 + 4 + 4 + 4 = 80 bytes overhead.
+    // As this overhead is a moving target, and not very large
+    // compared to the message size its being ignore in the computation for 
now.
+    // int MESSAGE_SIZE_OVERHEAD =  4 + 64 + 4 + 4 + 4;
+
+    //      return message.size() + MESSAGE_SIZE_OVERHEAD;
+    // }
+  }
+
+  private void updateMetrics(ConsumerRecord<K, V> r, TopicPartition tp) {
+    TopicAndPartition tap = NewKafkaSystemConsumer.toTopicAndPartition(tp);
+    SystemStreamPartition ssp = 
NewKafkaSystemConsumer.toSystemStreamPartition(systemName, tap);
+    long currentSSPLag = getLatestLag(ssp); // lag between the current offset 
and the highwatermark
+    if (currentSSPLag < 0) {
+      return;
+    }
+    long recordOffset = r.offset();
+    long highWatermark = recordOffset + currentSSPLag; // derived value for 
the highwatermark
+
+    int size = getRecordSize(r);
+    kafkaConsumerMetrics.incReads(tap);
+    kafkaConsumerMetrics.incBytesReads(tap, size);
+    kafkaConsumerMetrics.setOffsets(tap, recordOffset);
+    kafkaConsumerMetrics.incBrokerBytesReads(metricName, 0, size);
+    kafkaConsumerMetrics.setHighWatermarkValue(tap, highWatermark);
+  }
+
+  /*
+   This method puts messages into the BlockingEnvelopeMap.
+   */
+  private void moveMessagesToTheirQueue(SystemStreamPartition ssp, 
List<IncomingMessageEnvelope> envelopes) {
+    long nextOffset = nextOffsets.get(ssp);
+
+    for (IncomingMessageEnvelope env : envelopes) {
+      sink.addMessage(ssp, env);  // move message to the BlockingEnvelopeMap's 
queue
+
+      LOG.trace("IncomingMessageEnvelope. got envelope with offset:{} for 
ssp={}", env.getOffset(), ssp);
+      nextOffset = Long.valueOf(env.getOffset()) + 1;
+    }
+
+    nextOffsets.put(ssp, nextOffset);
+  }
+
+  private void populateMetricNames(Set<SystemStreamPartition> ssps) {
+    HashMap<String, String> tags = new HashMap<>();
+    tags.put("client-id", clientId);// this is required by the KafkaConsumer 
to get the metrics
+
+    for (SystemStreamPartition ssp : ssps) {
+      TopicPartition tp = NewKafkaSystemConsumer.toTopicPartition(ssp);
+      ssp2MetricName.put(ssp, new MetricName(tp + ".records-lag", 
"consumer-fetch-manager-metrics", "", tags));
+    }
+  }
+
+  /*
+    The only way to figure out lag for the LiKafkaConsumer is to look at the 
metrics after each poll() call.
+    One of the metrics (records-lag) shows how far behind the HighWatermark 
the consumer is.
+    This method populates the lag information for each SSP into latestLags 
member variable.
+   */
+  private void populateCurrentLags(Set<SystemStreamPartition> ssps) {
+
+    Map<MetricName, ? extends Metric> consumerMetrics = 
kafkaConsumer.metrics();
+
+    // populate the MetricNames first time
+    if (ssp2MetricName.isEmpty()) {
+      populateMetricNames(ssps);
+    }
+
+    for (SystemStreamPartition ssp : ssps) {
+      MetricName mn = ssp2MetricName.get(ssp);
+      Metric currentLagM = consumerMetrics.get(mn);
+
+      // In linkedin-kafka-client 5.*, high watermark is fixed to be the 
offset of last available message,
+      // so the lag is now at least 0, which is the same as Samza's definition.
+      // If the lag is not 0, then isAtHead is not true, and kafkaClient keeps 
polling.
+      long currentLag = (currentLagM != null) ? (long) currentLagM.value() : 
-1L;
+      /*
+      Metric averageLagM = consumerMetrics.get(new MetricName(tp + 
".records-lag-avg", "consumer-fetch-manager-metrics", "", tags));
+      double averageLag = (averageLagM != null) ? averageLagM.value() : -1.0;
+      Metric maxLagM = consumerMetrics.get(new MetricName(tp + 
".records-lag-max", "consumer-fetch-manager-metrics", "", tags));
+      double maxLag = (maxLagM != null) ? maxLagM.value() : -1.0;
+      */
+      latestLags.put(ssp, currentLag);
+
+      // calls the setIsAtHead for the BlockingEnvelopeMap
+      sink.setIsAtHighWatermark(ssp, currentLag == 0);
+    }
+  }
+
+  /*
+    Get the latest lag for a specific SSP.
+   */
+  public long getLatestLag(SystemStreamPartition ssp) {
+    Long lag = latestLags.get(ssp);
+    if (lag == null) {
+      throw new SamzaException("Unknown/unregistered ssp in latestLags 
request: " + ssp);
+    }
+    return lag;
+  }
+
+  /*
+    Using the consumer to poll the messages from the stream.
+   */
+  private void fetchMessages() {
+    Set<SystemStreamPartition> SSPsToFetch = new HashSet<>();
+    for (SystemStreamPartition ssp : nextOffsets.keySet()) {
+      if (sink.needsMoreMessages(ssp)) {
+        SSPsToFetch.add(ssp);
+      }
+    }
+    LOG.debug("pollConsumer {}", SSPsToFetch.size());
+    if (!SSPsToFetch.isEmpty()) {
+      kafkaConsumerMetrics.incBrokerReads(metricName, 0);
+
+      Map<SystemStreamPartition, List<IncomingMessageEnvelope>> response;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("pollConsumer from following SSPs: {}; total#={}", 
SSPsToFetch, SSPsToFetch.size());
+      }
+      response = pollConsumer(SSPsToFetch, 500); // TODO should be default 
value from ConsumerConfig
+
+      // move the responses into the queue
+      for (Map.Entry<SystemStreamPartition, List<IncomingMessageEnvelope>> e : 
response.entrySet()) {
+        List<IncomingMessageEnvelope> envelopes = e.getValue();
+        if (envelopes != null) {
+          moveMessagesToTheirQueue(e.getKey(), envelopes);
+        }
+      }
+
+      populateCurrentLags(SSPsToFetch); // find the current lag for each SSP
+    } else { // nothing to read
+
+      LOG.debug("No topic/partitions need to be fetched for consumer {} right 
now. Sleeping {}ms.", kafkaConsumer,
+          SLEEP_MS_WHILE_NO_TOPIC_PARTITION);
+
+      kafkaConsumerMetrics.incBrokerSkippedFetchRequests(metricName, 0);
+
+      try {
+        Thread.sleep(SLEEP_MS_WHILE_NO_TOPIC_PARTITION);
+      } catch (InterruptedException e) {
+        LOG.warn("Sleep in fetchMessages was interrupted");
+      }
+    }
+    refreshLatencyMetrics();
+  }
+
+  private void refreshLatencyMetrics() {
+    for (Map.Entry<SystemStreamPartition, Long> e : nextOffsets.entrySet()) {
+      SystemStreamPartition ssp = e.getKey();
+      Long offset = e.getValue();
+      TopicAndPartition tp = NewKafkaSystemConsumer.toTopicAndPartition(ssp);
+      Long lag = latestLags.get(ssp);
+      LOG.trace("Latest offset of {} is  {}; lag = {}", ssp, offset, lag);
+      if (lag != null && offset != null && lag >= 0) {
+        long streamEndOffset = offset.longValue() + lag.longValue();
+        // update the metrics
+        kafkaConsumerMetrics.setHighWatermarkValue(tp, streamEndOffset);
+        kafkaConsumerMetrics.setLagValue(tp, lag.longValue());
+      }
+    }
+  }
+
+  boolean isRunning() {
+    return isRunning;
+  }
+
+  Throwable getFailureCause() {
+    return failureCause;
+  }
+
+  public void stop(long timeout) {
+    LOG.info("Shutting down LiKafkaConsumerProxy poll thread:" + toString());
+
+    isRunning = false;
+    try {
+      consumerPollThread.join(timeout);
+    } catch (InterruptedException e) {
+      LOG.warn("Join in LiKafkaConsumerProxy has failed", e);
+      consumerPollThread.interrupt();
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/samza/blob/72544606/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index 9f0b5f2..c7f6aed 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -19,16 +19,21 @@
 
 package org.apache.samza.system.kafka
 
+import java.util
 import java.util.Properties
+
+import kafka.consumer.ConsumerConfig
 import kafka.utils.ZkUtils
+import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.samza.SamzaException
 import org.apache.samza.config.ApplicationConfig.ApplicationMode
-import org.apache.samza.util.{Logging, KafkaUtil, ExponentialSleepStrategy, 
ClientUtilTopicMetadataStore}
-import org.apache.samza.config.{KafkaConfig, ApplicationConfig, StreamConfig, 
Config}
+import org.apache.samza.util._
+import org.apache.samza.config.{ApplicationConfig, Config, KafkaConfig, 
StreamConfig}
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.config.KafkaConfig.Config2Kafka
 import org.apache.samza.config.TaskConfig.Config2Task
 import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.samza.system.SystemFactory
 import org.apache.samza.config.StorageConfig._
 import org.apache.samza.system.SystemProducer
@@ -53,21 +58,35 @@ class KafkaSystemFactory extends SystemFactory with Logging 
{
     // Kind of goofy to need a producer config for consumers, but we need 
metadata.
     val producerConfig = config.getKafkaSystemProducerConfig(systemName, 
clientId)
     val bootstrapServers = producerConfig.bootsrapServers
-    val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, 
clientId)
+    //val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, 
clientId)
 
-    val timeout = consumerConfig.socketTimeoutMs
-    val bufferSize = consumerConfig.socketReceiveBufferBytes
-    val fetchSize = new StreamFetchSizes(consumerConfig.fetchMessageMaxBytes, 
config.getFetchMessageMaxBytesTopics(systemName))
-    val consumerMinSize = consumerConfig.fetchMinBytes
-    val consumerMaxWait = consumerConfig.fetchWaitMaxMs
-    val autoOffsetResetDefault = consumerConfig.autoOffsetReset
+    //val kafkaConfig = new KafkaConfig(config)
+
+
+   // val timeout = consumerConfig.socketTimeoutMs
+    //val bufferSize = consumerConfig.socketReceiveBufferBytes
+    //val fetchSize = new 
StreamFetchSizes(consumerConfig.fetchMessageMaxBytes, 
config.getFetchMessageMaxBytesTopics(systemName))
+    //val consumerMinSize = consumerConfig.fetchMinBytes
+    //val consumerMaxWait = consumerConfig.fetchWaitMaxMs
+    //val autoOffsetResetDefault = consumerConfig.autoOffsetReset
     val autoOffsetResetTopics = config.getAutoOffsetResetTopics(systemName)
     val fetchThreshold = 
config.getConsumerFetchThreshold(systemName).getOrElse("50000").toInt
     val fetchThresholdBytes = 
config.getConsumerFetchThresholdBytes(systemName).getOrElse("-1").toLong
-    val offsetGetter = new GetOffset(autoOffsetResetDefault, 
autoOffsetResetTopics)
-    val metadataStore = new ClientUtilTopicMetadataStore(bootstrapServers, 
clientId, timeout)
+    //val offsetGetter = new GetOffset(autoOffsetResetDefault, 
autoOffsetResetTopics)
+    //val metadataStore = new ClientUtilTopicMetadataStore(bootstrapServers, 
clientId, timeout)
 
-    new KafkaSystemConsumer(
+
+    val kafkaConsumer: KafkaConsumer[Array[Byte], Array[Byte]] =
+      NewKafkaSystemConsumer.getKafkaConsumerImpl(systemName, clientId, config)
+
+    def valueUnwrapper: NewKafkaSystemConsumer.ValueUnwrapper[Array[Byte]] = 
null; // TODO: add a real unwrapper
+    val kc = new NewKafkaSystemConsumer (
+      kafkaConsumer, systemName, config, clientId,
+      metrics, new SystemClock, false, valueUnwrapper)
+
+    kc
+    /*
+      new KafkaSystemConsumer(
       systemName = systemName,
       systemAdmin = getAdmin(systemName, config),
       metrics = metrics,
@@ -82,7 +101,18 @@ class KafkaSystemFactory extends SystemFactory with Logging 
{
       fetchThresholdBytes = fetchThresholdBytes,
       fetchLimitByBytesEnabled = 
config.isConsumerFetchThresholdBytesEnabled(systemName),
       offsetGetter = offsetGetter)
+      */
+  }
+
+  /*
+  def getKafkaConsumerImpl(systemName: String, config: KafkaConfig) = {
+    info("Consumer properties in getKafkaConsumerImpl: systemName: {}, 
consumerProperties: {}", systemName, config)
+
+    val byteArrayDeserializer = new ByteArrayDeserializer
+    new KafkaConsumer[Array[Byte], 
Array[Byte]](config.configForVanillaConsumer(),
+      byteArrayDeserializer, byteArrayDeserializer)
   }
+  */
 
   def getProducer(systemName: String, config: Config, registry: 
MetricsRegistry): SystemProducer = {
     val clientId = KafkaUtil.getClientId("samza-producer", config)

http://git-wip-us.apache.org/repos/asf/samza/blob/72544606/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
new file mode 100644
index 0000000..26db610
--- /dev/null
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
@@ -0,0 +1,403 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.system.kafka;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import kafka.common.TopicAndPartition;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.KafkaConsumerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.BlockingEnvelopeMap;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+
+public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap 
implements SystemConsumer{
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(NewKafkaSystemConsumer.class);
+
+  /**
+   * Provides a way to unwrap the value further. It is used for intermediate 
stream messages.
+   * @param <T> value type
+   */
+  public interface ValueUnwrapper<T> {
+    Object unwrapValue(SystemStream systemStream, T value);
+  }
+
+  private static final long FETCH_THRESHOLD = 50000;
+  private static final long FETCH_THRESHOLD_BYTES = -1L;
+  private final Consumer<K,V> kafkaConsumer;
+  private final String systemName;
+  private final KafkaSystemConsumerMetrics samzaConsumerMetrics;
+  private final String clientId;
+  private final String metricName;
+  private final Map<TopicPartition, SystemStreamPartition> topicPartitions2SSP 
= new HashMap<>();
+  private final AtomicBoolean stopped = new AtomicBoolean(false);
+  private final AtomicBoolean started = new AtomicBoolean(false);
+  private final Config config;
+  private final boolean fetchThresholdBytesEnabled;
+  private final ValueUnwrapper<V> valueUnwrapper;
+
+  // This sink is used to transfer the messages from the proxy/consumer to the 
BlockingEnvelopeMap.
+  private KafkaConsumerMessageSink messageSink;
+  // proxy is doing the actual reading
+  private KafkaConsumerProxy proxy;
+
+  /* package private */final Map<TopicPartition, String> 
topicPartitions2Offset = new HashMap<>();
+  /* package private */long perPartitionFetchThreshold;
+  /* package private */long perPartitionFetchThresholdBytes;
+
+  // TODO - consider new class for KafkaSystemConsumerMetrics
+
+  /**
+   * @param systemName
+   * @param config
+   * @param metrics
+   */
+  public NewKafkaSystemConsumer(
+      Consumer<K,V> kafkaConsumer,
+      String systemName,
+      Config config,
+      String clientId,
+      KafkaSystemConsumerMetrics metrics,
+      Clock clock,
+      boolean fetchThresholdBytesEnabled,
+      ValueUnwrapper<V> valueUnwrapper) {
+
+    super(metrics.registry(),clock, metrics.getClass().getName());
+
+    this.samzaConsumerMetrics = metrics;
+    this.clientId = clientId;
+    this.systemName = systemName;
+    this.config = config;
+    this.fetchThresholdBytesEnabled = fetchThresholdBytesEnabled;
+    this.metricName = systemName + " " + clientId;
+
+    this.kafkaConsumer = kafkaConsumer;
+    this.valueUnwrapper = valueUnwrapper;
+
+    LOG.info(String.format(
+        "Created SamzaLiKafkaSystemConsumer for system=%s, clientId=%s, 
metricName=%s with liKafkaConsumer=%s",
+        systemName, clientId, metricName, this.kafkaConsumer.toString()));
+  }
+
+  public static KafkaConsumer<byte[], byte[]> getKafkaConsumerImpl(String 
systemName, String clientId, Config config) {
+
+    Map<String, String> injectProps = new HashMap<>();
+    injectProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+    injectProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+
+    KafkaConsumerConfig consumerConfig =
+        KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, 
clientId, injectProps);
+
+    LOG.info("==============>Consumer properties in getKafkaConsumerImpl: 
systemName: {}, consumerProperties: {}", systemName, 
consumerConfig.originals());
+    /*
+    Map<String, Object> kafkaConsumerConfig = 
consumerConfig.originals().entrySet().stream()
+        .collect(Collectors.toMap((kv)->kv.getKey(), 
(kv)->(Object)kv.getValue()));
+*/
+
+    return new KafkaConsumer<byte[], byte[]>(consumerConfig.originals());
+  }
+
+  /**
+   * return system name for this consumer
+   * @return system name
+   */
+  public String getSystemName() {
+    return systemName;
+  }
+
+  @Override
+  public void start() {
+    if (!started.compareAndSet(false, true)) {
+      LOG.warn("attempting to start the consumer for the second (or more) 
time.");
+      return;
+    }
+    if(stopped.get()) {
+      LOG.warn("attempting to start a stopped consumer");
+      return;
+    }
+LOG.info("==============>About to start consumer");
+    // initialize the subscriptions for all the registered TopicPartitions
+    startSubscription();
+    LOG.info("==============>subscription started");
+    // needs to be called after all the registrations are completed
+    setFetchThresholds();
+    LOG.info("==============>thresholds ste");
+    // Create the proxy to do the actual message reading. It is a separate 
thread that reads the messages from the stream
+    // and puts them into the sink.
+    createConsumerProxy();
+    LOG.info("==============>proxy  started");
+    startConsumer();
+    LOG.info("==============>consumer started");
+  }
+
+  private void startSubscription() {
+    //subscribe to all the TopicPartitions
+    LOG.info("==============>startSubscription for TP: " + 
topicPartitions2SSP.keySet());
+    try {
+      synchronized (kafkaConsumer) {
+        // we are using assign (and not subscribe), so we need to specify both 
topic and partition
+        //topicPartitions2SSP.put(new TopicPartition("FAKE PARTITION", 0), new 
SystemStreamPartition("Some","Another", new Partition(0)));
+        //topicPartitions2Offset.put(new TopicPartition("FAKE PARTITION", 0), 
"1234");
+        kafkaConsumer.assign(topicPartitions2SSP.keySet());
+      }
+    } catch (Exception e) {
+      LOG.warn("startSubscription failed.", e);
+      throw new SamzaException(e);
+    }
+  }
+
+  private void createConsumerProxy() {
+    // create a sink for passing the messages between the proxy and the 
consumer
+    messageSink = new KafkaConsumerMessageSink();
+
+    // create the thread with the consumer
+    proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId, 
messageSink,
+        samzaConsumerMetrics, metricName, valueUnwrapper);
+
+    LOG.info("==============>Created consumer proxy: " + proxy);
+  }
+
+  /*
+   Set the offsets to start from.
+   Add the TopicPartitions to the proxy.
+   Start the proxy thread.
+   */
+  private void startConsumer() {
+    //set the offset for each TopicPartition
+    topicPartitions2Offset.forEach((tp, startingOffsetString) -> {
+      long startingOffset = Long.valueOf(startingOffsetString);
+
+      try {
+        synchronized (kafkaConsumer) {
+          // TODO in the future we may need to add special handling here for 
BEGIN/END_OFFSET
+          // this will call liKafkaConsumer.seekToBegin/End()
+          kafkaConsumer.seek(tp, startingOffset); // this value should already 
be the 'upcoming' value
+        }
+      } catch (Exception e) {
+        // all other exceptions - non recoverable
+        LOG.error("Got Exception while seeking to " + startingOffsetString + " 
for " + tp, e);
+        throw new SamzaException(e);
+      }
+
+      LOG.info("==============>Changing Consumer's position for tp = " + tp + 
" to " + startingOffsetString);
+
+      // add the partition to the proxy
+      proxy.addTopicPartition(topicPartitions2SSP.get(tp), startingOffset);
+    });
+
+    // start the proxy thread
+    if (proxy != null && !proxy.isRunning()) {
+      proxy.start();
+    }
+  }
+
+  private void setFetchThresholds() {
+    // get the thresholds, and set defaults if not defined.
+    KafkaConfig kafkaConfig = new KafkaConfig(config);
+    Option<String> fetchThresholdOption = 
kafkaConfig.getConsumerFetchThreshold(systemName);
+    long fetchThreshold = FETCH_THRESHOLD;
+    if(fetchThresholdOption.isDefined()) {
+      fetchThreshold = Long.valueOf(fetchThresholdOption.get());
+      LOG.info("fetchThresholdOption is defined. fetchThreshold=" + 
fetchThreshold);
+    }
+    Option<String> fetchThresholdBytesOption = 
kafkaConfig.getConsumerFetchThresholdBytes(systemName);
+    long fetchThresholdBytes = FETCH_THRESHOLD_BYTES;
+    if(fetchThresholdBytesOption.isDefined()) {
+      fetchThresholdBytes = Long.valueOf(fetchThresholdBytesOption.get());
+      LOG.info("fetchThresholdBytesOption is defined. fetchThresholdBytes=" + 
fetchThresholdBytes);
+    }
+    LOG.info("fetchThresholdBytes = " + fetchThresholdBytes + "; 
fetchThreshold=" + fetchThreshold);
+    LOG.info("topicPartitions2Offset #=" + topicPartitions2Offset.size() + "; 
topicPartition2SSP #=" + topicPartitions2SSP.size());
+
+    if (topicPartitions2SSP.size() > 0) {
+      perPartitionFetchThreshold = fetchThreshold / topicPartitions2SSP.size();
+      LOG.info("perPartitionFetchThreshold=" + perPartitionFetchThreshold);
+      if(fetchThresholdBytesEnabled) {
+        // currently this feature cannot be enabled, because we do not have 
the size of the messages available.
+        // messages get double buffered, hence divide by 2
+        perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / 
topicPartitions2SSP.size();
+        LOG.info("perPartitionFetchThresholdBytes is enabled. 
perPartitionFetchThresholdBytes=" + perPartitionFetchThresholdBytes);
+      }
+    }
+  }
+
+  @Override
+  public void stop() {
+    if (!stopped.compareAndSet(false, true)) {
+      LOG.warn("attempting to stop stopped consumer.");
+      return;
+    }
+
+    LOG.warn("Stopping SamzaRawLiKafkaConsumer + " + this);
+    // stop the proxy (with 5 minutes timeout)
+    if(proxy != null)
+      proxy.stop(TimeUnit.MINUTES.toMillis(5));
+
+    try {
+      synchronized (kafkaConsumer) {
+        kafkaConsumer.close();
+      }
+    } catch (Exception e) {
+      LOG.warn("failed to stop SamzaRawLiKafkaConsumer + " + this, e);
+    }
+  }
+
+  /*
+   record the ssp and the offset. Do not submit it to the consumer yet.
+   */
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, String 
offset) {
+    if (!systemStreamPartition.getSystem().equals(systemName)) {
+      LOG.warn("ignoring SSP " + systemStreamPartition + ", because this 
consumer's system is " + systemName);
+      return;
+    }
+    super.register(systemStreamPartition, offset);
+
+    TopicPartition tp = toTopicPartition(systemStreamPartition);
+
+    topicPartitions2SSP.put(tp, systemStreamPartition);
+
+    LOG.info("==============>registering ssp = " + systemStreamPartition + " 
with offset " + offset);
+
+    String existingOffset = topicPartitions2Offset.get(tp);
+    // register the older (of the two) offset in the consumer, to guarantee we 
do not miss any messages.
+    if (existingOffset == null || compareOffsets(existingOffset, offset) > 0) {
+      topicPartitions2Offset.put(tp, offset);
+    }
+
+    samzaConsumerMetrics.registerTopicAndPartition(toTopicAndPartition(tp));
+  }
+
+  /**
+   * Compare two String offsets.
+   * Note. There is a method in KafkaAdmin that does that, but that would 
require instantiation of systemadmin for each consumer.
+   * @param off1
+   * @param off2
+   * @return see {@link Long#compareTo(Long)}
+   */
+  public static int compareOffsets(String off1, String off2) {
+    return Long.valueOf(off1).compareTo(Long.valueOf(off2));
+  }
+
+  @Override
+  public String toString() {
+    return systemName + " " + clientId + "/" + super.toString();
+  }
+
+  @Override
+  public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
+      Set<SystemStreamPartition> systemStreamPartitions, long timeout)
+      throws InterruptedException {
+
+    // check if the proxy is running
+    if(!proxy.isRunning()) {
+      stop();
+      if (proxy.getFailureCause() != null) {
+        String message = "LiKafkaConsumerProxy has stopped";
+        if(proxy.getFailureCause() instanceof 
org.apache.kafka.common.errors.TopicAuthorizationException)
+          message += " due to TopicAuthorizationException Please refer to 
go/samzaacluserguide to correctly set up acls for your topic";
+        throw new SamzaException(message, proxy.getFailureCause());
+      } else {
+        LOG.warn("Failure cause not populated for LiKafkaConsumerProxy");
+        throw new SamzaException("LiKafkaConsumerProxy has stopped");
+      }
+    }
+
+    return super.poll(systemStreamPartitions, timeout);
+  }
+
+  public static TopicAndPartition toTopicAndPartition(TopicPartition tp) {
+    return new TopicAndPartition(tp.topic(), tp.partition());
+  }
+
+  public static TopicAndPartition toTopicAndPartition(SystemStreamPartition 
ssp) {
+    return new TopicAndPartition(ssp.getStream(), 
ssp.getPartition().getPartitionId());
+  }
+
+  public static TopicPartition toTopicPartition(SystemStreamPartition ssp) {
+    return new TopicPartition(ssp.getStream(), 
ssp.getPartition().getPartitionId());
+  }
+
+  public static SystemStreamPartition toSystemStreamPartition(String 
systemName, TopicAndPartition tp) {
+    return new SystemStreamPartition(systemName, tp.topic(), new 
Partition(tp.partition()));
+  }
+
+  ////////////////////////////////////
+  // inner class for the message sink
+  ////////////////////////////////////
+  public class KafkaConsumerMessageSink {
+
+    public void setIsAtHighWatermark(SystemStreamPartition ssp, boolean 
isAtHighWatermark) {
+      setIsAtHead(ssp, isAtHighWatermark);
+    }
+
+    boolean needsMoreMessages(SystemStreamPartition ssp) {
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("needsMoreMessages from following SSP: {}. fetchLimitByBytes 
enabled={}; messagesSizeInQueue={};"
+                + "(limit={}); messagesNumInQueue={}(limit={};", ssp, 
fetchThresholdBytesEnabled, getMessagesSizeInQueue(ssp), 
perPartitionFetchThresholdBytes,
+            getNumMessagesInQueue(ssp), perPartitionFetchThreshold);
+      }
+
+      if (fetchThresholdBytesEnabled) {
+        return getMessagesSizeInQueue(ssp) < perPartitionFetchThresholdBytes; 
// TODO Validate
+      } else {
+        return getNumMessagesInQueue(ssp) < perPartitionFetchThreshold;
+      }
+    }
+
+    void addMessage(SystemStreamPartition ssp, IncomingMessageEnvelope 
envelope) {
+      LOG.info("==============>Incoming message ssp = {}: envelope = {}.", 
ssp, envelope);
+
+      try {
+        put(ssp, envelope);
+      } catch (InterruptedException e) {
+        throw new SamzaException(
+            String.format("Interrupted while trying to add message with offset 
%s for ssp %s",
+                envelope.getOffset(),
+                ssp));
+      }
+    }
+  }  // end of KafkaMessageSink class
+  ///////////////////////////////////////////////////////////////////////////
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/72544606/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
 
b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 065170c..8544dbf 100644
--- 
a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ 
b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -88,12 +88,12 @@ class TestKafkaCheckpointManager extends 
KafkaServerTestHarness {
     zkClient.close
 
     // read before topic exists should result in a null checkpoint
-    //val readCp = readCheckpoint(checkpointTopic, taskName)
-    //assertNull(readCp)
+    val readCp = readCheckpoint(checkpointTopic, taskName)
+    assertNull(readCp)
 
     writeCheckpoint(checkpointTopic, taskName, checkpoint1)
+
     assertEquals(checkpoint1, readCheckpoint(checkpointTopic, taskName))
-try {Thread.sleep(20000)} catch { case e:Exception =>() }
     // writing a second message and reading it returns a more recent checkpoint
     writeCheckpoint(checkpointTopic, taskName, checkpoint2)
     assertEquals(checkpoint2, readCheckpoint(checkpointTopic, taskName))
@@ -194,7 +194,7 @@ try {Thread.sleep(20000)} catch { case e:Exception =>() }
     val systemFactory = Util.getObj(systemFactoryClassName, 
classOf[SystemFactory])
 
     val spec = new KafkaStreamSpec("id", cpTopic, checkpointSystemName, 1, 1, 
props)
-    System.out.println("CONFIG:" + config)
+    System.out.println("CONFIG = " + config)
     new KafkaCheckpointManager(spec, systemFactory, failOnTopicValidation, 
config, new NoOpMetricsRegistry, serde)
   }
 

Reply via email to