Replaced KafkaSystemConsumer, which was based on SimpleConsumer, with
NewKafkaSystemConsumer, which is based on the high-level Kafka consumer.


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/332a0481
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/332a0481
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/332a0481

Branch: refs/heads/NewKafkaSystemConsumer
Commit: 332a04815bbc5d526b736d82e5f05262b0922d57
Parents: bab5bdd
Author: Boris S <[email protected]>
Authored: Wed Sep 5 11:51:58 2018 -0700
Committer: Boris S <[email protected]>
Committed: Wed Sep 5 11:51:58 2018 -0700

----------------------------------------------------------------------
 .../samza/system/IncomingMessageEnvelope.java   |   3 +-
 .../ClusterBasedJobCoordinator.java             |   2 +-
 .../stream/CoordinatorStreamSystemConsumer.java |   4 +-
 .../apache/samza/storage/StorageRecovery.java   |   2 +-
 .../samza/checkpoint/CheckpointTool.scala       |   2 +-
 .../apache/samza/checkpoint/OffsetManager.scala |   4 +-
 .../samza/coordinator/JobModelManager.scala     |   5 +-
 .../samza/job/local/ProcessJobFactory.scala     |   3 +-
 .../samza/job/local/ThreadJobFactory.scala      |  14 +-
 .../samza/coordinator/TestJobCoordinator.scala  |   4 +-
 .../clients/consumer/KafkaConsumerConfig.java   |  81 ++--
 .../samza/system/kafka/KafkaConsumerProxy.java  |  32 +-
 .../kafka/KafkaSystemConsumerMetrics.scala      |  69 ++-
 .../samza/system/kafka/KafkaSystemFactory.scala |  47 +-
 .../system/kafka/NewKafkaSystemConsumer.java    |  93 ++--
 .../samza/system/kafka/TestBrokerProxy.scala    | 437 -------------------
 .../test/integration/StreamTaskTestUtil.scala   |   8 +-
 17 files changed, 170 insertions(+), 640 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/332a0481/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java 
b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
index 4d0ce2f..c5aed31 100644
--- 
a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
+++ 
b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
@@ -59,7 +59,8 @@ public class IncomingMessageEnvelope {
    * @param message A deserialized message received from the partition offset.
    * @param size size of the message and key in bytes.
    */
-  public IncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, 
String offset, Object key, Object message, int size) {
+  public IncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, 
String offset,
+      Object key, Object message, int size) {
     this.systemStreamPartition = systemStreamPartition;
     this.offset = offset;
     this.key = key;

http://git-wip-us.apache.org/repos/asf/samza/blob/332a0481/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index 016d171..12e26f7 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -174,7 +174,7 @@ public class ClusterBasedJobCoordinator {
 
     // build a JobModelManager and ChangelogStreamManager and perform 
partition assignments.
     changelogStreamManager = new 
ChangelogStreamManager(coordinatorStreamManager);
-    jobModelManager = JobModelManager.apply(coordinatorStreamManager, 
changelogStreamManager.readPartitionMapping());
+    jobModelManager = 
JobModelManager.apply(coordinatorStreamManager.getConfig(), 
changelogStreamManager.readPartitionMapping());
 
     config = jobModelManager.jobModel().getConfig();
     hasDurableStores = new StorageConfig(config).hasDurableStores();

http://git-wip-us.apache.org/repos/asf/samza/blob/332a0481/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
index 0bdb874..38255a2 100644
--- 
a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
@@ -176,7 +176,7 @@ public class CoordinatorStreamSystemConsumer {
             valueMap = messageSerde.fromBytes((byte[]) envelope.getMessage());
           }
           CoordinatorStreamMessage coordinatorStreamMessage = new 
CoordinatorStreamMessage(keyArray, valueMap);
-          log.info("Received coordinator stream message: {}", 
coordinatorStreamMessage);
+          log.debug("Received coordinator stream message: {}", 
coordinatorStreamMessage);
           // Remove any existing entry. Set.add() does not add if the element 
already exists.
           if (bootstrappedMessages.remove(coordinatorStreamMessage)) {
             log.debug("Removed duplicate message: {}", 
coordinatorStreamMessage);
@@ -194,7 +194,7 @@ public class CoordinatorStreamSystemConsumer {
         }
 
         bootstrappedStreamSet = 
Collections.unmodifiableSet(bootstrappedMessages);
-        log.info("Bootstrapped configuration: {}", configMap);
+        log.debug("Bootstrapped configuration: {}", configMap);
         isBootstrapped = true;
       } catch (Exception e) {
         throw new SamzaException(e);

http://git-wip-us.apache.org/repos/asf/samza/blob/332a0481/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 
b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index f9c6c0c..c6dd9a7 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -131,7 +131,7 @@ public class StorageRecovery extends CommandLine {
     coordinatorStreamManager.start();
     coordinatorStreamManager.bootstrap();
     ChangelogStreamManager changelogStreamManager = new 
ChangelogStreamManager(coordinatorStreamManager);
-    JobModel jobModel = JobModelManager.apply(coordinatorStreamManager, 
changelogStreamManager.readPartitionMapping()).jobModel();
+    JobModel jobModel = 
JobModelManager.apply(coordinatorStreamManager.getConfig(), 
changelogStreamManager.readPartitionMapping()).jobModel();
     containers = jobModel.getContainers();
     coordinatorStreamManager.stop();
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/332a0481/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala 
b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
index 0ca8a3d..65fb419 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
@@ -170,7 +170,7 @@ class CheckpointTool(config: Config, newOffsets: 
TaskNameToCheckpointMap, manage
     coordinatorStreamManager.start
     coordinatorStreamManager.bootstrap
     val changelogManager = new ChangelogStreamManager(coordinatorStreamManager)
-    val jobModelManager = JobModelManager(coordinatorStreamManager, 
changelogManager.readPartitionMapping())
+    val jobModelManager = JobModelManager(coordinatorStreamManager.getConfig, 
changelogManager.readPartitionMapping())
     val taskNames = jobModelManager
       .jobModel
       .getContainers

http://git-wip-us.apache.org/repos/asf/samza/blob/332a0481/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 
b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index d2b6667..53d5e98 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -304,7 +304,7 @@ class OffsetManager(
    */
   private def loadOffsetsFromCheckpointManager {
     if (checkpointManager != null) {
-      info("Loading offsets from checkpoint manager.")
+      debug("Loading offsets from checkpoint manager.")
 
       checkpointManager.start
       val result = systemStreamPartitions
@@ -332,7 +332,7 @@ class OffsetManager(
    * Loads last processed offsets for a single taskName.
    */
   private def restoreOffsetsFromCheckpoint(taskName: TaskName): Map[TaskName, 
Map[SystemStreamPartition, String]] = {
-    info("Loading checkpoints for taskName: %s." format taskName)
+    debug("Loading checkpoints for taskName: %s." format taskName)
 
     val checkpoint = checkpointManager.readLastCheckpoint(taskName)
 

http://git-wip-us.apache.org/repos/asf/samza/blob/332a0481/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala 
b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index f939736..f7ffd4e 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -64,12 +64,11 @@ object JobModelManager extends Logging {
    * a) Reads the jobModel from coordinator stream using the job's 
configuration.
    * b) Recomputes changelog partition mapping based on jobModel and job's 
configuration.
    * c) Builds JobModelManager using the jobModel read from coordinator stream.
-   * @param coordinatorStreamManager Coordinator stream manager.
+   * @param config Coordinator stream manager config
    * @param changelogPartitionMapping The changelog partition-to-task mapping.
    * @return JobModelManager
    */
-  def apply(coordinatorStreamManager: CoordinatorStreamManager, 
changelogPartitionMapping: util.Map[TaskName, Integer]) = {
-    val config = coordinatorStreamManager.getConfig
+  def apply(config: Config, changelogPartitionMapping: util.Map[TaskName, 
Integer]) = {
     val localityManager = new LocalityManager(config, new MetricsRegistryMap())
 
     // Map the name of each system to the corresponding SystemAdmin

http://git-wip-us.apache.org/repos/asf/samza/blob/332a0481/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala 
b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
index 642a484..64f516b 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
@@ -50,7 +50,7 @@ class ProcessJobFactory extends StreamJobFactory with Logging 
{
     coordinatorStreamManager.bootstrap
     val changelogStreamManager = new 
ChangelogStreamManager(coordinatorStreamManager)
 
-    val coordinator = JobModelManager(coordinatorStreamManager, 
changelogStreamManager.readPartitionMapping())
+    val coordinator = JobModelManager(coordinatorStreamManager.getConfig, 
changelogStreamManager.readPartitionMapping())
     val jobModel = coordinator.jobModel
 
     val taskPartitionMappings: util.Map[TaskName, Integer] = new 
util.HashMap[TaskName, Integer]
@@ -61,6 +61,7 @@ class ProcessJobFactory extends StreamJobFactory with Logging 
{
     }
 
     changelogStreamManager.writePartitionMapping(taskPartitionMappings)
+    coordinatorStreamManager.stop()
 
     //create necessary checkpoint and changelog streams
     val checkpointManager = new 
TaskConfigJava(jobModel.getConfig).getCheckpointManager(metricsRegistry)

http://git-wip-us.apache.org/repos/asf/samza/blob/332a0481/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index 34cc2a0..15aa5a6 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -19,11 +19,9 @@
 
 package org.apache.samza.job.local
 
-import java.util.concurrent.{CountDownLatch, TimeUnit}
-
-import org.apache.samza.config.{Config, TaskConfigJava}
 import org.apache.samza.config.JobConfig._
 import org.apache.samza.config.ShellCommandConfig._
+import org.apache.samza.config.{Config, TaskConfigJava}
 import org.apache.samza.container.{SamzaContainer, SamzaContainerListener, 
TaskName}
 import org.apache.samza.coordinator.JobModelManager
 import org.apache.samza.coordinator.stream.CoordinatorStreamManager
@@ -38,8 +36,8 @@ import scala.collection.JavaConversions._
 import scala.collection.mutable
 
 /**
- * Creates a new Thread job with the given config
- */
+  * Creates a new Thread job with the given config
+  */
 class ThreadJobFactory extends StreamJobFactory with Logging {
   def getJob(config: Config): StreamJob = {
     info("Creating a ThreadJob, which is only meant for debugging.")
@@ -51,7 +49,8 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
     coordinatorStreamManager.bootstrap
     val changelogStreamManager = new 
ChangelogStreamManager(coordinatorStreamManager)
 
-    val coordinator = JobModelManager(coordinatorStreamManager, 
changelogStreamManager.readPartitionMapping())
+    val coordinator = JobModelManager(coordinatorStreamManager.getConfig, 
changelogStreamManager.readPartitionMapping())
+    coordinatorStreamManager.stop()
     val jobModel = coordinator.jobModel
 
     val taskPartitionMappings: mutable.Map[TaskName, Integer] = 
mutable.Map[TaskName, Integer]()
@@ -85,7 +84,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
 
     // Give developers a nice friendly warning if they've specified task.opts 
and are using a threaded job.
     config.getTaskOpts match {
-      case Some(taskOpts) => warn("%s was specified in config, but is not 
being used because job is being executed with ThreadJob. You probably want to 
run %s=%s." format (TASK_JVM_OPTS, STREAM_JOB_FACTORY_CLASS, 
classOf[ProcessJobFactory].getName))
+      case Some(taskOpts) => warn("%s was specified in config, but is not 
being used because job is being executed with ThreadJob. You probably want to 
run %s=%s." format(TASK_JVM_OPTS, STREAM_JOB_FACTORY_CLASS, 
classOf[ProcessJobFactory].getName))
       case _ => None
     }
 
@@ -117,7 +116,6 @@ class ThreadJobFactory extends StreamJobFactory with 
Logging {
       threadJob
     } finally {
       coordinator.stop
-      coordinatorStreamManager.stop
       jmxServer.stop
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/332a0481/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
 
b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index 42610ae..b85b4a4 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -275,7 +275,9 @@ class TestJobCoordinator extends FlatSpec with 
PrivateMethodTester {
     coordinatorStreamManager.start
     coordinatorStreamManager.bootstrap
     val changelogPartitionManager = new 
ChangelogStreamManager(coordinatorStreamManager)
-    JobModelManager(coordinatorStreamManager, 
changelogPartitionManager.readPartitionMapping())
+    val jobModelManager = JobModelManager(coordinatorStreamManager.getConfig, 
changelogPartitionManager.readPartitionMapping())
+    coordinatorStreamManager.stop()
+    jobModelManager
   }
 
   @Before

http://git-wip-us.apache.org/repos/asf/samza/blob/332a0481/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
 
b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
index 88437ee..843e03d 100644
--- 
a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
+++ 
b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
@@ -43,11 +43,13 @@ public class KafkaConsumerConfig extends ConsumerConfig {
 
   private static final String PRODUCER_CLIENT_ID_PREFIX = "kafka-producer";
   private static final String CONSUMER_CLIENT_ID_PREFIX = "kafka-consumer";
+  private static final String ADMIN_CLIENT_ID_PREFIX = "samza-admin";
   private static final String SAMZA_OFFSET_LARGEST = "largest";
   private static final String SAMZA_OFFSET_SMALLEST = "smallest";
   private static final String KAFKA_OFFSET_LATEST = "latest";
   private static final String KAFKA_OFFSET_EARLIEST = "earliest";
   private static final String KAFKA_OFFSET_NONE = "none";
+
   /*
    * By default, KafkaConsumer will fetch ALL available messages for all the 
partitions.
    * This may cause memory issues. That's why we will limit the number of 
messages per partition we get on EACH poll().
@@ -59,8 +61,8 @@ public class KafkaConsumerConfig extends ConsumerConfig {
     super(props);
   }
 
-  public static KafkaConsumerConfig getKafkaSystemConsumerConfig(Config config,
-      String systemName, String clientId, Map<String, String> injectProps) {
+  public static KafkaConsumerConfig getKafkaSystemConsumerConfig(Config 
config, String systemName, String clientId,
+      Map<String, String> injectProps) {
 
     Config subConf = config.subset(String.format("systems.%s.consumer.", 
systemName), true);
 
@@ -72,17 +74,20 @@ public class KafkaConsumerConfig extends ConsumerConfig {
     consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
     consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
 
-    //Open-source Kafka Consumer configuration
-    consumerProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false"); // Disable consumer auto-commit
+    //Kafka client configuration
+
+    // Disable consumer auto-commit because Samza controls commits
+    consumerProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false");
 
-    consumerProps.setProperty(
-        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
-        getAutoOffsetResetValue(consumerProps));  // Translate samza config 
value to kafka config value
+    // Translate samza config value to kafka config value
+    consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+        getAutoOffsetResetValue(consumerProps));
 
     // make sure bootstrap configs are in ?? SHOULD WE FAIL IF THEY ARE NOT?
-    if (! subConf.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
+    if (!subConf.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
       // get it from the producer config
-      String bootstrapServer = 
config.get(String.format("systems.%s.producer.%s", systemName, 
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+      String bootstrapServer =
+          config.get(String.format("systems.%s.producer.%s", systemName, 
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
       if (StringUtils.isEmpty(bootstrapServer)) {
         throw new SamzaException("Missing " + 
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " config  for " + systemName);
       }
@@ -90,25 +95,22 @@ public class KafkaConsumerConfig extends ConsumerConfig {
     }
 
     // Always use default partition assignment strategy. Do not allow override.
-    consumerProps.setProperty(
-        ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
-        RangeAssignor.class.getName());
-
+    
consumerProps.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
RangeAssignor.class.getName());
 
     // the consumer is fully typed, and deserialization can be too. But in 
case it is not provided we should
     // default to byte[]
-    if ( !config.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
-      LOG.info("default key serialization for the consumer(for {}) to 
ByteArrayDeserializer", systemName);
+    if (!config.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
+      LOG.info("setting default key serialization for the consumer(for {}) to 
ByteArrayDeserializer", systemName);
       consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
     }
-    if ( !config.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) {
-      LOG.info("default value serialization for the consumer(for {}) to 
ByteArrayDeserializer", systemName);
+    if (!config.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) {
+      LOG.info("setting default value serialization for the consumer(for {}) 
to ByteArrayDeserializer", systemName);
       consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
     }
 
-
     // NOT SURE THIS IS NEEDED TODO
-    String maxPollRecords = 
subConf.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
KAFKA_CONSUMER_MAX_POLL_RECORDS_DEFAULT);;
+    String maxPollRecords =
+        subConf.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
KAFKA_CONSUMER_MAX_POLL_RECORDS_DEFAULT);
     consumerProps.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
maxPollRecords);
 
     // put overrides
@@ -122,38 +124,37 @@ public class KafkaConsumerConfig extends ConsumerConfig {
     JobConfig jobConfig = new JobConfig(config);
     Option<String> jobIdOption = jobConfig.getJobId();
     Option<String> jobNameOption = jobConfig.getName();
-    return (jobNameOption.isDefined()? jobNameOption.get() : 
"undefined_job_name") + "-"
-        + (jobIdOption.isDefined()? jobIdOption.get() : "undefined_job_id");
+    return (jobNameOption.isDefined() ? jobNameOption.get() : 
"undefined_job_name") + "-" + (jobIdOption.isDefined()
+        ? jobIdOption.get() : "undefined_job_id");
   }
+
   // client id should be unique per job
-  public static String getClientId(String id, Config config) {
+  public static String getClientId(Config config) {
+    return getClientId(CONSUMER_CLIENT_ID_PREFIX, config);
+  }
+  public static String getProducerClientId(Config config) {
+    return getClientId(PRODUCER_CLIENT_ID_PREFIX, config);
+  }
+  public static String getAdminClientId(Config config) {
+    return getClientId(ADMIN_CLIENT_ID_PREFIX, config);
+  }
+
+  private static String getClientId(String id, Config config) {
     if (config.get(JobConfig.JOB_NAME()) == null) {
       throw new ConfigException("Missing job name");
     }
     String jobName = config.get(JobConfig.JOB_NAME());
-    String jobId = "1";
-    if (config.get(JobConfig.JOB_ID()) != null) {
-      jobId = config.get(JobConfig.JOB_ID());
-    }
-    return getClientId(id, jobName, jobId);
-  }
+    String jobId = (config.get(JobConfig.JOB_ID()) != null) ? 
config.get(JobConfig.JOB_ID()) : "1";
 
-  private static String getClientId(String id, String jobName, String jobId) {
-    return String.format(
-        "%s-%s-%s",
-        id.replaceAll("[^A-Za-z0-9]", "_"),
-        jobName.replaceAll("[^A-Za-z0-9]", "_"),
+    return String.format("%s-%s-%s", id.replaceAll("[^A-Za-z0-9]", "_"), 
jobName.replaceAll("[^A-Za-z0-9]", "_"),
         jobId.replaceAll("[^A-Za-z0-9]", "_"));
   }
 
-  public static String getProducerClientId(Config config) {
-    return getClientId(PRODUCER_CLIENT_ID_PREFIX, config);
-  }
-
   /**
    * Settings for auto.reset in samza are different from settings in Kafka 
(auto.offset.reset) - need to convert
    * "largest" -> "latest"
    * "smallest" -> "earliest"
+   * "none" -> "none"
    * "none" - will fail the kafka consumer, if offset is out of range
    * @param properties All consumer related {@link Properties} parsed from 
samza config
    * @return String representing the config value for "auto.offset.reset" 
property
@@ -162,9 +163,8 @@ public class KafkaConsumerConfig extends ConsumerConfig {
     String autoOffsetReset = 
properties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
KAFKA_OFFSET_LATEST);
 
     // accept kafka values directly
-    if (autoOffsetReset.equals(KAFKA_OFFSET_EARLIEST) ||
-        autoOffsetReset.equals(KAFKA_OFFSET_LATEST) ||
-        autoOffsetReset.equals(KAFKA_OFFSET_NONE)) {
+    if (autoOffsetReset.equals(KAFKA_OFFSET_EARLIEST) || 
autoOffsetReset.equals(KAFKA_OFFSET_LATEST)
+        || autoOffsetReset.equals(KAFKA_OFFSET_NONE)) {
       return autoOffsetReset;
     }
 
@@ -177,5 +177,4 @@ public class KafkaConsumerConfig extends ConsumerConfig {
         return KAFKA_OFFSET_LATEST;
     }
   }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/332a0481/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index cddfdfd..a6272cd 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -86,7 +86,7 @@ public class KafkaConsumerProxy<K, V> {
     this.clientId = clientId;
 
     // TODO - see if we need new metrics (not host:port based)
-    this.kafkaConsumerMetrics.registerBrokerProxy(metricName, 0);
+    this.kafkaConsumerMetrics.registerClientProxy(metricName);
 
     consumerPollThread = new Thread(createProxyThreadRunnable());
   }
@@ -132,7 +132,7 @@ public class KafkaConsumerProxy<K, V> {
 
     // we reuse existing metrics. They assume host and port for the broker
     // for now fake the port with the consumer name
-    kafkaConsumerMetrics.setTopicPartitionValue(metricName, 0, 
nextOffsets.size());
+    kafkaConsumerMetrics.setTopicPartitionValue(metricName, 
nextOffsets.size());
   }
 
   /**
@@ -258,16 +258,10 @@ public class KafkaConsumerProxy<K, V> {
         results.put(ssp, listMsgs);
       }
 
-      // TODO - add calculation of the size of the message, when available 
from Kafka
-      int msgSize = 0;
-      // if (fetchLimitByBytesEnabled) {
-      msgSize = getRecordSize(r);
-      //}
-
       final K key = r.key();
       final Object value = r.value();
       IncomingMessageEnvelope imEnvelope =
-          new IncomingMessageEnvelope(ssp, String.valueOf(r.offset()), key, 
value, msgSize);
+          new IncomingMessageEnvelope(ssp, String.valueOf(r.offset()), key, 
value, getRecordSize(r));
       listMsgs.add(imEnvelope);
     }
     if (LOG.isDebugEnabled()) {
@@ -282,18 +276,8 @@ public class KafkaConsumerProxy<K, V> {
   }
 
   private int getRecordSize(ConsumerRecord<K, V> r) {
-    int keySize = 0; //(r.key() == null) ? 0 : r.key().getSerializedKeySize();
-    return keySize;  // + r.getSerializedMsgSize();  // TODO -enable when 
functionality available from Kafka
-
-    //int getMessageSize (Message message) {
-    // Approximate additional shallow heap overhead per message in addition to 
the raw bytes
-    // received from Kafka  4 + 64 + 4 + 4 + 4 = 80 bytes overhead.
-    // As this overhead is a moving target, and not very large
-    // compared to the message size its being ignore in the computation for 
now.
-    // int MESSAGE_SIZE_OVERHEAD =  4 + 64 + 4 + 4 + 4;
-
-    //      return message.size() + MESSAGE_SIZE_OVERHEAD;
-    // }
+    int keySize = (r.key() == null) ? 0 : r.serializedKeySize();
+    return keySize + r.serializedValueSize();
   }
 
   private void updateMetrics(ConsumerRecord<K, V> r, TopicPartition tp) {
@@ -310,7 +294,7 @@ public class KafkaConsumerProxy<K, V> {
     kafkaConsumerMetrics.incReads(tap);
     kafkaConsumerMetrics.incBytesReads(tap, size);
     kafkaConsumerMetrics.setOffsets(tap, recordOffset);
-    kafkaConsumerMetrics.incBrokerBytesReads(metricName, 0, size);
+    kafkaConsumerMetrics.incClientBytesReads(metricName, size);
     kafkaConsumerMetrics.setHighWatermarkValue(tap, highWatermark);
   }
 
@@ -398,7 +382,7 @@ public class KafkaConsumerProxy<K, V> {
     }
     LOG.debug("pollConsumer {}", SSPsToFetch.size());
     if (!SSPsToFetch.isEmpty()) {
-      kafkaConsumerMetrics.incBrokerReads(metricName, 0);
+      kafkaConsumerMetrics.incClientReads(metricName);
 
       Map<SystemStreamPartition, List<IncomingMessageEnvelope>> response;
       if (LOG.isDebugEnabled()) {
@@ -420,7 +404,7 @@ public class KafkaConsumerProxy<K, V> {
       LOG.debug("No topic/partitions need to be fetched for consumer {} right 
now. Sleeping {}ms.", kafkaConsumer,
           SLEEP_MS_WHILE_NO_TOPIC_PARTITION);
 
-      kafkaConsumerMetrics.incBrokerSkippedFetchRequests(metricName, 0);
+      kafkaConsumerMetrics.incClientSkippedFetchRequests(metricName);
 
       try {
         Thread.sleep(SLEEP_MS_WHILE_NO_TOPIC_PARTITION);

http://git-wip-us.apache.org/repos/asf/samza/blob/332a0481/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
index 1aa66dc..415bd38 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
@@ -19,13 +19,10 @@
 
 package org.apache.samza.system.kafka
 
-import org.apache.samza.metrics.MetricsHelper
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.metrics.MetricsRegistry
 import java.util.concurrent.ConcurrentHashMap
+
 import kafka.common.TopicAndPartition
-import org.apache.samza.metrics.Counter
-import org.apache.samza.metrics.Gauge
+import org.apache.samza.metrics._
 
 class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val 
registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
   val offsets = new ConcurrentHashMap[TopicAndPartition, Counter]
@@ -34,68 +31,66 @@ class KafkaSystemConsumerMetrics(val systemName: String = 
"unknown", val registr
   val lag = new ConcurrentHashMap[TopicAndPartition, Gauge[Long]]
   val highWatermark = new ConcurrentHashMap[TopicAndPartition, Gauge[Long]]
 
-  /*
-  TODO Fix
-   * (String, Int) = (host, port) of BrokerProxy.
-   */
-
-  val reconnects = new ConcurrentHashMap[(String, Int), Counter]
-  val brokerBytesRead = new ConcurrentHashMap[(String, Int), Counter]
-  val brokerReads = new ConcurrentHashMap[(String, Int), Counter]
-  val brokerSkippedFetchRequests = new ConcurrentHashMap[(String, Int), 
Counter]
-  val topicPartitions = new ConcurrentHashMap[(String, Int), Gauge[Int]]
+  val clientBytesRead = new ConcurrentHashMap[String, Counter]
+  val clientReads = new ConcurrentHashMap[String, Counter]
+  val clientSkippedFetchRequests = new ConcurrentHashMap[String, Counter]
+  val topicPartitions = new ConcurrentHashMap[String, Gauge[Int]]
 
   def registerTopicAndPartition(tp: TopicAndPartition) = {
     if (!offsets.contains(tp)) {
-      offsets.put(tp, newCounter("%s-%s-offset-change" format (tp.topic, 
tp.partition)))
-      bytesRead.put(tp, newCounter("%s-%s-bytes-read" format (tp.topic, 
tp.partition)))
-      reads.put(tp, newCounter("%s-%s-messages-read" format (tp.topic, 
tp.partition)))
-      highWatermark.put(tp, newGauge("%s-%s-high-watermark" format (tp.topic, 
tp.partition), -1L))
-      lag.put(tp, newGauge("%s-%s-messages-behind-high-watermark" format 
(tp.topic, tp.partition), 0L))
+      offsets.put(tp, newCounter("%s-%s-offset-change" format(tp.topic, 
tp.partition)))
+      bytesRead.put(tp, newCounter("%s-%s-bytes-read" format(tp.topic, 
tp.partition)))
+      reads.put(tp, newCounter("%s-%s-messages-read" format(tp.topic, 
tp.partition)))
+      highWatermark.put(tp, newGauge("%s-%s-high-watermark" format(tp.topic, 
tp.partition), -1L))
+      lag.put(tp, newGauge("%s-%s-messages-behind-high-watermark" 
format(tp.topic, tp.partition), 0L))
     }
   }
 
-  def registerBrokerProxy(host: String, port: Int) {
-    reconnects.put((host, port), newCounter("%s-%s-reconnects" format (host, 
port)))
-    brokerBytesRead.put((host, port), newCounter("%s-%s-bytes-read" format 
(host, port)))
-    brokerReads.put((host, port), newCounter("%s-%s-messages-read" format 
(host, port)))
-    brokerSkippedFetchRequests.put((host, port), 
newCounter("%s-%s-skipped-fetch-requests" format (host, port)))
-    topicPartitions.put((host, port), newGauge("%s-%s-topic-partitions" format 
(host, port), 0))
+  def registerClientProxy(clientName: String) {
+    clientBytesRead.put(clientName, newCounter("%s-bytes-read" format clientName))
+    clientReads.put(clientName, newCounter("%s-messages-read" format clientName))
+    clientSkippedFetchRequests.put(clientName, newCounter("%s-skipped-fetch-requests" format clientName))
+    topicPartitions.put(clientName, newGauge("%s-topic-partitions" format clientName, 0))
   }
 
   // java friendlier interfaces
   // Gauges
-  def setTopicPartitionValue(host: String, port: Int, value: Int) {
-    topicPartitions.get((host,port)).set(value)
+  def setTopicPartitionValue(clientName: String, value: Int) {
+    topicPartitions.get(clientName).set(value)
   }
+
   def setLagValue(topicAndPartition: TopicAndPartition, value: Long) {
     lag.get((topicAndPartition)).set(value);
   }
+
   def setHighWatermarkValue(topicAndPartition: TopicAndPartition, value: Long) 
{
     highWatermark.get((topicAndPartition)).set(value);
   }
 
   // Counters
-  def incBrokerReads(host: String, port: Int) {
-    brokerReads.get((host,port)).inc
+  def incClientReads(clientName: String) {
+    clientReads.get(clientName).inc
   }
+
   def incReads(topicAndPartition: TopicAndPartition) {
     reads.get(topicAndPartition).inc;
   }
+
   def incBytesReads(topicAndPartition: TopicAndPartition, inc: Long) {
     bytesRead.get(topicAndPartition).inc(inc);
   }
-  def incBrokerBytesReads(host: String, port: Int, incBytes: Long) {
-    brokerBytesRead.get((host,port)).inc(incBytes)
+
+  def incClientBytesReads(clientName: String, incBytes: Long) {
+    clientBytesRead.get(clientName).inc(incBytes)
   }
-  def incBrokerSkippedFetchRequests(host: String, port: Int) {
-    brokerSkippedFetchRequests.get((host,port)).inc()
+
+  def incClientSkippedFetchRequests(clientName: String) {
+    clientSkippedFetchRequests.get(clientName).inc()
   }
+
   def setOffsets(topicAndPartition: TopicAndPartition, offset: Long) {
     offsets.get(topicAndPartition).set(offset)
   }
-  def incReconnects(host: String, port: Int) {
-    reconnects.get((host,port)).inc()
-  }
+
   override def getPrefix = systemName + "-"
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/332a0481/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index 6a5eda9..892d400 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -19,27 +19,21 @@
 
 package org.apache.samza.system.kafka
 
-import java.util
 import java.util.Properties
 
-import kafka.consumer.ConsumerConfig
 import kafka.utils.ZkUtils
-import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.clients.consumer.KafkaConsumerConfig
+import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.samza.SamzaException
 import org.apache.samza.config.ApplicationConfig.ApplicationMode
-import org.apache.samza.util._
-import org.apache.samza.config.{ApplicationConfig, Config, KafkaConfig, 
StreamConfig}
-import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.config.KafkaConfig.Config2Kafka
-import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.common.serialization.ByteArrayDeserializer
-import org.apache.samza.system.SystemFactory
 import org.apache.samza.config.StorageConfig._
-import org.apache.samza.system.SystemProducer
-import org.apache.samza.system.SystemAdmin
 import org.apache.samza.config.SystemConfig.Config2System
-import org.apache.samza.system.SystemConsumer
+import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.config.{ApplicationConfig, Config, KafkaConfig, 
StreamConfig}
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.system.{SystemAdmin, SystemConsumer, SystemFactory, 
SystemProducer}
+import org.apache.samza.util._
 
 object KafkaSystemFactory extends Logging {
   def getInjectedProducerProperties(systemName: String, config: Config) = if 
(config.isChangelogSystem(systemName)) {
@@ -51,8 +45,9 @@ object KafkaSystemFactory extends Logging {
 }
 
 class KafkaSystemFactory extends SystemFactory with Logging {
+
   def getConsumer(systemName: String, config: Config, registry: 
MetricsRegistry): SystemConsumer = {
-    val clientId = KafkaUtil.getClientId("samza-consumer", config)
+    val clientId = KafkaConsumerConfig.getClientId(config)
     val metrics = new KafkaSystemConsumerMetrics(systemName, registry)
 
     NewKafkaSystemConsumer.getNewKafkaSystemConsumer(
@@ -60,10 +55,12 @@ class KafkaSystemFactory extends SystemFactory with Logging 
{
   }
 
   def getProducer(systemName: String, config: Config, registry: 
MetricsRegistry): SystemProducer = {
-    val clientId = KafkaUtil.getClientId("samza-producer", config)
+    val clientId = KafkaConsumerConfig.getProducerClientId(config)
     val injectedProps = 
KafkaSystemFactory.getInjectedProducerProperties(systemName, config)
     val producerConfig = config.getKafkaSystemProducerConfig(systemName, 
clientId, injectedProps)
-    val getProducer = () => { new KafkaProducer[Array[Byte], 
Array[Byte]](producerConfig.getProducerProperties) }
+    val getProducer = () => {
+      new KafkaProducer[Array[Byte], 
Array[Byte]](producerConfig.getProducerProperties)
+    }
     val metrics = new KafkaSystemProducerMetrics(systemName, registry)
 
     // Unlike consumer, no need to use encoders here, since they come for free
@@ -79,7 +76,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
   }
 
   def getAdmin(systemName: String, config: Config): SystemAdmin = {
-    val clientId = KafkaUtil.getClientId("samza-admin", config)
+    val clientId = KafkaConsumerConfig.getClientId(config)
     val producerConfig = config.getKafkaSystemProducerConfig(systemName, 
clientId)
     val bootstrapServers = producerConfig.bootsrapServers
     val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, 
clientId)
@@ -94,13 +91,13 @@ class KafkaSystemFactory extends SystemFactory with Logging 
{
     val coordinatorStreamReplicationFactor = 
config.getCoordinatorReplicationFactor.toInt
     val storeToChangelog = config.getKafkaChangelogEnabledStores()
     // Construct the meta information for each topic, if the replication 
factor is not defined, we use 2 as the number of replicas for the change log 
stream.
-    val topicMetaInformation = storeToChangelog.map{case (storeName, 
topicName) =>
-    {
-       val replicationFactor = 
config.getChangelogStreamReplicationFactor(storeName).toInt
-       val changelogInfo = ChangelogInfo(replicationFactor, 
config.getChangelogKafkaProperties(storeName))
-       info("Creating topic meta information for topic: %s with replication 
factor: %s" format (topicName, replicationFactor))
-       (topicName, changelogInfo)
-    }}
+    val topicMetaInformation = storeToChangelog.map { case (storeName, 
topicName) => {
+      val replicationFactor = 
config.getChangelogStreamReplicationFactor(storeName).toInt
+      val changelogInfo = ChangelogInfo(replicationFactor, 
config.getChangelogKafkaProperties(storeName))
+      info("Creating topic meta information for topic: %s with replication 
factor: %s" format(topicName, replicationFactor))
+      (topicName, changelogInfo)
+    }
+    }
 
     val deleteCommittedMessages = 
config.deleteCommittedMessages(systemName).exists(isEnabled => 
isEnabled.toBoolean)
     val intermediateStreamProperties: Map[String, Properties] = 
getIntermediateStreamProperties(config)
@@ -125,7 +122,7 @@ class KafkaSystemFactory extends SystemFactory with Logging 
{
       "segment.bytes" -> segmentBytes)) { case (props, (k, v)) => props.put(k, 
v); props }
   }
 
-  def getIntermediateStreamProperties(config : Config): Map[String, 
Properties] = {
+  def getIntermediateStreamProperties(config: Config): Map[String, Properties] 
= {
     val appConfig = new ApplicationConfig(config)
     if (appConfig.getAppMode == ApplicationMode.BATCH) {
       val streamConfig = new StreamConfig(config)

http://git-wip-us.apache.org/repos/asf/samza/blob/332a0481/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
index b33db42..717b45d 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
@@ -53,12 +53,12 @@ public class NewKafkaSystemConsumer<K, V> extends 
BlockingEnvelopeMap implements
 
   private static final long FETCH_THRESHOLD = 50000;
   private static final long FETCH_THRESHOLD_BYTES = -1L;
+
   private final Consumer<K, V> kafkaConsumer;
   private final String systemName;
   private final KafkaSystemConsumerMetrics samzaConsumerMetrics;
   private final String clientId;
   private final String metricName;
-  /* package private */final Map<TopicPartition, SystemStreamPartition> 
topicPartitions2SSP = new HashMap<>();
   private final AtomicBoolean stopped = new AtomicBoolean(false);
   private final AtomicBoolean started = new AtomicBoolean(false);
   private final Config config;
@@ -66,15 +66,16 @@ public class NewKafkaSystemConsumer<K, V> extends 
BlockingEnvelopeMap implements
 
   // This sink is used to transfer the messages from the proxy/consumer to the 
BlockingEnvelopeMap.
   /* package private */ KafkaConsumerMessageSink messageSink;
+
   // proxy is doing the actual reading
   private KafkaConsumerProxy proxy;
 
   /* package private */final Map<TopicPartition, String> 
topicPartitions2Offset = new HashMap<>();
+  /* package private */final Map<TopicPartition, SystemStreamPartition> 
topicPartitions2SSP = new HashMap<>();
+
   /* package private */ long perPartitionFetchThreshold;
   /* package private */ long perPartitionFetchThresholdBytes;
 
-  // TODO - consider new class for KafkaSystemConsumerMetrics
-
   /**
    * @param systemName
    * @param config
@@ -85,32 +86,28 @@ public class NewKafkaSystemConsumer<K, V> extends 
BlockingEnvelopeMap implements
 
     super(metrics.registry(), clock, metrics.getClass().getName());
 
+    this.kafkaConsumer = kafkaConsumer;
     this.samzaConsumerMetrics = metrics;
     this.clientId = clientId;
     this.systemName = systemName;
     this.config = config;
     this.metricName = systemName + " " + clientId;
 
-    this.kafkaConsumer = kafkaConsumer;
-
     this.fetchThresholdBytesEnabled = new 
KafkaConfig(config).isConsumerFetchThresholdBytesEnabled(systemName);
 
-    LOG.info(String.format(
-        "Created SamzaKafkaSystemConsumer for system=%s, clientId=%s, 
metricName=%s with KafkaConsumer=%s", systemName,
-        clientId, metricName, this.kafkaConsumer.toString()));
+    LOG.info("Created SamzaKafkaSystemConsumer for system={}, clientId={}, 
metricName={}, KafkaConsumer={}", systemName,
+        clientId, metricName, this.kafkaConsumer.toString());
   }
 
   public static <K, V> NewKafkaSystemConsumer getNewKafkaSystemConsumer(String 
systemName, Config config,
       String clientId, KafkaSystemConsumerMetrics metrics, Clock clock) {
 
-
-
     // extract consumer configs and create kafka consumer
     KafkaConsumer<K, V> kafkaConsumer = getKafkaConsumerImpl(systemName, 
clientId, config);
-
+    LOG.info("Created kafka consumer for system {}, clientId {}: {}", 
systemName, clientId, kafkaConsumer);
 
     NewKafkaSystemConsumer kc = new NewKafkaSystemConsumer(kafkaConsumer, 
systemName, config, clientId, metrics, clock);
-    System.out.println("kc=" + kc + "!!!!!!!!!!!!!!!!!GETTING FOR NKC for " + 
systemName);
+    LOG.info("Created samza system consumer {}", kc.toString());
 
     return kc;
   }
@@ -126,12 +123,11 @@ public class NewKafkaSystemConsumer<K, V> extends 
BlockingEnvelopeMap implements
 
     Map<String, String> injectProps = new HashMap<>();
 
-    // extract kafka consumer configs
+    // extract kafka client configs
     KafkaConsumerConfig consumerConfig =
         KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, 
clientId, injectProps);
 
-    LOG.info("==============>Consumer properties in getKafkaConsumerImpl: 
systemName: {}, consumerProperties: {}",
-        systemName, consumerConfig.originals());
+    LOG.info("KafkaClient properties for systemName {}: {}", systemName, 
consumerConfig.originals());
 
     return new KafkaConsumer<>(consumerConfig.originals());
   }
@@ -146,29 +142,23 @@ public class NewKafkaSystemConsumer<K, V> extends 
BlockingEnvelopeMap implements
       LOG.warn("attempting to start a stopped consumer");
       return;
     }
-    LOG.info("==============>About to start consumer");
     // initialize the subscriptions for all the registered TopicPartitions
     startSubscription();
-    LOG.info("==============>subscription started");
     // needs to be called after all the registrations are completed
     setFetchThresholds();
-    LOG.info("==============>thresholds ste");
     // Create the proxy to do the actual message reading. It is a separate 
thread that reads the messages from the stream
     // and puts them into the sink.
     createConsumerProxy();
-    LOG.info("==============>proxy  started");
     startConsumer();
-    LOG.info("==============>consumer started");
+    LOG.info("consumer {} started", this);
   }
 
   private void startSubscription() {
-    //subscribe to all the TopicPartitions
-    LOG.info("==============>startSubscription for TP: " + 
topicPartitions2SSP.keySet());
+    //subscribe to all the registered TopicPartitions
+    LOG.info("consumer {}, subscribes to {} ", this, 
topicPartitions2SSP.keySet());
     try {
       synchronized (kafkaConsumer) {
         // we are using assign (and not subscribe), so we need to specify both 
topic and partition
-        //topicPartitions2SSP.put(new TopicPartition("FAKE PARTITION", 0), new 
SystemStreamPartition("Some","Another", new Partition(0)));
-        //topicPartitions2Offset.put(new TopicPartition("FAKE PARTITION", 0), 
"1234");
         kafkaConsumer.assign(topicPartitions2SSP.keySet());
       }
     } catch (Exception e) {
@@ -184,7 +174,7 @@ public class NewKafkaSystemConsumer<K, V> extends 
BlockingEnvelopeMap implements
     // create the thread with the consumer
     proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId, 
messageSink, samzaConsumerMetrics, metricName);
 
-    LOG.info("==============>Created consumer proxy: " + proxy);
+    LOG.info("Created consumer proxy: " + proxy);
   }
 
   /*
@@ -194,6 +184,10 @@ public class NewKafkaSystemConsumer<K, V> extends 
BlockingEnvelopeMap implements
    */
   void startConsumer() {
     //set the offset for each TopicPartition
+    if (topicPartitions2Offset.isEmpty()) {
+      LOG.warn("Consumer {} is not subscribed to any SSPs", this);
+    }
+
     topicPartitions2Offset.forEach((tp, startingOffsetString) -> {
       long startingOffset = Long.valueOf(startingOffsetString);
 
@@ -209,16 +203,15 @@ public class NewKafkaSystemConsumer<K, V> extends 
BlockingEnvelopeMap implements
         throw new SamzaException(e);
       }
 
-      LOG.info("==============>Changing Consumer's position for tp = " + tp + 
" to " + startingOffsetString);
+      LOG.info("Changing consumer's starting offset for tp = " + tp + " to " + 
startingOffsetString);
 
       // add the partition to the proxy
       proxy.addTopicPartition(topicPartitions2SSP.get(tp), startingOffset);
     });
 
-    System.out.println("#####################started " + this + "; kc=" + 
kafkaConsumer);
     // start the proxy thread
     if (proxy != null && !proxy.isRunning()) {
-      System.out.println("#####################starting proxy " + proxy);
+      LOG.info("Starting proxy: " + proxy);
       proxy.start();
     }
   }
@@ -226,29 +219,34 @@ public class NewKafkaSystemConsumer<K, V> extends 
BlockingEnvelopeMap implements
   private void setFetchThresholds() {
     // get the thresholds, and set defaults if not defined.
     KafkaConfig kafkaConfig = new KafkaConfig(config);
+
     Option<String> fetchThresholdOption = 
kafkaConfig.getConsumerFetchThreshold(systemName);
     long fetchThreshold = FETCH_THRESHOLD;
     if (fetchThresholdOption.isDefined()) {
       fetchThreshold = Long.valueOf(fetchThresholdOption.get());
-      LOG.info("fetchThresholdOption is defined. fetchThreshold=" + 
fetchThreshold);
+      LOG.info("fetchThresholdOption is configured. fetchThreshold=" + 
fetchThreshold);
     }
+
     Option<String> fetchThresholdBytesOption = 
kafkaConfig.getConsumerFetchThresholdBytes(systemName);
     long fetchThresholdBytes = FETCH_THRESHOLD_BYTES;
     if (fetchThresholdBytesOption.isDefined()) {
       fetchThresholdBytes = Long.valueOf(fetchThresholdBytesOption.get());
-      LOG.info("fetchThresholdBytesOption is defined. fetchThresholdBytes=" + 
fetchThresholdBytes);
+      LOG.info("fetchThresholdBytesOption is configured. fetchThresholdBytes=" 
+ fetchThresholdBytes);
     }
+
+    int numTPs = topicPartitions2SSP.size();
+    assert (numTPs == topicPartitions2Offset.size());
+
     LOG.info("fetchThresholdBytes = " + fetchThresholdBytes + "; 
fetchThreshold=" + fetchThreshold);
-    LOG.info("topicPartitions2Offset #=" + topicPartitions2Offset.size() + "; 
topicPartition2SSP #="
-        + topicPartitions2SSP.size());
+    LOG.info("number of topicPartitions " + numTPs);
 
-    if (topicPartitions2SSP.size() > 0) {
-      perPartitionFetchThreshold = fetchThreshold / topicPartitions2SSP.size();
+    if (numTPs > 0) {
+      perPartitionFetchThreshold = fetchThreshold / numTPs;
       LOG.info("perPartitionFetchThreshold=" + perPartitionFetchThreshold);
       if (fetchThresholdBytesEnabled) {
         // currently this feature cannot be enabled, because we do not have 
the size of the messages available.
         // messages get double buffered, hence divide by 2
-        perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / 
topicPartitions2SSP.size();
+        perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / numTPs;
         LOG.info("perPartitionFetchThresholdBytes is enabled. 
perPartitionFetchThresholdBytes="
             + perPartitionFetchThresholdBytes);
       }
@@ -257,23 +255,22 @@ public class NewKafkaSystemConsumer<K, V> extends 
BlockingEnvelopeMap implements
 
   @Override
   public void stop() {
-    System.out.println("kc=" + this + "!!!!!!!!!!!!!!!!!!!!!! stopping "+ "; 
kc=" + kafkaConsumer);
-    System.out.println("kc=" + this + "!!!!!!!!!!!!!!!!!!!!!!TPs = " + 
topicPartitions2Offset);
+    LOG.info("Stopping Samza kafkaConsumer " + this);
 
     if (!stopped.compareAndSet(false, true)) {
       LOG.warn("attempting to stop stopped consumer.");
       return;
     }
 
-    LOG.warn("Stopping SamzaRawLiKafkaConsumer + " + this);
     // stop the proxy (with 5 minutes timeout)
     if (proxy != null) {
-      System.out.println("##################### stopping proxy " + proxy);
+      LOG.info("Stopping proxy " + proxy);
       proxy.stop(TimeUnit.MINUTES.toMillis(5));
     }
 
     try {
       synchronized (kafkaConsumer) {
+        LOG.info("Closing kafka consumer " + kafkaConsumer);
         kafkaConsumer.close();
       }
     } catch (Exception e) {
@@ -304,7 +301,7 @@ public class NewKafkaSystemConsumer<K, V> extends 
BlockingEnvelopeMap implements
 
     topicPartitions2SSP.put(tp, systemStreamPartition);
 
-    LOG.info("============>registering ssp = " + systemStreamPartition + " 
with offset " + offset + "; kc=" + this);
+    LOG.info("Registering ssp = " + systemStreamPartition + " with offset " + 
offset);
 
     String existingOffset = topicPartitions2Offset.get(tp);
     // register the older (of the two) offset in the consumer, to guarantee we 
do not miss any messages.
@@ -328,7 +325,7 @@ public class NewKafkaSystemConsumer<K, V> extends 
BlockingEnvelopeMap implements
 
   @Override
   public String toString() {
-    return systemName + " " + clientId + "/" + super.toString();
+    return systemName + "/" + clientId + "/" + super.toString();
   }
 
   @Override
@@ -339,21 +336,15 @@ public class NewKafkaSystemConsumer<K, V> extends 
BlockingEnvelopeMap implements
     if (!proxy.isRunning()) {
       stop();
       if (proxy.getFailureCause() != null) {
-        String message = "LiKafkaConsumerProxy has stopped";
-        if (proxy.getFailureCause() instanceof 
org.apache.kafka.common.errors.TopicAuthorizationException) {
-          message +=
-              " due to TopicAuthorizationException Please refer to 
go/samzaacluserguide to correctly set up acls for your topic";
-        }
+        String message = "KafkaConsumerProxy has stopped";
         throw new SamzaException(message, proxy.getFailureCause());
       } else {
-        LOG.warn("Failure cause not populated for LiKafkaConsumerProxy");
+        LOG.warn("Failure cause is not populated for KafkaConsumerProxy");
-        throw new SamzaException("LiKafkaConsumerProxy has stopped");
+        throw new SamzaException("KafkaConsumerProxy has stopped");
       }
     }
 
     Map<SystemStreamPartition, List<IncomingMessageEnvelope>> res = 
super.poll(systemStreamPartitions, timeout);
-    //LOG.info("=============================>. Res for " + 
systemStreamPartitions);
-    //LOG.info("=============================>. Res:" + res.toString());
     return res;
   }
 
@@ -399,14 +390,14 @@ public class NewKafkaSystemConsumer<K, V> extends 
BlockingEnvelopeMap implements
       }
 
       if (fetchThresholdBytesEnabled) {
-        return getMessagesSizeInQueue(ssp) < perPartitionFetchThresholdBytes; 
// TODO Validate
+        return getMessagesSizeInQueue(ssp) < perPartitionFetchThresholdBytes;
       } else {
         return getNumMessagesInQueue(ssp) < perPartitionFetchThreshold;
       }
     }
 
     void addMessage(SystemStreamPartition ssp, IncomingMessageEnvelope 
envelope) {
-      LOG.info("==============>Incoming message ssp = {}: envelope = {}.", 
ssp, envelope);
+      LOG.trace("Incoming message ssp = {}: envelope = {}.", ssp, envelope);
 
       try {
         put(ssp, envelope);

http://git-wip-us.apache.org/repos/asf/samza/blob/332a0481/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
deleted file mode 100644
index a3f76e7..0000000
--- 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ /dev/null
@@ -1,437 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.samza.system.kafka
-
-import java.nio.ByteBuffer
-import java.util.concurrent.CountDownLatch
-
-import kafka.api.{PartitionOffsetsResponse, _}
-import kafka.common.TopicAndPartition
-import kafka.consumer.SimpleConsumer
-import kafka.message.{ByteBufferMessageSet, Message, MessageAndOffset, 
MessageSet}
-import org.apache.kafka.common.protocol.Errors
-import org.apache.samza.SamzaException
-import org.apache.samza.util.Logging
-import org.junit.Assert._
-import org.junit._
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
-import org.mockito.{Matchers, Mockito}
-
-import scala.collection.JavaConverters._
-
-class TestBrokerProxy extends Logging {
-  /*
-  val tp2 = new TopicAndPartition("Redbird", 2013)
-  var fetchTp1 = true // control whether fetching tp1 messages or not
-
-  @Test def brokerProxyRetrievesMessagesCorrectly() = {
-    val (bp, tp, sink) = getMockBrokerProxy()
-
-    bp.start
-    bp.addTopicPartition(tp, Option("0"))
-    // Add tp2, which should never receive messages since sink disables it.
-    bp.addTopicPartition(tp2, Option("0"))
-    Thread.sleep(1000)
-    assertEquals(2, sink.receivedMessages.size)
-    assertEquals(42, sink.receivedMessages(0)._2.offset)
-    assertEquals(84, sink.receivedMessages(1)._2.offset)
-  }
-
-  @Test def brokerProxySkipsFetchForEmptyRequests() = {
-    val (bp, tp, sink) = getMockBrokerProxy()
-
-    bp.start
-    // Only add tp2, which should never receive messages since sink disables 
it.
-    bp.addTopicPartition(tp2, Option("0"))
-    Thread.sleep(1000)
-    assertEquals(0, sink.receivedMessages.size)
-    assertTrue(bp.metrics.brokerSkippedFetchRequests.get((bp.host, 
bp.port)).getCount > 0)
-    assertEquals(0, bp.metrics.brokerReads.get((bp.host, bp.port)).getCount)
-  }
-
-  @Test def brokerProxyThrowsExceptionOnDuplicateTopicPartitions() = {
-    val (bp, tp, _) = getMockBrokerProxy()
-    bp.start
-    bp.addTopicPartition(tp, Option("0"))
-
-    try {
-      bp.addTopicPartition(tp, Option("1"))
-      fail("Should have thrown an exception")
-    } catch {
-      case se: SamzaException => assertEquals(se.getMessage, "Already 
consuming TopicPartition [Redbird,2012]")
-      case other: Exception => fail("Got some other exception than what we 
were expecting: " + other)
-    }
-  }
-
-  def getMockBrokerProxy() = {
-    val sink = new MessageSink {
-      val receivedMessages = new 
scala.collection.mutable.ListBuffer[(TopicAndPartition, MessageAndOffset, 
Boolean)]()
-
-      def abdicate(tp: TopicAndPartition, nextOffset: Long) {}
-
-      def refreshDropped() {}
-
-      def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, 
highWatermark: Long) {
-        receivedMessages += ((tp, msg, msg.offset.equals(highWatermark)))
-      }
-
-      def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: 
Boolean) {
-      }
-
-      // Never need messages for tp2.
-      def needsMoreMessages(tp: TopicAndPartition): Boolean = !tp.equals(tp2) 
&& fetchTp1
-    }
-
-    val system = "daSystem"
-    val host = "host"
-    val port = 2222
-    val tp = new TopicAndPartition("Redbird", 2012)
-    val metrics = new KafkaSystemConsumerMetrics(system)
-
-    metrics.registerBrokerProxy(host, port)
-    metrics.registerTopicAndPartition(tp)
-    metrics.topicPartitions.get((host, port)).set(1)
-
-    val bp = new BrokerProxy(
-      host,
-      port,
-      system,
-      "daClientId",
-      metrics,
-      sink,
-      offsetGetter = new GetOffset("fail", Map("Redbird" -> "largest"))) {
-
-      override val sleepMSWhileNoTopicPartitions = 100
-      // Speed up for test
-      var alreadyCreatedConsumer = false
-
-      // Scala traits and Mockito mocks don't mix, unfortunately.
-      override def createSimpleConsumer() = {
-        if (alreadyCreatedConsumer) {
-          System.err.println("Should only be creating one consumer in this 
test!")
-          throw new InterruptedException("Should only be creating one consumer 
in this test!")
-        }
-        alreadyCreatedConsumer = true
-
-        new DefaultFetchSimpleConsumer("a", 1, 2, 3, "b", new 
StreamFetchSizes(42)) {
-          val sc = Mockito.mock(classOf[SimpleConsumer])
-          val mockOffsetResponse = {
-            val offsetResponse = Mockito.mock(classOf[OffsetResponse])
-            val partitionOffsetResponse = {
-              val por = Mockito.mock(classOf[PartitionOffsetsResponse])
-              when(por.offsets).thenReturn(List(1l).toSeq)
-              por
-            }
-
-            val map = scala.Predef.Map[TopicAndPartition, 
PartitionOffsetsResponse](tp -> partitionOffsetResponse, tp2 -> 
partitionOffsetResponse)
-            when(offsetResponse.partitionErrorAndOffsets).thenReturn(map)
-            offsetResponse
-          }
-
-          
when(sc.getOffsetsBefore(any(classOf[OffsetRequest]))).thenReturn(mockOffsetResponse)
-
-          val fetchResponse = {
-            val fetchResponse = Mockito.mock(classOf[FetchResponse])
-
-            val messageSet = {
-              val messageSet = Mockito.mock(classOf[ByteBufferMessageSet])
-
-              def getMessage() = new Message(Mockito.mock(classOf[ByteBuffer]))
-              val messages = List(new MessageAndOffset(getMessage, 42), new 
MessageAndOffset(getMessage, 84))
-
-              when(messageSet.sizeInBytes).thenReturn(43)
-              when(messageSet.size).thenReturn(44)
-              when(messageSet.iterator).thenReturn(messages.iterator)
-              when(messageSet.head).thenReturn(messages.head)
-              messageSet
-            }
-
-            val fetchResponsePartitionData = 
FetchResponsePartitionData(Errors.NONE, 500, messageSet)
-            val map = scala.Predef.Map[TopicAndPartition, 
FetchResponsePartitionData](tp -> fetchResponsePartitionData)
-
-            when(fetchResponse.data).thenReturn(map.toSeq)
-            when(fetchResponse.messageSet(any(classOf[String]), 
any(classOf[Int]))).thenReturn(messageSet)
-            fetchResponse
-          }
-          when(sc.fetch(any(classOf[FetchRequest]))).thenReturn(fetchResponse)
-
-          override def close() = sc.close()
-
-          override def send(request: TopicMetadataRequest): 
TopicMetadataResponse = sc.send(request)
-
-          override def fetch(request: FetchRequest): FetchResponse = {
-            // Verify that we only get fetch requests for one tp, even though
-            // two were registered. This is to verify that
-            // sink.needsMoreMessages works.
-            assertEquals(1, request.requestInfo.size)
-            sc.fetch(request)
-          }
-
-          when(sc.earliestOrLatestOffset(any(classOf[TopicAndPartition]), 
any(classOf[Long]), any(classOf[Int]))).thenReturn(100)
-
-          override def getOffsetsBefore(request: OffsetRequest): 
OffsetResponse = sc.getOffsetsBefore(request)
-
-          override def commitOffsets(request: OffsetCommitRequest): 
OffsetCommitResponse = sc.commitOffsets(request)
-
-          override def fetchOffsets(request: OffsetFetchRequest): 
OffsetFetchResponse = sc.fetchOffsets(request)
-
-          override def earliestOrLatestOffset(topicAndPartition: 
TopicAndPartition, earliestOrLatest: Long, consumerId: Int): Long = 
sc.earliestOrLatestOffset(topicAndPartition, earliestOrLatest, consumerId)
-        }
-      }
-
-    }
-
-    (bp, tp, sink)
-  }
-
-  @Test def brokerProxyUpdateLatencyMetrics() = {
-    val (bp, tp, _) = getMockBrokerProxy()
-
-    bp.start
-    bp.addTopicPartition(tp, Option("0"))
-    Thread.sleep(1000)
-    // update when fetching messages
-    assertEquals(500, bp.metrics.highWatermark.get(tp).getValue)
-    assertEquals(415, bp.metrics.lag.get(tp).getValue)
-
-    fetchTp1 = false
-    Thread.sleep(1000)
-    // update when not fetching messages
-    assertEquals(100, bp.metrics.highWatermark.get(tp).getValue)
-    assertEquals(15, bp.metrics.lag.get(tp).getValue)
-
-    fetchTp1 = true
-  }
-
- @Test def brokerProxyCorrectlyHandlesOffsetOutOfRange(): Unit = {
-    // Need to wait for the thread to do some work before ending the test
-    val countdownLatch = new CountDownLatch(1)
-    var failString: String = null
-
-    val mockMessageSink = mock(classOf[MessageSink])
-    when(mockMessageSink.needsMoreMessages(any())).thenReturn(true)
-
-    val doNothingMetrics = new KafkaSystemConsumerMetrics()
-
-    val tp = new TopicAndPartition("topic", 42)
-
-    val mockOffsetGetter = mock(classOf[GetOffset])
-    // This will be used by the simple consumer below, and this is the 
response that simple consumer needs
-    
when(mockOffsetGetter.isValidOffset(any(classOf[DefaultFetchSimpleConsumer]), 
Matchers.eq(tp), Matchers.eq("0"))).thenReturn(true)
-    
when(mockOffsetGetter.getResetOffset(any(classOf[DefaultFetchSimpleConsumer]), 
Matchers.eq(tp))).thenReturn(1492l)
-
-    var callsToCreateSimpleConsumer = 0
-    val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer])
-
-    // Create an answer that first indicates offset out of range on first 
invocation and on second
-    // verifies that the parameters have been updated to what we expect them 
to be
-    val answer = new Answer[FetchResponse]() {
-      var invocationCount = 0
-
-      def answer(invocation: InvocationOnMock): FetchResponse = {
-        val arguments = 
invocation.getArguments()(0).asInstanceOf[List[Object]](0).asInstanceOf[(String,
 Long)]
-
-        if (invocationCount == 0) {
-          if (arguments !=(tp, 0)) {
-            failString = "First invocation did not have the right arguments: " 
+ arguments
-            countdownLatch.countDown()
-          }
-          val mfr = mock(classOf[FetchResponse])
-          when(mfr.hasError).thenReturn(true)
-          when(mfr.error("topic", 42)).thenReturn(Errors.OFFSET_OUT_OF_RANGE)
-
-          val messageSet = mock(classOf[MessageSet])
-          when(messageSet.iterator).thenReturn(Iterator.empty)
-          val response = mock(classOf[FetchResponsePartitionData])
-          when(response.error).thenReturn(Errors.OFFSET_OUT_OF_RANGE)
-          val responseMap = Map(tp -> response)
-          when(mfr.data).thenReturn(responseMap.toSeq)
-          invocationCount += 1
-          mfr
-        } else {
-          if (arguments !=(tp, 1492)) {
-            failString = "On second invocation, arguments were not correct: " 
+ arguments
-          }
-          countdownLatch.countDown()
-          Thread.currentThread().interrupt()
-          null
-        }
-      }
-    }
-
-    when(mockSimpleConsumer.defaultFetch(any())).thenAnswer(answer)
-
-    // So now we have a fetch response that will fail.  Prime the 
mockGetOffset to send us to a new offset
-
-    val bp = new BrokerProxy("host", 423, "system", "clientID", 
doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, new 
StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) {
-
-      override def createSimpleConsumer() = {
-        if (callsToCreateSimpleConsumer > 1) {
-          failString = "Tried to create more than one simple consumer"
-          countdownLatch.countDown()
-        }
-        callsToCreateSimpleConsumer += 1
-        mockSimpleConsumer
-      }
-    }
-
-    bp.addTopicPartition(tp, Option("0"))
-    bp.start
-    countdownLatch.await()
-    bp.stop
-    if (failString != null) {
-      fail(failString)
-    }
-  }
-
-  /**
-    * TODO fix
-   * Test that makes sure that BrokerProxy abdicates all TopicAndPartitions
-   * that it owns when a consumer failure occurs.
-   */
-  @Test def brokerProxyAbdicatesOnConnectionFailure(): Unit = {
-    val countdownLatch = new CountDownLatch(1)
-    var abdicated: Option[TopicAndPartition] = None
-    @volatile var refreshDroppedCount = 0
-    val mockMessageSink = new MessageSink {
-      override def setIsAtHighWatermark(tp: TopicAndPartition, 
isAtHighWatermark: Boolean) {
-      }
-
-      override def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, 
highWatermark: Long) {
-      }
-
-      override def abdicate(tp: TopicAndPartition, nextOffset: Long) {
-        abdicated = Some(tp)
-        countdownLatch.countDown
-      }
-
-      override def refreshDropped() {
-        refreshDroppedCount += 1
-      }
-
-      override def needsMoreMessages(tp: TopicAndPartition): Boolean = {
-        true
-      }
-    }
-
-    val doNothingMetrics = new KafkaSystemConsumerMetrics()
-    val tp = new TopicAndPartition("topic", 42)
-    val mockOffsetGetter = mock(classOf[GetOffset])
-    val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer])
-
-    
when(mockOffsetGetter.isValidOffset(any(classOf[DefaultFetchSimpleConsumer]), 
Matchers.eq(tp), Matchers.eq("0"))).thenReturn(true)
-    
when(mockOffsetGetter.getResetOffset(any(classOf[DefaultFetchSimpleConsumer]), 
Matchers.eq(tp))).thenReturn(1492l)
-    when(mockSimpleConsumer.defaultFetch(any())).thenThrow(new 
SamzaException("Pretend this is a ClosedChannelException. Can't use 
ClosedChannelException because it's checked, and Mockito doesn't like that."))
-
-    val bp = new BrokerProxy("host", 567, "system", "clientID", 
doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, new 
StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) {
-      override def createSimpleConsumer() = {
-        mockSimpleConsumer
-      }
-    }
-
-    val waitForRefresh = () => {
-      val currentRefreshDroppedCount = refreshDroppedCount
-      while (refreshDroppedCount == currentRefreshDroppedCount) {
-        Thread.sleep(100)
-      }
-    }
-
-    bp.addTopicPartition(tp, Option("0"))
-    bp.start
-    // BP should refresh on startup.
-    waitForRefresh()
-    countdownLatch.await()
-    // BP should continue refreshing after it's abdicated all 
TopicAndPartitions.
-    waitForRefresh()
-    bp.stop
-    assertEquals(tp, abdicated.getOrElse(null))
-  }
-
-  @Test def brokerProxyAbdicatesHardErrors(): Unit = {
-    val doNothingMetrics = new KafkaSystemConsumerMetrics
-    val mockMessageSink = new MessageSink {
-      override def needsMoreMessages(tp: TopicAndPartition): Boolean = true
-      override def abdicate(tp: TopicAndPartition, nextOffset: Long) {}
-      override def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, 
highWatermark: Long) {}
-      override def refreshDropped() {throw new OutOfMemoryError("Test - OOME")}
-      override def setIsAtHighWatermark(tp: TopicAndPartition, 
isAtHighWatermark: Boolean): Unit = {}
-    }
-    val mockOffsetGetter = mock(classOf[GetOffset])
-    val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer])
-
-    val bp = new BrokerProxy("host", 658, "system", "clientID", 
doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, new 
StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) {
-      override def createSimpleConsumer() = {
-        mockSimpleConsumer
-      }
-    }
-    var caughtError = false
-    try {
-      bp.thread.run
-    } catch {
-      case e: SamzaException => {
-        assertEquals(e.getMessage, "Got out of memory error in broker proxy 
thread.")
-        info("Received OutOfMemoryError in broker proxy.")
-        caughtError = true
-      }
-    }
-    assertEquals(true, caughtError)
-    val mockMessageSink2 = new MessageSink {
-      override def needsMoreMessages(tp: TopicAndPartition): Boolean = true
-      override def abdicate(tp: TopicAndPartition, nextOffset: Long): Unit = {}
-      override def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, 
highWatermark: Long): Unit = {}
-      override def refreshDropped(): Unit = {throw new 
StackOverflowError("Test - SOE")}
-      override def setIsAtHighWatermark(tp: TopicAndPartition, 
isAtHighWatermark: Boolean): Unit = {}
-    }
-    caughtError = false
-    val bp2 = new BrokerProxy("host", 689, "system", "clientID2", 
doNothingMetrics, mockMessageSink2, Int.MaxValue, 1024000, new 
StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) {
-      override def createSimpleConsumer() = {
-        mockSimpleConsumer
-      }
-    }
-    try {
-      bp2.thread.run
-    } catch {
-      case e: SamzaException => {
-        assertEquals(e.getMessage, "Got stack overflow error in broker proxy 
thread.")
-        info("Received StackOverflowError in broker proxy.")
-        caughtError = true
-      }
-    }
-    assertEquals(true, caughtError)
-  }
-
-  @Test
-       def brokerProxyStopCloseConsumer: Unit = {
-    val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer])
-    val bp = new BrokerProxy("host", 0, "system", "clientID", new 
KafkaSystemConsumerMetrics(), null){
-      override def createSimpleConsumer() = {
-        mockSimpleConsumer
-      }
-    }
-    bp.start
-    bp.stop
-    verify(mockSimpleConsumer).close
-  }
-  */
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/332a0481/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
 
b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
index 2ea9a5f..8405c63 100644
--- 
a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
+++ 
b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
@@ -223,16 +223,16 @@ class StreamTaskTestUtil {
    * interrupt, which is forwarded on to ThreadJob, and marked as a failure).
    */
   def stopJob(job: StreamJob) {
-    // make sure we don't kill the job before it was started
+    // make sure we don't kill the job before it was started.
+    // eventProcessed guarantees all the consumers have been initialized
     val tasks = TestTask.tasks
     val task = tasks.values.toList.head
     task.eventProcessed.await(60, TimeUnit.SECONDS)
-    System.out.println("THREAD: JOB KILL BEFORE")
+    assertEquals(0, task.eventProcessed.getCount)
+
     // Shutdown task.
     job.kill
-    System.out.println("THREAD: JOB KILL")
     val status = job.waitForFinish(60000)
-    System.out.println("THREAD: JOB KILL WAIT")
     assertEquals(ApplicationStatus.UnsuccessfulFinish, status)
   }
 

Reply via email to