Repository: samza
Updated Branches:
  refs/heads/master 9fa8beed7 -> edce6b76d


SAMZA-1479; Refactor KafkaCheckpointManager, KafkaCheckpointLogKey and their 
tests

Notable changes:
* Rewrite `KafkaCheckpointLogKey` into two classes - an immutable key class 
and a stateless SerDe (see the usage sketch after this list)
* Remove dependency on static setters in the `KafkaCheckpointLogKey`
* Change lifecycle of components in KafkaCheckpointManager
  - It's safe to start producers and consumers during `start`, instead of 
lazily creating them during writes and reads.
  - Initialize systemProducer and systemConsumer during construction
* Simplify logic for ignoring checkpoint validations
* Rewrite `KafkaCheckpointManager#readLog()` to use a simpler API.
* Remove unnecessary complexity left over from the 0.8 migration
* Remove unnecessary locking during startup and shutdown
* Remove dependencies on SimpleConsumer configs like bufferSize, fetchSize, 
socketTimeout
* Refactor KafkaCheckpointManagerFactory and remove static 
getCheckpointSystemNameAndFactory
* Bug-fix: register the taskName correctly (instead of using a dummy string 
for the taskName)

* Add unit tests to verify more checkpoint scenarios
* Consolidate shared test code for creating producer, consumer, and admin 
instances into utils
* Convert/consolidate most long-running integration tests into unit tests
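
A minimal sketch of the new key/SerDe pair in use (illustration only, not part
of this commit; class and constructor shapes are taken from the diff below):

    import org.apache.samza.checkpoint.kafka.KafkaCheckpointLogKey;
    import org.apache.samza.checkpoint.kafka.KafkaCheckpointLogKeySerde;
    import org.apache.samza.container.TaskName;
    import org.apache.samza.container.grouper.stream.GroupByPartitionFactory;

    public class CheckpointKeyRoundTrip {
      public static void main(String[] args) {
        // The key is immutable: type, taskName and grouper factory are fixed
        // at construction time and validated by Preconditions checks.
        KafkaCheckpointLogKey key = new KafkaCheckpointLogKey(
            KafkaCheckpointLogKey.CHECKPOINT_KEY_TYPE,
            new TaskName("Partition 0"),
            GroupByPartitionFactory.class.getCanonicalName());

        // The SerDe is stateless; no static setters are involved anymore.
        KafkaCheckpointLogKeySerde serde = new KafkaCheckpointLogKeySerde();
        byte[] bytes = serde.toBytes(key);

        // Deserialization yields an equal key.
        System.out.println(key.equals(serde.fromBytes(bytes))); // prints "true"
      }
    }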

Author: Jagadish <jvenkatra...@linkedin.com>

Reviewers: Prateek Maheshwari <pmahe...@linkedin.com>

Closes #348 from vjagadish1989/kafka-checkpointmanager-refactor


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/edce6b76
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/edce6b76
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/edce6b76

Branch: refs/heads/master
Commit: edce6b76dbf3768884ea5fdadac748df7dec031b
Parents: 9fa8bee
Author: Jagadish <jagad...@apache.org>
Authored: Fri Nov 17 14:41:05 2017 -0800
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Fri Nov 17 14:41:05 2017 -0800

----------------------------------------------------------------------
 .../samza/serializers/TestCheckpointSerde.scala |   8 +
 .../checkpoint/kafka/KafkaCheckpointLogKey.java | 110 ++++++
 .../kafka/KafkaCheckpointLogKeySerde.java       |  68 ++++
 .../samza/system/kafka/KafkaStreamSpec.java     |   4 +
 .../kafka/KafkaCheckpointLogKey.scala           | 171 --------
 .../kafka/KafkaCheckpointManager.scala          | 385 +++++++++---------
 .../kafka/KafkaCheckpointManagerFactory.scala   |  81 +---
 .../kafka/KafkaSystemConsumerMetrics.scala      |   2 -
 .../kafka/TestKafkaCheckpointLogKeySerde.java   |  53 +++
 .../kafka/TestKafkaCheckpointManagerJava.java   | 247 ++++++++++++
 .../kafka/TeskKafkaCheckpointLogKey.scala       |  61 ---
 .../kafka/TestKafkaCheckpointManager.scala      | 388 ++++++-------------
 12 files changed, 822 insertions(+), 756 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/edce6b76/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
 
b/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
index 029bbef..c2060e0 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
@@ -29,6 +29,7 @@ import org.junit.Assert._
 import org.junit.Test
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 class TestCheckpointSerde {
   @Test
@@ -57,4 +58,11 @@ class TestCheckpointSerde {
     assertNotSame(mapping, backToMap)
   }
 
+  @Test
+  def testNullCheckpointSerde: Unit = {
+    val checkpointBytes = null.asInstanceOf[Array[Byte]]
+    val checkpointSerde = new CheckpointSerde
+    val checkpoint = checkpointSerde.fromBytes(checkpointBytes)
+    assertNull(checkpoint)
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/edce6b76/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java
 
b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java
new file mode 100644
index 0000000..05114f9
--- /dev/null
+++ 
b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.checkpoint.kafka;
+
+import com.google.common.base.Preconditions;
+import org.apache.samza.container.TaskName;
+
+/**
+ * The key used for messages that are written to the Kafka checkpoint log.
+ */
+public class KafkaCheckpointLogKey {
+
+  public static final String CHECKPOINT_KEY_TYPE = "checkpoint";
+  /**
+   * The SystemStreamPartitionGrouperFactory configured for this job run. 
Since checkpoints of different
+   * groupers are not compatible, we persist and validate them across job runs.
+   */
+  private final String grouperFactoryClassName;
+  /**
+   * The taskName corresponding to the checkpoint. Checkpoints in Samza are 
stored per-task.
+   */
+  private final TaskName taskName;
+  /**
+   * The type of this key. Used for supporting multiple key-types. Currently, 
the only supported key-type is
+   * "checkpoint"
+   */
+  private final String type;
+
+  public KafkaCheckpointLogKey(String type, TaskName taskName, String 
grouperFactoryClassName) {
+    Preconditions.checkNotNull(grouperFactoryClassName);
+    Preconditions.checkNotNull(taskName);
+    Preconditions.checkNotNull(type);
+    Preconditions.checkState(!grouperFactoryClassName.isEmpty(), "Empty 
grouper factory class provided");
+
+    Preconditions.checkState(type.equals(CHECKPOINT_KEY_TYPE), 
String.format("Invalid type provided for checkpoint key. " +
+        "Expected: (%s) Actual: (%s)", CHECKPOINT_KEY_TYPE, type));
+
+    this.grouperFactoryClassName = grouperFactoryClassName;
+    this.taskName = taskName;
+    this.type = type;
+  }
+
+  public String getGrouperFactoryClassName() {
+    return grouperFactoryClassName;
+  }
+
+  public TaskName getTaskName() {
+    return taskName;
+  }
+
+  public String getType() {
+    return type;
+  }
+
+  /**
+   * Two {@link KafkaCheckpointLogKey}s are equal iff their grouperFactory 
class, taskName and type are equal.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    KafkaCheckpointLogKey that = (KafkaCheckpointLogKey) o;
+
+    if (!grouperFactoryClassName.equals(that.grouperFactoryClassName)) {
+      return false;
+    }
+    if (!taskName.equals(that.taskName)) {
+      return false;
+    }
+    return type.equals(that.type);
+  }
+
+  /**
+   * Two {@link KafkaCheckpointLogKey}s are equal iff their grouperFactory 
class, taskName and type are equal.
+   */
+  @Override
+  public int hashCode() {
+    int result = grouperFactoryClassName.hashCode();
+    result = 31 * result + taskName.hashCode();
+    result = 31 * result + type.hashCode();
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("KafkaCheckpointLogKey[factoryClass: %s, taskName: 
%s, type: %s]",
+        grouperFactoryClassName, taskName, type);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/edce6b76/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java
 
b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java
new file mode 100644
index 0000000..cc883b6
--- /dev/null
+++ 
b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.checkpoint.kafka;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.serializers.Serde;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+
+/**
+ * A serde for {@link KafkaCheckpointLogKey}.
+ *
+ * <p> Keys in the Kafka checkpoint log are serialized as JSON strings.
+ * E.g.: {"systemstreampartition-grouper-factory"" 
:"org.apache.samza.container.grouper.stream.GroupByPartitionFactory",
+ *    "taskName":"Partition 0", "type":"checkpoint"}
+ */
+public class KafkaCheckpointLogKeySerde implements 
Serde<KafkaCheckpointLogKey> {
+
+  private static final String SSP_GROUPER_FACTORY_FIELD = 
"systemstreampartition-grouper-factory";
+  private static final String TASK_NAME_FIELD = "taskName";
+  private static final String TYPE_FIELD = "type";
+  private static final ObjectMapper mapper = new ObjectMapper();
+
+  @Override
+  public byte[] toBytes(KafkaCheckpointLogKey key) {
+    try {
+      return mapper.writeValueAsBytes(ImmutableMap.of(
+          SSP_GROUPER_FACTORY_FIELD, key.getGrouperFactoryClassName(),
+          TASK_NAME_FIELD, key.getTaskName().toString(),
+          TYPE_FIELD, key.getType()
+        ));
+    } catch (Exception e) {
+      throw new SamzaException(String.format("Exception in serializing: %s", 
key), e);
+    }
+  }
+
+  @Override
+  public KafkaCheckpointLogKey fromBytes(byte[] bytes) {
+    try {
+      LinkedHashMap<String, String> deserializedKey = mapper.readValue(bytes, 
LinkedHashMap.class);
+      return new KafkaCheckpointLogKey(deserializedKey.get(TYPE_FIELD), new 
TaskName(deserializedKey.get(TASK_NAME_FIELD)), 
deserializedKey.get(SSP_GROUPER_FACTORY_FIELD)
+      );
+    } catch (Exception e) {
+      throw new SamzaException(String.format("Exception in de-serializing 
checkpoint bytes: %s",
+          Arrays.toString(bytes)), e);
+    }
+  }
+}
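
For reference, a key for task "Partition 0" under the GroupByPartition grouper
serializes to exactly the following JSON (the binary-compatibility test later
in this commit asserts these bytes):

    {"systemstreampartition-grouper-factory":"org.apache.samza.container.grouper.stream.GroupByPartitionFactory","taskName":"Partition 0","type":"checkpoint"}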

http://git-wip-us.apache.org/repos/asf/samza/blob/edce6b76/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java 
b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
index de7b7b0..a84c434 100644
--- 
a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
+++ 
b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
@@ -167,6 +167,10 @@ public class KafkaStreamSpec extends StreamSpec {
     return new KafkaStreamSpec(getId(), getPhysicalName(), getSystemName(), 
partitionCount, getReplicationFactor(), getProperties());
   }
 
+  public KafkaStreamSpec copyWithReplicationFactor(int replicationFactor) {
+    return new KafkaStreamSpec(getId(), getPhysicalName(), getSystemName(), 
getPartitionCount(), replicationFactor, getProperties());
+  }
+
   /**
    * Make a copy of the spec with new properties
    * @param properties properties of the Kafka stream
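
The new copyWithReplicationFactor composes with the existing copy-style
methods. A short sketch of the chaining used by KafkaCheckpointManagerFactory
further down (the topic name and replication factor here are illustrative):

    import java.util.Properties;
    import org.apache.samza.system.StreamSpec;
    import org.apache.samza.system.kafka.KafkaStreamSpec;

    public class CheckpointSpecSketch {
      public static void main(String[] args) {
        // Derive a checkpoint-stream spec, then override its replication
        // factor and topic properties via immutable copies.
        KafkaStreamSpec spec = KafkaStreamSpec
            .fromSpec(StreamSpec.createCheckpointStreamSpec("__samza_checkpoint_my-job_1", "kafka"))
            .copyWithReplicationFactor(3)
            .copyWithProperties(new Properties());
        System.out.println(spec.getReplicationFactor()); // 3
      }
    }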

http://git-wip-us.apache.org/repos/asf/samza/blob/edce6b76/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala
deleted file mode 100644
index b77cbb9..0000000
--- 
a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.checkpoint.kafka
-
-import java.util
-
-import org.apache.samza.SamzaException
-import org.apache.samza.container.TaskName
-import org.codehaus.jackson.`type`.TypeReference
-import org.codehaus.jackson.map.ObjectMapper
-
-import scala.collection.JavaConverters._
-
-/**
- * Kafka Checkpoint Log-specific key used to identify what type of entry is
- * written for any particular log entry.
- *
- * @param map Backing map to hold key values
- */
-class KafkaCheckpointLogKey private (val map: Map[String, String]) {
-  // This might be better as a case class...
-  import org.apache.samza.checkpoint.kafka.KafkaCheckpointLogKey._
-
-  /**
-   * Serialize this key to bytes
-   * @return Key as bytes
-   */
-  def toBytes(): Array[Byte] = {
-    val jMap = new util.HashMap[String, String](map.size)
-    jMap.putAll(map.asJava)
-
-    JSON_MAPPER.writeValueAsBytes(jMap)
-  }
-
-  private def getKey = map.getOrElse(CHECKPOINT_KEY_KEY, throw new 
SamzaException("No " + CHECKPOINT_KEY_KEY  + " in map for Kafka Checkpoint log 
key"))
-
-  /**
-   * Is this key for a checkpoint entry?
-   *
-   * @return true iff this key's entry is for a checkpoint
-   */
-  def isCheckpointKey = getKey.equals(CHECKPOINT_KEY_TYPE)
-
-
-  /**
-   * If this Key is for a checkpoint entry, return its associated TaskName.
-   *
-   * @return TaskName for this checkpoint or throw an exception if this key 
does not have a TaskName entry
-   */
-  def getCheckpointTaskName = {
-    val asString = map.getOrElse(CHECKPOINT_TASKNAME_KEY, throw new 
SamzaException("No TaskName in checkpoint key: " + this))
-    new TaskName(asString)
-  }
-
-  def canEqual(other: Any): Boolean = other.isInstanceOf[KafkaCheckpointLogKey]
-
-  override def equals(other: Any): Boolean = other match {
-    case that: KafkaCheckpointLogKey =>
-      (that canEqual this) &&
-        map == that.map
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    val state = Seq(map)
-    state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
-  }
-}
-
-object KafkaCheckpointLogKey {
-  /**
-   *  Messages in the checkpoint log have keys associated with them. These 
keys are maps that describe the message's
-   *  type, either a checkpoint or a changelog-partition-mapping.
-   */
-  val CHECKPOINT_KEY_KEY = "type"
-  val CHECKPOINT_KEY_TYPE = "checkpoint"
-
-  val CHECKPOINT_TASKNAME_KEY = "taskName"
-  val SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY = 
"systemstreampartition-grouper-factory"
-
-
-  private val JSON_MAPPER = new ObjectMapper()
-  val KEY_TYPEREFERENCE = new TypeReference[util.HashMap[String, String]]() {}
-
-  var systemStreamPartitionGrouperFactoryString:Option[String] = None
-
-  /**
-   * Set the name of the factory configured to provide the 
SystemStreamPartition grouping
-   * so it be included in the key.
-   *
-   * @param str Config value of SystemStreamPartition Grouper Factory
-   */
-  def setSystemStreamPartitionGrouperFactoryString(str:String) = {
-    systemStreamPartitionGrouperFactoryString = Some(str)
-  }
-
-  /**
-   * Get the name of the factory configured to provide the 
SystemStreamPartition grouping
-   * so it be included in the key
-   */
-  def getSystemStreamPartitionGrouperFactoryString = 
systemStreamPartitionGrouperFactoryString.getOrElse(throw new 
SamzaException("No SystemStreamPartition grouping factory string has been 
set."))
-
-  /**
-   * Build a key for a a checkpoint log entry for a particular TaskName
-   * @param taskName TaskName to build for this checkpoint entry
-   *
-   * @return Key for checkpoint log entry
-   */
-  def getCheckpointKey(taskName:TaskName) = {
-    val map = Map(CHECKPOINT_KEY_KEY -> CHECKPOINT_KEY_TYPE,
-      CHECKPOINT_TASKNAME_KEY -> taskName.getTaskName,
-      SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY -> 
getSystemStreamPartitionGrouperFactoryString)
-
-    new KafkaCheckpointLogKey(map)
-  }
-
-  /**
-   * Deserialize a Kafka checkpoint log key
-   * @param bytes Serialized (via JSON) Kafka checkpoint log key
-   * @return Checkpoint log key
-   */
-  def fromBytes(bytes: Array[Byte]): KafkaCheckpointLogKey = {
-    try {
-      val jmap: util.HashMap[String, String] = JSON_MAPPER.readValue(bytes, 
KEY_TYPEREFERENCE)
-
-      if(!jmap.containsKey(CHECKPOINT_KEY_KEY)) {
-        throw new SamzaException("No type entry in checkpoint key: " + jmap)
-      }
-
-      // Only checkpoint keys have ssp grouper factory keys
-      if(jmap.get(CHECKPOINT_KEY_KEY).equals(CHECKPOINT_KEY_TYPE)) {
-        val sspGrouperFactory = 
jmap.get(SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY)
-
-        if (sspGrouperFactory == null) {
-          throw new SamzaException("No SystemStreamPartition Grouper factory 
entry in checkpoint key: " + jmap)
-        }
-
-        if 
(!sspGrouperFactory.equals(getSystemStreamPartitionGrouperFactoryString)) {
-          throw new 
DifferingSystemStreamPartitionGrouperFactoryValues(sspGrouperFactory, 
getSystemStreamPartitionGrouperFactoryString)
-        }
-      }
-
-      new KafkaCheckpointLogKey(jmap.asScala.toMap)
-    } catch {
-      case e: Exception =>
-        throw new SamzaException("Exception while deserializing checkpoint 
key", e)
-    }
-  }
-}
-
-class DifferingSystemStreamPartitionGrouperFactoryValues(inKey:String, 
inConfig:String) extends SamzaException {
-  override def getMessage() = "Checkpoint key's SystemStreamPartition Grouper 
factory (" + inKey +
-    ") does not match value from current configuration (" + inConfig + ").  " +
-    "This likely means the SystemStreamPartitionGrouper was changed between 
job runs, which is not supported."
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/edce6b76/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 75b4700..217b2b6 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -19,258 +19,251 @@
 
 package org.apache.samza.checkpoint.kafka
 
-import java.nio.ByteBuffer
-import java.util
-import java.util.{Collections, Properties}
+import java.util.Collections
 
-import kafka.utils.ZkUtils
-import org.apache.kafka.common.utils.Utils
+import com.google.common.base.Preconditions
 import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
+import org.apache.samza.config.{Config, JobConfig}
 import org.apache.samza.container.TaskName
+import org.apache.samza.serializers.Serde
+import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.serializers.CheckpointSerde
-import 
org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import org.apache.samza.system.kafka.{KafkaSystemAdmin, KafkaStreamSpec}
-import org.apache.samza.system.{StreamSpec, SystemAdmin, _}
-import org.apache.samza.util._
+import org.apache.samza.system._
+import org.apache.samza.system.kafka.KafkaStreamSpec
+import org.apache.samza.util.{ExponentialSleepStrategy, Logging}
 import org.apache.samza.{Partition, SamzaException}
 
-import scala.collection.JavaConversions._
 import scala.collection.mutable
 
 /**
- * Kafka checkpoint manager is used to store checkpoints in a Kafka topic.
- * To read a checkpoint for a specific taskName, we find the newest message
- * keyed to that taskName. If there is no such message, no checkpoint data
- * exists.  The underlying log has a single partition into which all
- * checkpoints and TaskName to changelog partition mappings are written.
- *
- * This class is thread safe for writing but not for reading checkpoints.
- * This is currently OK since checkpoints are only read on the main thread.
- */
-class KafkaCheckpointManager(
-                              clientId: String,
-                              checkpointTopic: String,
-                              val systemName: String,
-                              replicationFactor: Int,
-                              socketTimeout: Int,
-                              bufferSize: Int,
-                              fetchSize: Int,
-                              getSystemConsumer: () => SystemConsumer,
-                              getSystemAdmin: () => SystemAdmin,
-                              val metadataStore: TopicMetadataStore,
-                              getSystemProducer: () => SystemProducer,
-                              val connectZk: () => ZkUtils,
-                              systemStreamPartitionGrouperFactoryString: 
String,
-                              failOnCheckpointValidation: Boolean,
-                              val retryBackoff: ExponentialSleepStrategy = new 
ExponentialSleepStrategy,
-                              serde: CheckpointSerde = new CheckpointSerde,
-                              checkpointTopicProperties: Properties = new 
Properties) extends CheckpointManager with Logging {
+  * A [[CheckpointManager]] that uses a compacted Kafka topic-partition to 
store the [[Checkpoint]] corresponding to
+  * a task.
+  *
+  * <p> The Kafka partition provides an abstraction of a log to which all 
[[Checkpoint]]s are appended. The
+  * checkpoints written to the log are keyed by their corresponding taskName.
+  *
+  * <p> This class is thread safe for writing but not for reading checkpoints. 
This is currently OK since checkpoints
+  * are only read on the main thread.
+  */
+class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
+                             systemFactory: SystemFactory,
+                             validateCheckpoint: Boolean,
+                             config: Config,
+                             metricsRegistry: MetricsRegistry,
+                             checkpointMsgSerde: Serde[Checkpoint] = new 
CheckpointSerde) extends CheckpointManager with Logging {
+
+  info(s"Creating KafkaCheckpointManager for checkpointTopic:$checkpointTopic, 
systemName:$checkpointSystem " +
+    s"validateCheckpoints:$validateCheckpoint")
+
+  val checkpointSystem: String = checkpointSpec.getSystemName
+  val checkpointTopic: String = checkpointSpec.getPhysicalName
+  val checkpointSsp = new SystemStreamPartition(checkpointSystem, 
checkpointTopic, new Partition(0))
+  val checkpointKeySerde = new KafkaCheckpointLogKeySerde
+  val expectedGrouperFactory = config.get(JobConfig.SSP_GROUPER_FACTORY)
+
+  val systemProducer = systemFactory.getProducer(checkpointSystem, config, 
metricsRegistry)
+  val systemConsumer = systemFactory.getConsumer(checkpointSystem, config, 
metricsRegistry)
+  val systemAdmin = systemFactory.getAdmin(checkpointSystem, config)
 
   var taskNames = Set[TaskName]()
-  @volatile var systemProducer: SystemProducer = null
-  var systemConsumer: SystemConsumer = null
-  var taskNamesToOffsets: Map[TaskName, Checkpoint] = null
-  val systemAdmin = getSystemAdmin()
+  var taskNamesToCheckpoints: Map[TaskName, Checkpoint] = null
+
+
+  /**
+    * @inheritdoc
+    */
+  override def start {
+    Preconditions.checkNotNull(systemProducer)
+    Preconditions.checkNotNull(systemConsumer)
+    Preconditions.checkNotNull(systemAdmin)
+
+    info(s"Creating checkpoint stream: ${checkpointSpec.getPhysicalName} with 
" +
+      s"partition count: ${checkpointSpec.getPartitionCount}")
+    systemAdmin.createStream(checkpointSpec)
+
+    // register and start a producer for the checkpoint topic
+    systemProducer.start
+
+    // register and start a consumer for the checkpoint topic
+    val oldestOffset = getOldestOffset(checkpointSsp)
+    info(s"Starting checkpoint SystemConsumer from oldest offset 
$oldestOffset")
+    systemConsumer.register(checkpointSsp, oldestOffset)
+    systemConsumer.start
+
+    if (validateCheckpoint) {
+      info(s"Validating checkpoint stream")
+      systemAdmin.validateStream(checkpointSpec)
+    }
+  }
+
+  /**
+    * @inheritdoc
+    */
+  override def register(taskName: TaskName) {
+    debug(s"Registering taskName: $taskName")
+    systemProducer.register(taskName.getTaskName)
+    taskNames += taskName
+  }
+
+  /**
+    * @inheritdoc
+    */
+  override def readLastCheckpoint(taskName: TaskName): Checkpoint = {
+    if (!taskNames.contains(taskName)) {
+      throw new SamzaException(s"Task: $taskName is not registered with this 
CheckpointManager")
+    }
 
-  val kafkaUtil: KafkaUtil = new KafkaUtil(retryBackoff, connectZk)
+    info(s"Reading checkpoint for taskName $taskName")
 
+    if (taskNamesToCheckpoints == null) {
+      debug("Reading checkpoints for the first time")
+      taskNamesToCheckpoints = readCheckpoints()
+    } else {
+      debug("Updating existing checkpoint mappings")
+      taskNamesToCheckpoints ++= readCheckpoints()
+    }
 
-  
KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString(systemStreamPartitionGrouperFactoryString)
+    val checkpoint: Checkpoint = taskNamesToCheckpoints.getOrElse(taskName, 
null)
 
-  info("Creating KafkaCheckpointManager with: clientId=%s, checkpointTopic=%s, 
systemName=%s" format(clientId, checkpointTopic, systemName))
+    info(s"Got checkpoint state for taskName - $taskName: $checkpoint")
+    checkpoint
+  }
 
   /**
-   * Write Checkpoint for specified taskName to log
-   *
-   * @param taskName Specific Samza taskName of which to write a checkpoint of.
-   * @param checkpoint Reference to a Checkpoint object to store offset data 
in.
-   **/
+    * @inheritdoc
+    */
   override def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) {
-    val key = KafkaCheckpointLogKey.getCheckpointKey(taskName)
-    val keyBytes = key.toBytes()
-    val msgBytes = serde.toBytes(checkpoint)
-    val systemStream = new SystemStream(systemName, checkpointTopic)
-    val envelope = new OutgoingMessageEnvelope(systemStream, keyBytes, 
msgBytes)
+    val key = new 
KafkaCheckpointLogKey(KafkaCheckpointLogKey.CHECKPOINT_KEY_TYPE, taskName, 
expectedGrouperFactory)
+    val keyBytes = try {
+      checkpointKeySerde.toBytes(key)
+    } catch {
+      case e: Exception => throw new SamzaException(s"Exception when writing 
checkpoint-key for $taskName: $checkpoint", e)
+    }
+    val msgBytes = try {
+      checkpointMsgSerde.toBytes(checkpoint)
+    } catch {
+      case e: Exception => throw new SamzaException(s"Exception when writing 
checkpoint for $taskName: $checkpoint", e)
+    }
+
+    val envelope = new OutgoingMessageEnvelope(checkpointSsp, keyBytes, 
msgBytes)
+    val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy
 
     retryBackoff.run(
       loop => {
-        if (systemProducer == null) {
-          synchronized {
-            if (systemProducer == null) {
-              systemProducer = getSystemProducer()
-              systemProducer.register(taskName.getTaskName)
-              systemProducer.start
-            }
-          }
-        }
-
         systemProducer.send(taskName.getTaskName, envelope)
         systemProducer.flush(taskName.getTaskName) // make sure it is written
-        debug("Completed writing checkpoint=%s into %s topic for system %s." 
format(checkpoint, checkpointTopic, systemName) )
+        debug(s"Wrote checkpoint: $checkpoint for task: $taskName")
         loop.done
       },
 
       (exception, loop) => {
-        warn("Failed to write checkpoint log partition entry %s: %s. 
Retrying." format(key, exception))
-        debug("Exception detail:", exception)
+        warn(s"Retrying failed checkpoint write to key: $key, checkpoint: 
$checkpoint for task: $taskName", exception)
       }
     )
   }
 
   /**
-   * Read the last checkpoint for specified TaskName
-   *
-   * @param taskName Specific Samza taskName for which to get the last 
checkpoint of.
-   **/
-  override def readLastCheckpoint(taskName: TaskName): Checkpoint = {
-    if (!taskNames.contains(taskName)) {
-      throw new SamzaException(taskName + " not registered with this 
CheckpointManager")
-    }
-
-    info("Reading checkpoint for taskName " + taskName)
+    * @inheritdoc
+    */
+  override def clearCheckpoints: Unit = {
+    info("Clear checkpoint stream %s in system %s" format(checkpointTopic, 
checkpointSystem))
+    systemAdmin.clearStream(checkpointSpec)
+  }
 
-    if (taskNamesToOffsets == null) {
-      info("No TaskName to checkpoint mapping provided.  Reading for first 
time.")
-      taskNamesToOffsets = readCheckpointsFromLog()
+  override def stop = {
+    if (systemProducer != null) {
+      systemProducer.stop
     } else {
-      info("Already existing checkpoint mapping.  Merging new offsets")
-      taskNamesToOffsets ++= readCheckpointsFromLog()
+      error("Checkpoint SystemProducer should not be null")
     }
 
-    val checkpoint = taskNamesToOffsets.get(taskName).getOrElse(null)
-
-    info("Got checkpoint state for taskName %s: %s" format(taskName, 
checkpoint))
-
-    checkpoint
+    if (systemConsumer != null) {
+      systemConsumer.stop
+    } else {
+      error("Checkpoint SystemConsumer should not be null")
+    }
+    info("CheckpointManager stopped.")
   }
 
   /**
-   * Read through entire log, discarding changelog mapping, and building map 
of TaskNames to Checkpoints
-   */
-  def readCheckpointsFromLog(): Map[TaskName, Checkpoint] = {
+    * Returns the checkpoints from the log.
+    *
+    * <p> The underlying [[SystemConsumer]] is stateful and tracks its 
offsets. Hence, each invocation of this method
+    * will read the log from where it left off previously. This allows for 
multiple efficient calls to [[readLastCheckpoint()]].
+    */
+  private def readCheckpoints(): Map[TaskName, Checkpoint] = {
     val checkpoints = mutable.Map[TaskName, Checkpoint]()
 
-    def shouldHandleEntry(key: KafkaCheckpointLogKey) = key.isCheckpointKey
+    val iterator = new SystemStreamPartitionIterator(systemConsumer, 
checkpointSsp)
+    var numMessagesRead = 0
 
-    def handleCheckpoint(payload: ByteBuffer, 
checkpointKey:KafkaCheckpointLogKey): Unit = {
-      val taskName = checkpointKey.getCheckpointTaskName
-      val checkpoint = serde.fromBytes(Utils.readBytes(payload))
-      checkpoints.put(taskName, checkpoint) // replacing any existing, older 
checkpoints as we go
-    }
-
-    readLog(shouldHandleEntry, handleCheckpoint)
-    checkpoints.toMap /* of the immutable kind */
-  }
-
-  private def getSSPMetadata(topic: String, partition: Partition): 
SystemStreamPartitionMetadata = {
-    val metaDataMap: java.util.Map[String, SystemStreamMetadata] = 
systemAdmin.getSystemStreamMetadata(Collections.singleton(topic))
-    val checkpointMetadata: SystemStreamMetadata = metaDataMap.get(topic)
-    if (checkpointMetadata == null) {
-      throw new SamzaException("Cannot get metadata for system=%s, topic=%s" 
format(systemName, topic))
-    }
+    while (iterator.hasNext) {
+      val checkpointEnvelope: IncomingMessageEnvelope = iterator.next
+      val offset = checkpointEnvelope.getOffset
 
-    val partitionMetaData = 
checkpointMetadata.getSystemStreamPartitionMetadata().get(partition)
-    if (partitionMetaData == null) {
-      throw new SamzaException("Cannot get partitionMetaData for system=%s, 
topic=%s" format(systemName, topic))
-    }
+      numMessagesRead += 1
+      if (numMessagesRead % 1000 == 0) {
+        info(s"Read $numMessagesRead from topic: $checkpointTopic. Current 
offset: $offset")
+      }
 
-    return partitionMetaData
-  }
+      val keyBytes = checkpointEnvelope.getKey.asInstanceOf[Array[Byte]]
+      if (keyBytes == null) {
+        throw new SamzaException("Encountered a checkpoint message with null 
key. Topic:$checkpointTopic " +
+          s"Offset:$offset")
+      }
 
-  /**
-   * Reads an entry from the checkpoint log and invokes the provided lambda on 
it.
-   *
-   * @param handleEntry Code to handle an entry in the log once it's found
-   */
-  private def readLog(shouldHandleEntry: (KafkaCheckpointLogKey) => Boolean,
-                      handleEntry: (ByteBuffer, KafkaCheckpointLogKey) => 
Unit): Unit = {
-    info("Reading from checkpoint system:%s topic:%s" format(systemName, 
checkpointTopic))
-
-    val ssp: SystemStreamPartition = new SystemStreamPartition(systemName, 
checkpointTopic, new Partition(0))
-
-    if (systemConsumer == null) {
-      val partitionMetadata = getSSPMetadata(checkpointTopic, new Partition(0))
-      val oldestOffset = partitionMetadata.getOldestOffset
-
-      systemConsumer = getSystemConsumer()
-      systemConsumer.register(ssp, oldestOffset)
-      systemConsumer.start()
-    }
+      val checkpointKey = try {
+        checkpointKeySerde.fromBytes(keyBytes)
+      } catch {
+        case e: Exception => throw new SamzaException(s"Exception while 
serializing checkpoint-key. " +
+          s"Topic: $checkpointTopic Offset: $offset", e)
+      }
 
-    val iterator =  new SystemStreamPartitionIterator(systemConsumer, ssp);
-    var msgCount = 0
-    while (iterator.hasNext) {
-      val msg = iterator.next
-      msgCount += 1
-
-      val offset = msg.getOffset
-      val key = msg.getKey.asInstanceOf[Array[Byte]]
-      if (key == null) {
-        throw new KafkaUtilException(
-          "While reading checkpoint (currentOffset=%s) stream encountered 
message without key." format offset)
+      // If the grouper in the key is not equal to the configured grouper, 
error out.
+      val actualGrouperFactory = checkpointKey.getGrouperFactoryClassName
+      if (!expectedGrouperFactory.equals(actualGrouperFactory)) {
+        warn(s"Grouper mismatch. Configured: $expectedGrouperFactory Actual: 
$actualGrouperFactory ")
+        if (validateCheckpoint) {
+          throw new SamzaException("SSPGrouperFactory in the checkpoint topic 
does not match the configured value" +
+            s"Configured value: $expectedGrouperFactory; Actual value: 
$actualGrouperFactory Offset: $offset")
+        }
       }
 
-      val checkpointKey = KafkaCheckpointLogKey.fromBytes(key)
+      // If the type of the key is not 
KafkaCheckpointLogKey.CHECKPOINT_KEY_TYPE, it can safely be ignored.
+      if 
(KafkaCheckpointLogKey.CHECKPOINT_KEY_TYPE.equals(checkpointKey.getType)) {
+        val checkpointBytes = 
checkpointEnvelope.getMessage.asInstanceOf[Array[Byte]]
+        val checkpoint = try {
+          checkpointMsgSerde.fromBytes(checkpointBytes)
+        } catch {
+          case e: Exception => throw new SamzaException(s"Exception while 
serializing checkpoint-message. " +
+            s"Topic: $checkpointTopic Offset: $offset", e)
+        }
 
-      if (!shouldHandleEntry(checkpointKey)) {
-        info("Skipping checkpoint log entry at offset %s with key %s." 
format(offset, checkpointKey))
-      } else {
-        val checkpointPayload = 
ByteBuffer.wrap(msg.getMessage.asInstanceOf[Array[Byte]])
-        handleEntry(checkpointPayload, checkpointKey)
+        checkpoints.put(checkpointKey.getTaskName, checkpoint)
       }
     }
-    info("Done reading %s messages from checkpoint system:%s topic:%s" 
format(msgCount, systemName, checkpointTopic))
+    info(s"Read $numMessagesRead messages from system:$checkpointSystem 
topic:$checkpointTopic")
+    checkpoints.toMap
   }
 
-  override def start {
-    val CHECKPOINT_STREAMID = "unused-temp-checkpoint-stream-id"
-    val spec = new KafkaStreamSpec(CHECKPOINT_STREAMID,
-                                   checkpointTopic, systemName, 1,
-                                   replicationFactor, 
checkpointTopicProperties)
-
-    info("About to create checkpoint stream: " + spec)
-    systemAdmin.createStream(spec)
-    info("Created checkpoint stream: " + spec)
-    try {
-      systemAdmin.validateStream(spec) // SPECIAL VALIDATION FOR CHECKPOINT. 
DO NOT FAIL IF failOnCheckpointValidation IS FALSE
-      info("Validated spec: " + spec)
-    } catch {
-      case e : StreamValidationException =>
-             if (failOnCheckpointValidation) {
-               throw e
-             } else {
-               warn("Checkpoint stream validation partially failed. Ignoring 
it because failOnCheckpointValidation=" + failOnCheckpointValidation)
-             }
-      case e1 : Exception => throw e1
-    }
-  }
-
-  override def register(taskName: TaskName) {
-    debug("Adding taskName " + taskName + " to " + this)
-    taskNames += taskName
-  }
-
-
-  def stop = {
-    synchronized {
-      if (systemProducer != null) {
-        systemProducer.stop
-        systemProducer = null
-      }
+  /**
+    * Returns the oldest available offset for the provided 
[[SystemStreamPartition]].
+    */
+  private def getOldestOffset(ssp: SystemStreamPartition): String = {
+    val topic = ssp.getSystemStream.getStream
+    val partition = ssp.getPartition
 
-      if (systemConsumer != null) {
-        systemConsumer.stop
-        systemConsumer = null
-      }
+    val metaDataMap = 
systemAdmin.getSystemStreamMetadata(Collections.singleton(topic))
+    val checkpointMetadata: SystemStreamMetadata = metaDataMap.get(topic)
+    if (checkpointMetadata == null) {
+      throw new SamzaException(s"Got null metadata for 
system:$checkpointSystem, topic:$topic")
     }
 
-  }
+    val partitionMetaData = 
checkpointMetadata.getSystemStreamPartitionMetadata().get(partition)
+    if (partitionMetaData == null) {
+      throw new SamzaException(s"Got a null partition metadata for 
system:$checkpointSystem, topic:$topic")
+    }
 
-  override def clearCheckpoints = {
-    info("Clear checkpoint stream %s in system %s" format (checkpointTopic, 
systemName))
-    val spec = StreamSpec.createCheckpointStreamSpec(checkpointTopic, 
systemName)
-    systemAdmin.clearStream(spec)
+    return partitionMetaData.getOldestOffset
   }
-
-  override def toString = "KafkaCheckpointManager [systemName=%s, 
checkpointTopic=%s]" format(systemName, checkpointTopic)
 }
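
With the new lifecycle, callers register tasks first, then start the manager,
then read or write checkpoints. A hedged sketch from the caller's perspective
(the Config is assumed to carry valid job and system settings):

    import org.apache.samza.checkpoint.Checkpoint;
    import org.apache.samza.checkpoint.CheckpointManager;
    import org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory;
    import org.apache.samza.config.Config;
    import org.apache.samza.container.TaskName;
    import org.apache.samza.util.NoOpMetricsRegistry;

    public class CheckpointLifecycleSketch {
      static Checkpoint readLast(Config config) {
        TaskName task = new TaskName("Partition 0");
        CheckpointManager manager = new KafkaCheckpointManagerFactory()
            .getCheckpointManager(config, new NoOpMetricsRegistry());
        manager.register(task);  // register every task before start
        manager.start();         // creates the topic, starts producer and consumer
        try {
          return manager.readLastCheckpoint(task);
        } finally {
          manager.stop();
        }
      }
    }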

http://git-wip-us.apache.org/repos/asf/samza/blob/edce6b76/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
index 8ac347c..48d6671 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
@@ -19,87 +19,38 @@
 
 package org.apache.samza.checkpoint.kafka
 
-import java.util.Properties
-
-import com.google.common.collect.ImmutableMap
-import kafka.utils.ZkUtils
 import org.apache.samza.SamzaException
 import org.apache.samza.checkpoint.{CheckpointManager, 
CheckpointManagerFactory}
-import org.apache.samza.config.ApplicationConfig.ApplicationMode
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config._
 import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.system.SystemFactory
-import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtil, 
Logging, Util, _}
-
-
-object KafkaCheckpointManagerFactory {
-  val INJECTED_PRODUCER_PROPERTIES = Map(
-    "acks" -> "all",
-    // Forcibly disable compression because Kafka doesn't support compression
-    // on log compacted topics. Details in SAMZA-586.
-    "compression.type" -> "none")
-
-  /**
-   * Get the checkpoint system and system factory from the configuration
-   * @param config
-   * @return system name and system factory
-   */
-  def getCheckpointSystemStreamAndFactory(config: Config) = {
-
-    val kafkaConfig = new KafkaConfig(config)
-    val systemName = kafkaConfig.getCheckpointSystem.getOrElse(throw new 
SamzaException("no system defined for Kafka's checkpoint manager."))
-
-    val systemFactoryClassName = new SystemConfig(config)
-            .getSystemFactory(systemName)
-            .getOrElse(throw new SamzaException("Missing configuration: " + 
SystemConfig.SYSTEM_FACTORY format systemName))
-    val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName)
-    (systemName, systemFactory)
-  }
-}
+import org.apache.samza.system.{StreamSpec, SystemFactory}
+import org.apache.samza.system.kafka.KafkaStreamSpec
+import org.apache.samza.util.{KafkaUtil, Logging, Util, _}
 
 class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with 
Logging {
-  import org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory._
 
   def getCheckpointManager(config: Config, registry: MetricsRegistry): 
CheckpointManager = {
-    val clientId = KafkaUtil.getClientId("samza-checkpoint-manager", config)
     val jobName = config.getName.getOrElse(throw new SamzaException("Missing 
job name in configs"))
     val jobId = config.getJobId.getOrElse("1")
 
-    val (systemName: String, systemFactory : SystemFactory) =  
getCheckpointSystemStreamAndFactory(config)
-
     val kafkaConfig = new KafkaConfig(config)
-    val producerConfig = kafkaConfig.getKafkaSystemProducerConfig(
-      systemName,
-      clientId,
-      INJECTED_PRODUCER_PROPERTIES)
+    val checkpointSystemName = kafkaConfig.getCheckpointSystem.getOrElse(
+      throw new SamzaException("No system defined for Kafka's checkpoint 
manager."))
 
-    val noOpMetricsRegistry = new NoOpMetricsRegistry()
+    val checkpointSystemFactoryName = new SystemConfig(config)
+      .getSystemFactory(checkpointSystemName)
+      .getOrElse(throw new SamzaException("Missing configuration: " + 
SystemConfig.SYSTEM_FACTORY format checkpointSystemName))
 
-    val consumerConfig = kafkaConfig.getKafkaSystemConsumerConfig(systemName, 
clientId)
-    val zkConnect = Option(consumerConfig.zkConnect)
-      .getOrElse(throw new SamzaException("no zookeeper.connect defined in 
config"))
-    val connectZk = () => {
-      ZkUtils(zkConnect, 6000, 6000, false)
-    }
-    val socketTimeout = consumerConfig.socketTimeoutMs
+    val checkpointSystemFactory = 
Util.getObj[SystemFactory](checkpointSystemFactoryName)
+    val checkpointTopic = KafkaUtil.getCheckpointTopic(jobName, jobId, config)
 
+    info(s"Creating a KafkaCheckpointManager to consume from $checkpointTopic")
+    val checkpointSpec = 
KafkaStreamSpec.fromSpec(StreamSpec.createCheckpointStreamSpec(checkpointTopic, 
checkpointSystemName))
+        
.copyWithReplicationFactor(kafkaConfig.getCheckpointReplicationFactor.get.toInt)
+        .copyWithProperties(kafkaConfig.getCheckpointTopicProperties)
 
-    new KafkaCheckpointManager(
-      clientId,
-      KafkaUtil.getCheckpointTopic(jobName, jobId, config),
-      systemName,
-      kafkaConfig.getCheckpointReplicationFactor.get.toInt,
-      socketTimeout,
-      consumerConfig.socketReceiveBufferBytes,
-      consumerConfig.fetchMessageMaxBytes,            // must be > buffer size
-      () => systemFactory.getConsumer(systemName, config, noOpMetricsRegistry),
-      () => systemFactory.getAdmin(systemName, config),
-      new ClientUtilTopicMetadataStore(producerConfig.bootsrapServers, 
clientId, socketTimeout),
-      () => systemFactory.getProducer(systemName, config, noOpMetricsRegistry),
-      connectZk,
-      config.getSystemStreamPartitionGrouperFactory,      // To find out the 
SSPGrouperFactory class so it can be included/verified in the key
-      config.failOnCheckpointValidation,
-      checkpointTopicProperties = new 
KafkaConfig(config).getCheckpointTopicProperties())
+    new KafkaCheckpointManager(checkpointSpec, checkpointSystemFactory, 
config.failOnCheckpointValidation, config,
+      new NoOpMetricsRegistry)
   }
 }
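
For context, a hedged sketch of the configuration the factory now reads (key
names follow Samza's standard config conventions; the values are illustrative):

    # job identity, used to derive the checkpoint topic name
    job.name=my-job
    job.id=1

    # use the Kafka checkpoint manager, backed by a configured system
    task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
    task.checkpoint.system=kafka
    task.checkpoint.replication.factor=3

    # factory for the checkpoint system itself
    systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory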

http://git-wip-us.apache.org/repos/asf/samza/blob/edce6b76/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
index befd729..ff945da 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
@@ -60,8 +60,6 @@ class KafkaSystemConsumerMetrics(val systemName: String = 
"unknown", val registr
     brokerReads.put((host, port), newCounter("%s-%s-messages-read" format 
(host, port)))
     brokerSkippedFetchRequests.put((host, port), 
newCounter("%s-%s-skipped-fetch-requests" format (host, port)))
     topicPartitions.put((host, port), newGauge("%s-%s-topic-partitions" format 
(host, port), 0))
-
-
   }
 
   // java friendlier interfaces

http://git-wip-us.apache.org/repos/asf/samza/blob/edce6b76/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKeySerde.java
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKeySerde.java
 
b/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKeySerde.java
new file mode 100644
index 0000000..b648b1c
--- /dev/null
+++ 
b/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKeySerde.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.checkpoint.kafka;
+
+import org.apache.samza.container.TaskName;
+import org.apache.samza.container.grouper.stream.GroupByPartitionFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+public class TestKafkaCheckpointLogKeySerde {
+
+  @Test
+  public void testBinaryCompatibility() {
+    KafkaCheckpointLogKey logKey1 = new 
KafkaCheckpointLogKey(KafkaCheckpointLogKey.CHECKPOINT_KEY_TYPE,
+        new TaskName("Partition 0"), 
GroupByPartitionFactory.class.getCanonicalName());
+    KafkaCheckpointLogKeySerde checkpointSerde = new 
KafkaCheckpointLogKeySerde();
+
+    byte[] bytes = ("{\"systemstreampartition-grouper-factory\"" +
+        
":\"org.apache.samza.container.grouper.stream.GroupByPartitionFactory\",\"taskName\":\"Partition
 0\"," +
+        "\"type\":\"checkpoint\"}").getBytes();
+
+    // test that the checkpoints returned by the Serde are byte-wise identical 
to an actual checkpoint in Kafka
+    Assert.assertEquals(true, Arrays.equals(bytes, 
checkpointSerde.toBytes(logKey1)));
+  }
+
+  @Test
+  public void testSerde() {
+    KafkaCheckpointLogKey key = new 
KafkaCheckpointLogKey(KafkaCheckpointLogKey.CHECKPOINT_KEY_TYPE,
+        new TaskName("Partition 0"), 
GroupByPartitionFactory.class.getCanonicalName());
+    KafkaCheckpointLogKeySerde checkpointSerde = new 
KafkaCheckpointLogKeySerde();
+
+    // test that deserialize(serialize(k)) == k
+    Assert.assertEquals(key, 
checkpointSerde.fromBytes(checkpointSerde.toBytes(key)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/edce6b76/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManagerJava.java
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManagerJava.java
 
b/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManagerJava.java
new file mode 100644
index 0000000..a2ae94c
--- /dev/null
+++ 
b/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManagerJava.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.checkpoint.kafka;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import kafka.common.KafkaException;
+import kafka.common.TopicAlreadyMarkedForDeletionException;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.container.grouper.stream.GroupByPartitionFactory;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.serializers.CheckpointSerde;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamValidationException;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStreamMetadata;
+import 
org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.kafka.KafkaStreamSpec;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.stubbing.OngoingStubbing;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Mockito.*;
+
+public class TestKafkaCheckpointManagerJava {
+  private static final TaskName TASK1 = new TaskName("task1");
+  private static final String CHECKPOINT_TOPIC = "topic-1";
+  private static final String CHECKPOINT_SYSTEM = "system-1";
+  private static final Partition CHECKPOINT_PARTITION = new Partition(0);
+  private static final SystemStreamPartition CHECKPOINT_SSP =
+      new SystemStreamPartition(CHECKPOINT_SYSTEM, CHECKPOINT_TOPIC, 
CHECKPOINT_PARTITION);
+  private static final String GROUPER_FACTORY_CLASS = 
GroupByPartitionFactory.class.getCanonicalName();
+
+  @Test(expected = TopicAlreadyMarkedForDeletionException.class)
+  public void testStartFailsOnTopicCreationErrors() {
+
+    KafkaStreamSpec checkpointSpec = new KafkaStreamSpec(CHECKPOINT_TOPIC, 
CHECKPOINT_TOPIC,
+        CHECKPOINT_SYSTEM, 1);
+    // create an admin that throws an exception during createStream
+    SystemAdmin mockAdmin = newAdmin("0", "10");
+    doThrow(new TopicAlreadyMarkedForDeletionException("invalid 
stream")).when(mockAdmin).createStream(checkpointSpec);
+
+    SystemFactory factory = newFactory(mock(SystemProducer.class), 
mock(SystemConsumer.class), mockAdmin);
+    KafkaCheckpointManager checkpointManager = new 
KafkaCheckpointManager(checkpointSpec, factory,
+        true, mock(Config.class), mock(MetricsRegistry.class), null);
+
+    // expect an exception during startup
+    checkpointManager.start();
+  }
+
+  @Test(expected = StreamValidationException.class)
+  public void testStartFailsOnTopicValidationErrors() {
+
+    KafkaStreamSpec checkpointSpec = new KafkaStreamSpec(CHECKPOINT_TOPIC, 
CHECKPOINT_TOPIC,
+        CHECKPOINT_SYSTEM, 1);
+
+    // create an admin that throws an exception during validateStream
+    SystemAdmin mockAdmin = newAdmin("0", "10");
+    doThrow(new StreamValidationException("invalid 
stream")).when(mockAdmin).validateStream(checkpointSpec);
+
+    SystemFactory factory = newFactory(mock(SystemProducer.class), 
mock(SystemConsumer.class), mockAdmin);
+    KafkaCheckpointManager checkpointManager = new 
KafkaCheckpointManager(checkpointSpec, factory,
+        true, mock(Config.class), mock(MetricsRegistry.class), null);
+
+    // expect an exception during startup
+    checkpointManager.start();
+  }
+
+  @Test(expected = SamzaException.class)
+  public void testReadFailsOnSerdeExceptions() throws Exception {
+    KafkaStreamSpec checkpointSpec = new KafkaStreamSpec(CHECKPOINT_TOPIC, 
CHECKPOINT_TOPIC,
+        CHECKPOINT_SYSTEM, 1);
+    Config mockConfig = mock(Config.class);
+    
when(mockConfig.get(JobConfig.SSP_GROUPER_FACTORY())).thenReturn(GROUPER_FACTORY_CLASS);
+
+    // mock out a consumer that returns a single checkpoint IME
+    SystemStreamPartition ssp = new SystemStreamPartition("system-1", 
"input-topic", new Partition(0));
+    List<List<IncomingMessageEnvelope>> checkpointEnvelopes = ImmutableList.of(
+        ImmutableList.of(newCheckpointEnvelope(TASK1, ssp, "0")));
+    SystemConsumer mockConsumer = newConsumer(checkpointEnvelopes);
+
+    SystemAdmin mockAdmin = newAdmin("0", "1");
+    SystemFactory factory = newFactory(mock(SystemProducer.class), 
mockConsumer, mockAdmin);
+
+    // wire up an exception throwing serde with the checkpointmanager
+    KafkaCheckpointManager checkpointManager = new 
KafkaCheckpointManager(checkpointSpec, factory,
+        true, mockConfig, mock(MetricsRegistry.class), new 
ExceptionThrowingCheckpointSerde());
+    checkpointManager.register(TASK1);
+    checkpointManager.start();
+
+    // expect an exception from ExceptionThrowingSerde
+    checkpointManager.readLastCheckpoint(TASK1);
+  }
+
+  @Test
+  public void testCheckpointsAreReadFromOldestOffset() throws Exception {
+    KafkaStreamSpec checkpointSpec = new KafkaStreamSpec(CHECKPOINT_TOPIC, CHECKPOINT_TOPIC,
+        CHECKPOINT_SYSTEM, 1);
+    Config mockConfig = mock(Config.class);
+    when(mockConfig.get(JobConfig.SSP_GROUPER_FACTORY())).thenReturn(GROUPER_FACTORY_CLASS);
+
+    // mock out a consumer that returns a single checkpoint IME
+    SystemStreamPartition ssp = new SystemStreamPartition("system-1", "input-topic", new Partition(0));
+    SystemConsumer mockConsumer = newConsumer(ImmutableList.of(
+        ImmutableList.of(newCheckpointEnvelope(TASK1, ssp, "0"))));
+
+    String oldestOffset = "0";
+    SystemAdmin mockAdmin = newAdmin(oldestOffset, "1");
+    SystemFactory factory = newFactory(mock(SystemProducer.class), mockConsumer, mockAdmin);
+    KafkaCheckpointManager checkpointManager = new KafkaCheckpointManager(checkpointSpec, factory,
+        true, mockConfig, mock(MetricsRegistry.class), new CheckpointSerde());
+    checkpointManager.register(TASK1);
+
+    // 1. verify that consumer.register is called only during checkpointManager.start.
+    // 2. verify that consumer.register is called with the oldest offset.
+    // 3. verify that no other operation on the CheckpointManager re-invokes register since start offsets are set during
+    // register
+    verify(mockConsumer, times(0)).register(CHECKPOINT_SSP, oldestOffset);
+    checkpointManager.start();
+    verify(mockConsumer, times(1)).register(CHECKPOINT_SSP, oldestOffset);
+
+    checkpointManager.readLastCheckpoint(TASK1);
+    verify(mockConsumer, times(1)).register(CHECKPOINT_SSP, oldestOffset);
+  }
+
+  @Test
+  public void testAllMessagesInTheLogAreRead() throws Exception {
+    KafkaStreamSpec checkpointSpec = new KafkaStreamSpec(CHECKPOINT_TOPIC, CHECKPOINT_TOPIC,
+        CHECKPOINT_SYSTEM, 1);
+    Config mockConfig = mock(Config.class);
+    when(mockConfig.get(JobConfig.SSP_GROUPER_FACTORY())).thenReturn(GROUPER_FACTORY_CLASS);
+
+    SystemStreamPartition ssp = new SystemStreamPartition("system-1", "input-topic", new Partition(0));
+
+    int oldestOffset = 0;
+    int newestOffset = 10;
+
+    // mock out a consumer that returns eleven checkpoint IMEs (offsets 0-10) for the same ssp
+    List<List<IncomingMessageEnvelope>> pollOutputs = new ArrayList<>();
+    for (int offset = oldestOffset; offset <= newestOffset; offset++) {
+      pollOutputs.add(ImmutableList.of(newCheckpointEnvelope(TASK1, ssp, Integer.toString(offset))));
+    }
+
+    // return one message at a time from each poll, simulating a KafkaConsumer with max.poll.records = 1
+    SystemConsumer mockConsumer = newConsumer(pollOutputs);
+    SystemAdmin mockAdmin = newAdmin(Integer.toString(oldestOffset), Integer.toString(newestOffset));
+    SystemFactory factory = newFactory(mock(SystemProducer.class), mockConsumer, mockAdmin);
+
+    KafkaCheckpointManager checkpointManager = new KafkaCheckpointManager(checkpointSpec, factory,
+        true, mockConfig, mock(MetricsRegistry.class), new CheckpointSerde());
+    checkpointManager.register(TASK1);
+    checkpointManager.start();
+
+    // check that all eleven messages are read, and the checkpoint reflects the newest message
+    Checkpoint checkpoint = checkpointManager.readLastCheckpoint(TASK1);
+    Assert.assertEquals(checkpoint.getOffsets(), ImmutableMap.of(ssp, Integer.toString(newestOffset)));
+  }
+
+  /**
+   * Create a new {@link SystemConsumer} that returns a list of messages sequentially at each subsequent poll.
+   *
+   * @param pollOutputs a list of poll outputs to be returned at subsequent polls.
+   *                    The i'th call to consumer.poll() will return the list at pollOutputs[i]
+   * @return the consumer
+   */
+  private SystemConsumer newConsumer(List<List<IncomingMessageEnvelope>> pollOutputs) throws Exception {
+    SystemConsumer mockConsumer = mock(SystemConsumer.class);
+    OngoingStubbing<Map> when = when(mockConsumer.poll(anySet(), anyLong()));
+    for (List<IncomingMessageEnvelope> pollOutput : pollOutputs) {
+      when = when.thenReturn(ImmutableMap.of(CHECKPOINT_SSP, pollOutput));
+    }
+    when.thenReturn(ImmutableMap.of());
+    return mockConsumer;
+  }
+
+  /**
+   * Create a new {@link SystemAdmin} that returns the provided oldest and newest offsets for its topics
+   */
+  private SystemAdmin newAdmin(String oldestOffset, String newestOffset) {
+    SystemStreamMetadata checkpointTopicMetadata = new SystemStreamMetadata(CHECKPOINT_TOPIC,
+        ImmutableMap.of(new Partition(0), new SystemStreamPartitionMetadata(oldestOffset,
+            newestOffset, Integer.toString(Integer.parseInt(newestOffset) + 1))));
+    SystemAdmin mockAdmin = mock(SystemAdmin.class);
+    when(mockAdmin.getSystemStreamMetadata(Collections.singleton(CHECKPOINT_TOPIC))).thenReturn(
+        ImmutableMap.of(CHECKPOINT_TOPIC, checkpointTopicMetadata));
+    return mockAdmin;
+  }
+
+  private SystemFactory newFactory(SystemProducer producer, SystemConsumer consumer, SystemAdmin admin) {
+    SystemFactory factory = mock(SystemFactory.class);
+    when(factory.getProducer(anyString(), any(Config.class), any(MetricsRegistry.class))).thenReturn(producer);
+    when(factory.getConsumer(anyString(), any(Config.class), any(MetricsRegistry.class))).thenReturn(consumer);
+    when(factory.getAdmin(anyString(), any(Config.class))).thenReturn(admin);
+    return factory;
+  }
+
+  /**
+   * Creates a new checkpoint envelope for the provided task, ssp and offset
+   */
+  private IncomingMessageEnvelope newCheckpointEnvelope(TaskName taskName, SystemStreamPartition ssp, String offset) {
+    KafkaCheckpointLogKey checkpointKey =
+        new KafkaCheckpointLogKey("checkpoint", taskName, GROUPER_FACTORY_CLASS);
+    KafkaCheckpointLogKeySerde checkpointKeySerde = new KafkaCheckpointLogKeySerde();
+
+    Checkpoint checkpointMsg = new Checkpoint(ImmutableMap.of(ssp, offset));
+    CheckpointSerde checkpointMsgSerde = new CheckpointSerde();
+
+    return new IncomingMessageEnvelope(CHECKPOINT_SSP, offset, checkpointKeySerde.toBytes(checkpointKey),
+        checkpointMsgSerde.toBytes(checkpointMsg));
+  }
+
+  private static class ExceptionThrowingCheckpointSerde extends CheckpointSerde {
+    public Checkpoint fromBytes(byte[] bytes) {
+      throw new KafkaException("exception");
+    }
+  }
+}

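As a quick illustration of the key/SerDe split exercised by the test above, here is a minimal
round-trip sketch. It assumes KafkaCheckpointLogKeySerde exposes the usual Samza toBytes/fromBytes
pair and that KafkaCheckpointLogKey implements equals(); both are consistent with how the test
constructs keys, but the sketch is illustrative only and not part of the patch.

    import org.apache.samza.checkpoint.kafka.KafkaCheckpointLogKey;
    import org.apache.samza.checkpoint.kafka.KafkaCheckpointLogKeySerde;
    import org.apache.samza.container.TaskName;
    import org.apache.samza.container.grouper.stream.GroupByPartitionFactory;

    public class CheckpointKeyRoundTrip {
      public static void main(String[] args) {
        // The key is a plain immutable value: type, task name, and grouper factory class.
        KafkaCheckpointLogKey key = new KafkaCheckpointLogKey("checkpoint",
            new TaskName("Partition 0"), GroupByPartitionFactory.class.getCanonicalName());

        // The SerDe is a separate, stateless object; no static setters are involved.
        KafkaCheckpointLogKeySerde serde = new KafkaCheckpointLogKeySerde();
        byte[] bytes = serde.toBytes(key);

        // fromBytes is assumed to mirror toBytes, as with any Samza Serde.
        KafkaCheckpointLogKey roundTripped = serde.fromBytes(bytes);
        System.out.println(key.equals(roundTripped)); // expected: true
      }
    }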
http://git-wip-us.apache.org/repos/asf/samza/blob/edce6b76/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TeskKafkaCheckpointLogKey.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TeskKafkaCheckpointLogKey.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TeskKafkaCheckpointLogKey.scala
deleted file mode 100644
index 38f3dc2..0000000
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TeskKafkaCheckpointLogKey.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.checkpoint.kafka
-
-import org.apache.samza.SamzaException
-import org.apache.samza.container.TaskName
-import org.junit.Assert._
-import org.junit.{Before, Test}
-
-class TestKafkaCheckpointLogKey {
-  @Before
-  def setSSPGrouperFactoryString() {
-    KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString("hello")
-  }
-
-  @Test
-  def checkpointKeySerializationRoundTrip() {
-    val checkpointKey = KafkaCheckpointLogKey.getCheckpointKey(new TaskName("TN"))
-    val asBytes = checkpointKey.toBytes()
-    val backFromBytes = KafkaCheckpointLogKey.fromBytes(asBytes)
-
-    assertEquals(checkpointKey, backFromBytes)
-    assertNotSame(checkpointKey, backFromBytes)
-  }
-
-  @Test
-  def differingSSPGrouperFactoriesCauseException() {
-
-    val checkpointKey = KafkaCheckpointLogKey.getCheckpointKey(new TaskName("TN"))
-
-    val asBytes = checkpointKey.toBytes()
-
-    KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString("goodbye")
-
-    var gotException = false
-    try {
-      KafkaCheckpointLogKey.fromBytes(asBytes)
-    } catch {
-      case se:SamzaException => assertEquals(new DifferingSystemStreamPartitionGrouperFactoryValues("hello", "goodbye").getMessage(), se.getCause.getMessage)
-        gotException = true
-    }
-
-    assertTrue("Should have had an exception since ssp grouper factories 
didn't match", gotException)
-  }
-}
\ No newline at end of file

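The test deleted above had to pin the grouper factory through process-global static state before
it could serialize a key. With the immutable key, the grouper factory travels inside each key
instance, so a consistency check reduces to a plain field comparison. A minimal sketch, assuming
an accessor named getGrouperFactoryClassName(); that getter name is hypothetical and may differ
in the actual class:

    import org.apache.samza.checkpoint.kafka.KafkaCheckpointLogKey;

    final class GrouperFactoryCheck {
      static void validate(KafkaCheckpointLogKey key, String configuredGrouperFactory) {
        // hypothetical accessor name; the real getter may differ
        String factoryInKey = key.getGrouperFactoryClassName();
        if (!configuredGrouperFactory.equals(factoryInKey)) {
          throw new IllegalStateException("Grouper factory mismatch: configured="
              + configuredGrouperFactory + ", in key=" + factoryInKey);
        }
      }
    }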
http://git-wip-us.apache.org/repos/asf/samza/blob/edce6b76/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index eba2033..dcf4068 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -19,325 +19,191 @@
 
 package org.apache.samza.checkpoint.kafka
 
-
 import java.util.Properties
 
-import _root_.kafka.admin.AdminUtils
-import _root_.kafka.common.{InvalidMessageSizeException, UnknownTopicOrPartitionException}
-import _root_.kafka.integration.KafkaServerTestHarness
-import _root_.kafka.message.InvalidMessageException
-import _root_.kafka.server.{ConfigType, KafkaConfig}
-import _root_.kafka.utils.{CoreUtils, TestUtils, ZkUtils}
-import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerConfig, ProducerRecord}
+import kafka.admin.AdminUtils
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.ConfigType
+import kafka.utils.{CoreUtils, TestUtils, ZkUtils}
+import com.google.common.collect.ImmutableMap
+import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.common.security.JaasUtils
+import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.apache.samza.checkpoint.Checkpoint
 import org.apache.samza.config._
 import org.apache.samza.container.TaskName
 import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
 import org.apache.samza.serializers.CheckpointSerde
 import org.apache.samza.system._
-import org.apache.samza.system.kafka.{KafkaSystemAdmin, KafkaStreamSpec}
-import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtilException, NoOpMetricsRegistry, TopicMetadataStore}
+import org.apache.samza.system.kafka.{KafkaStreamSpec, KafkaSystemFactory}
+import org.apache.samza.util.{KafkaUtilException, NoOpMetricsRegistry, Util}
 import org.apache.samza.{Partition, SamzaException}
 import org.junit.Assert._
 import org.junit._
 
-import scala.collection.JavaConverters._
-import scala.collection._
-
 class TestKafkaCheckpointManager extends KafkaServerTestHarness {
 
   protected def numBrokers: Int = 3
 
-  def generateConfigs() = {
-    val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, true)
-    props.map(KafkaConfig.fromProps)
-  }
-
-  val checkpointTopic = "checkpoint-topic"
-  val serdeCheckpointTopic = "checkpoint-topic-invalid-serde"
-  val checkpointTopicConfig = new org.apache.samza.config.KafkaConfig(new MapConfig()).getCheckpointTopicProperties()
-
-  val zkSecure = JaasUtils.isZkSecurityEnabled()
-
-  val partition = new Partition(0)
-  val partition2 = new Partition(1)
-  val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", 
partition) -> "123").asJava)
-  val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", 
partition) -> "12345").asJava)
-
-  var producerConfig: KafkaProducerConfig = null
+  val checkpointSystemName = "kafka"
+  val sspGrouperFactoryName = classOf[GroupByPartitionFactory].getCanonicalName
 
-  var metadataStore: TopicMetadataStore = null
-  var failOnTopicValidation = true
+  val ssp = new SystemStreamPartition("kafka", "topic", new Partition(0))
+  val checkpoint1 = new Checkpoint(ImmutableMap.of(ssp, "offset-1"))
+  val checkpoint2 = new Checkpoint(ImmutableMap.of(ssp, "offset-2"))
+  val taskName = new TaskName("Partition 0")
 
-  val systemStreamPartitionGrouperFactoryString = classOf[GroupByPartitionFactory].getCanonicalName
-
-  var systemConsumerFn: ()=>SystemConsumer = ()=>{null}
-  var systemProducerFn: ()=>SystemProducer = ()=>{null}
-  var systemAdminFn: ()=>SystemAdmin = ()=>{null}
-  
-  val systemName = "kafka"
-  val CHECKPOINT_STREAMID = "unused-temp-checkpoint-stream-id"
-  val kafkaStreamSpec = new KafkaStreamSpec(CHECKPOINT_STREAMID,
-                                 checkpointTopic, systemName, 1,
-                                 1, new Properties())
+  var brokers: String = null
+  var config: Config = null
 
   @Before
   override def setUp {
     super.setUp
-
    TestUtils.waitUntilTrue(() => servers.head.metadataCache.getAliveBrokers.size == numBrokers, "Wait for cache to update")
-
-    val brokers = brokerList.split(",").map(p => "localhost" + p).mkString(",")
-    val config = new java.util.HashMap[String, String]()
-    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
-    config.put("acks", "all")
-    config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
-    config.put(ProducerConfig.RETRIES_CONFIG, (new Integer(java.lang.Integer.MAX_VALUE-1)).toString)
-    config.putAll(KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES.asJava)
-    producerConfig = new KafkaProducerConfig(systemName, "i001", config)
-
-    metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
-
-    config.put(SystemConfig.SYSTEM_FACTORY format systemName, "org.apache.samza.system.kafka.KafkaSystemFactory")
-    config.put(org.apache.samza.config.KafkaConfig.CHECKPOINT_SYSTEM, systemName);
-    config.put(JobConfig.JOB_NAME, "some-job-name");
-    config.put(JobConfig.JOB_ID, "i001");
-    config.put("systems.%s.producer.%s" format (systemName, ProducerConfig.BOOTSTRAP_SERVERS_CONFIG), brokers)
-    config.put("systems.%s.consumer.zookeeper.connect" format systemName, zkConnect)
-    val cfg: SystemConfig = new SystemConfig(new MapConfig(config))
-    val (systemStreamName: String, systemConsumerFactory : SystemFactory) =
-      KafkaCheckpointManagerFactory.getCheckpointSystemStreamAndFactory(cfg)
-    systemConsumerFn = () => {systemConsumerFactory.getConsumer(systemStreamName, cfg, new NoOpMetricsRegistry())}
-    systemProducerFn = () => {systemConsumerFactory.getProducer(systemStreamName, cfg, new NoOpMetricsRegistry())}
-    systemAdminFn = () => {systemConsumerFactory.getAdmin(systemStreamName, cfg)}
-    
-  }
-
-  @After
-  override def tearDown() {
-    if (servers != null) {
-      servers.foreach(_.shutdown())
-      servers.foreach(server => CoreUtils.delete(server.config.logDirs))
-    }
-    super.tearDown
-  }
-
-  private def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint, cpTopic: String = checkpointTopic) = {
-    val producer: Producer[Array[Byte], Array[Byte]] = new KafkaProducer(producerConfig.getProducerProperties)
-    val record = new ProducerRecord(
-      cpTopic,
-      0,
-      KafkaCheckpointLogKey.getCheckpointKey(taskName).toBytes(),
-      new CheckpointSerde().toBytes(checkpoint)
-    )
-    try {
-      producer.send(record).get()
-    } catch {
-      case e: Exception => println(e.getMessage)
-    } finally {
-      producer.close()
-    }
+    brokers = brokerList.split(",").map(p => "localhost" + p).mkString(",")
+    config = getConfig()
   }
 
-
-  private def createCheckpointTopic(cpTopic: String = checkpointTopic, partNum: Int = 1) = {
-    val zkClient = ZkUtils(zkConnect, 6000, 6000, zkSecure)
-    try {
-      AdminUtils.createTopic(
-        zkClient,
-        cpTopic,
-        partNum,
-        1,
-        checkpointTopicConfig)
-    } catch {
-      case e: Exception => println(e.getMessage)
-    } finally {
-      zkClient.close
-    }
+  override def generateConfigs() = {
+    val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, true)
+    // do not use relative imports
+    props.map(_root_.kafka.server.KafkaConfig.fromProps)
   }
 
   @Test
   def testCheckpointShouldBeNullIfCheckpointTopicDoesNotExistShouldBeCreatedOnWriteAndShouldBeReadableAfterWrite {
-    val kcm = getKafkaCheckpointManager
-    val taskName = new TaskName(partition.toString)
-    kcm.register(taskName)
-    createCheckpointTopic()
-    val systemAdmin = systemAdminFn()
-    systemAdmin.validateStream(kafkaStreamSpec)
-
-    // check that log compaction is enabled.
-    val zkClient = ZkUtils(zkConnect, 6000, 6000, zkSecure)
+    val checkpointTopic = "checkpoint-topic-1"
+    val kcm1 = createKafkaCheckpointManager(checkpointTopic)
+    kcm1.register(taskName)
+    kcm1.start
+    kcm1.stop
+    // check that start actually creates the topic with log compaction enabled
+    val zkClient = ZkUtils(zkConnect, 6000, 6000, JaasUtils.isZkSecurityEnabled())
     val topicConfig = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, checkpointTopic)
-    zkClient.close
+
+    assertEquals(topicConfig, new KafkaConfig(config).getCheckpointTopicProperties())
     assertEquals("compact", topicConfig.get("cleanup.policy"))
     assertEquals("26214400", topicConfig.get("segment.bytes"))
 
+    zkClient.close
+
     // read before topic exists should result in a null checkpoint
-    var readCp = kcm.readLastCheckpoint(taskName)
+    val readCp = readCheckpoint(checkpointTopic, taskName)
     assertNull(readCp)
 
-    // create topic the first time around
-    writeCheckpoint(taskName, cp1)
-    readCp = kcm.readLastCheckpoint(taskName)
-    assertEquals(cp1, readCp)
+    writeCheckpoint(checkpointTopic, taskName, checkpoint1)
+    assertEquals(checkpoint1, readCheckpoint(checkpointTopic, taskName))
 
-    // should get an exception if partition doesn't exist
+    // writing a second message and reading it returns a more recent checkpoint
+    writeCheckpoint(checkpointTopic, taskName, checkpoint2)
+    assertEquals(checkpoint2, readCheckpoint(checkpointTopic, taskName))
+  }
+
+  @Test
+  def testFailOnTopicValidation {
+    // By default, should fail if there is a topic validation error
+    val checkpointTopic = "eight-partition-topic";
+    val kcm1 = createKafkaCheckpointManager(checkpointTopic)
+    kcm1.register(taskName)
+    // create topic with the wrong number of partitions
+    createTopic(checkpointTopic, 8, new KafkaConfig(config).getCheckpointTopicProperties())
     try {
-      readCp = kcm.readLastCheckpoint(new TaskName(new Partition(1).toString))
-      fail("Expected a SamzaException, since only one partition (partition 0) 
should exist.")
+      kcm1.start
+      fail("Expected an exception for invalid number of partitions in the 
checkpoint topic.")
     } catch {
-      case e: SamzaException => None // expected
-      case _: Exception => fail("Expected a SamzaException, since only one 
partition (partition 0) should exist.")
+      case e: StreamValidationException => None
     }
+    kcm1.stop
 
-    // writing a second message should work, too
-    writeCheckpoint(taskName, cp2)
-    readCp = kcm.readLastCheckpoint(taskName)
-    assertEquals(cp2, readCp)
-    kcm.stop
+    // Should not fail if failOnTopicValidation = false
+    val failOnTopicValidation = false
+    val kcm2 = createKafkaCheckpointManager(checkpointTopic, new CheckpointSerde, failOnTopicValidation)
+    kcm2.register(taskName)
+    try {
+      kcm2.start
+    } catch {
+      case e: KafkaUtilException => fail("Unexpected exception for invalid 
number of partitions in the checkpoint topic")
+    }
+    kcm2.stop
   }
 
+  @After
+  override def tearDown() {
+    if (servers != null) {
+      servers.foreach(_.shutdown())
+      servers.foreach(server => CoreUtils.delete(server.config.logDirs))
+    }
+    super.tearDown
+  }
 
-  @Test
-  def testCheckpointReadTwice {
-    val kcm = getKafkaCheckpointManager
-    val taskName = new TaskName(partition.toString)
-    kcm.register(taskName)
-    createCheckpointTopic()
-    val systemAdmin = systemAdminFn()
-    systemAdmin.validateStream(kafkaStreamSpec)
-
+  private def getCheckpointProducerProperties() : Properties = {
+    val defaultSerializer = classOf[ByteArraySerializer].getCanonicalName
+    val props = new Properties()
+    props.putAll(ImmutableMap.of(
+      ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers,
+      ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, defaultSerializer,
+      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, defaultSerializer))
+    props
+  }
 
-    // check that log compaction is enabled.
-    val zkClient = ZkUtils(zkConnect, 6000, 6000, zkSecure)
-    val topicConfig = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, checkpointTopic)
-    zkClient.close
-    assertEquals("compact", topicConfig.get("cleanup.policy"))
-    assertEquals("26214400", topicConfig.get("segment.bytes"))
+  private def getConfig() : Config = {
+    new MapConfig(new ImmutableMap.Builder[String, String]()
+      .put(JobConfig.JOB_NAME, "some-job-name")
+      .put(JobConfig.JOB_ID, "i001")
+      .put(JobConfig.SSP_GROUPER_FACTORY, sspGrouperFactoryName)
+      .put(s"systems.$checkpointSystemName.samza.factory", 
classOf[KafkaSystemFactory].getCanonicalName)
+      .put(s"systems.$checkpointSystemName.producer.bootstrap.servers", 
brokers)
+      .put(s"systems.$checkpointSystemName.consumer.zookeeper.connect", 
zkConnect)
+      .put("task.checkpoint.system", checkpointSystemName)
+      .build())
+  }
 
-    // read before topic exists should result in a null checkpoint
-    var readCp = kcm.readLastCheckpoint(taskName)
-    assertNull(readCp)
+  private def createKafkaCheckpointManager(cpTopic: String, serde: CheckpointSerde = new CheckpointSerde, failOnTopicValidation: Boolean = true) = {
+    val kafkaConfig = new org.apache.samza.config.KafkaConfig(config)
+    val props = kafkaConfig.getCheckpointTopicProperties()
+    val systemName = kafkaConfig.getCheckpointSystem.getOrElse(
+      throw new SamzaException("No system defined for Kafka's checkpoint manager."))
 
-    // create topic the first time around
-    writeCheckpoint(taskName, cp1)
-    readCp = kcm.readLastCheckpoint(taskName)
-    assertEquals(cp1, readCp)
+    val systemFactoryClassName = new SystemConfig(config)
+      .getSystemFactory(systemName)
+      .getOrElse(throw new SamzaException("Missing configuration: " + 
SystemConfig.SYSTEM_FACTORY format systemName))
 
-    // writing a second message should work, too
-    writeCheckpoint(taskName, cp2)
-    readCp = kcm.readLastCheckpoint(taskName)
-    assertEquals(cp2, readCp)
-    kcm.stop
+    val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName)
 
-    // get new KCM for the same stream
-    val kcm1 = getKafkaCheckpointManager
-    kcm1.register(taskName)
-    readCp = kcm1.readLastCheckpoint(taskName)
-    assertEquals(cp2, readCp)
-    kcm1.stop
+    val spec = new KafkaStreamSpec("id", cpTopic, checkpointSystemName, 1, 1, 
props)
+    new KafkaCheckpointManager(spec, systemFactory, failOnTopicValidation, 
config, new NoOpMetricsRegistry, serde)
   }
 
-  @Test
-  def testUnrecoverableKafkaErrorShouldThrowKafkaCheckpointManagerException {
-    val exceptions = List("InvalidMessageException", 
"InvalidMessageSizeException", "UnknownTopicOrPartitionException")
-    exceptions.foreach { exceptionName =>
-      val kcm = getKafkaCheckpointManagerWithInvalidSerde(exceptionName)
-      val taskName = new TaskName(partition.toString)
-      kcm.register(taskName)
-      createCheckpointTopic(serdeCheckpointTopic)
-      val systemAdmin = systemAdminFn()
-      systemAdmin.validateStream(kafkaStreamSpec)
-
-      writeCheckpoint(taskName, cp1, serdeCheckpointTopic)
-      // because serde will throw unrecoverable errors, it should result a KafkaCheckpointException
-      try {
-        kcm.readLastCheckpoint(taskName)
-        fail("Expected an Exception.")
-      } catch {
-        case e: KafkaUtilException => None
-        case e: Exception => None
-      }
-      kcm.stop
-    }
+  private def readCheckpoint(checkpointTopic: String, taskName: TaskName) : Checkpoint = {
+    val kcm = createKafkaCheckpointManager(checkpointTopic)
+    kcm.register(taskName)
+    kcm.start
+    val checkpoint = kcm.readLastCheckpoint(taskName)
+    kcm.stop
+    checkpoint
   }
 
-  @Test
-  def testFailOnTopicValidation {
-    // first case - default case, we should fail on validation
-    failOnTopicValidation = true
-    val checkpointTopic8 = checkpointTopic + "8";
-    val kcm = getKafkaCheckpointManagerWithParam(checkpointTopic8)
-    val taskName = new TaskName(partition.toString)
+  private def writeCheckpoint(checkpointTopic: String, taskName: TaskName, checkpoint: Checkpoint) = {
+    val kcm = createKafkaCheckpointManager(checkpointTopic)
     kcm.register(taskName)
-    createCheckpointTopic(checkpointTopic8, 8) // create topic with the wrong number of partitions
-    try {
-      kcm.start
-      fail("Expected a KafkaUtilException for invalid number of partitions in 
the topic.")
-    }catch {
-      case e: StreamValidationException => None
-    }
-    kcm.stop
+    kcm.start
+    kcm.writeCheckpoint(taskName, checkpoint)
+  }
 
-    // same validation but ignore the validation error (pass 'false' to validate..)
-    failOnTopicValidation = false
-    val kcm1 = getKafkaCheckpointManagerWithParam((checkpointTopic8))
-    kcm1.register(taskName)
+  private def createTopic(cpTopic: String, partNum: Int, props: Properties) = {
+    val zkClient = ZkUtils(zkConnect, 6000, 6000, JaasUtils.isZkSecurityEnabled())
     try {
-      kcm1.start
-    }catch {
-      case e: KafkaUtilException => fail("Did not expect a KafkaUtilException 
for invalid number of partitions in the topic.")
+      AdminUtils.createTopic(
+        zkClient,
+        cpTopic,
+        partNum,
+        1,
+        props)
+    } catch {
+      case e: Exception => println(e.getMessage)
+    } finally {
+      zkClient.close
     }
-    kcm1.stop
   }
 
-  private def getKafkaCheckpointManagerWithParam(cpTopic: String) = new KafkaCheckpointManager(
-    clientId = "some-client-id",
-    checkpointTopic = cpTopic,
-    systemName = "kafka",
-    replicationFactor = 3,
-    socketTimeout = 30000,
-    bufferSize = 64 * 1024,
-    fetchSize = 300 * 1024,
-    getSystemConsumer = systemConsumerFn,
-    getSystemAdmin = systemAdminFn,
-    metadataStore = metadataStore,
-    getSystemProducer = systemProducerFn,
-    connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure),
-    systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
-    failOnCheckpointValidation = failOnTopicValidation,
-    checkpointTopicProperties = new org.apache.samza.config.KafkaConfig(new MapConfig()).getCheckpointTopicProperties())
-
-  // CheckpointManager with a specific checkpoint topic
-  private def getKafkaCheckpointManager = getKafkaCheckpointManagerWithParam(checkpointTopic)
-
-  // inject serde. Kafka exceptions will be thrown when serde.fromBytes is called
-  private def getKafkaCheckpointManagerWithInvalidSerde(exception: String) = new KafkaCheckpointManager(
-    clientId = "some-client-id-invalid-serde",
-    checkpointTopic = serdeCheckpointTopic,
-    systemName = "kafka",
-    replicationFactor = 3,
-    socketTimeout = 30000,
-    bufferSize = 64 * 1024,
-    fetchSize = 300 * 1024,
-    getSystemConsumer = systemConsumerFn,
-    getSystemAdmin = systemAdminFn,
-    metadataStore = metadataStore,
-    getSystemProducer = systemProducerFn,
-    connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure),
-    systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
-    failOnCheckpointValidation = failOnTopicValidation,
-    serde = new InvalideSerde(exception),
-    checkpointTopicProperties = new org.apache.samza.config.KafkaConfig(new MapConfig()).getCheckpointTopicProperties())
-
-  class InvalideSerde(exception: String) extends CheckpointSerde {
-    override def fromBytes(bytes: Array[Byte]): Checkpoint = {
-      exception match {
-        case "InvalidMessageException" => throw new InvalidMessageException
-        case "InvalidMessageSizeException" => throw new 
InvalidMessageSizeException
-        case "UnknownTopicOrPartitionException" => throw new 
UnknownTopicOrPartitionException
-      }
-    }
-  }
 }

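The refactored Scala test drives the manager through one uniform lifecycle: construct, register,
start, read or write, stop. A compact Java sketch of that flow follows, with config, factory, and
registry construction elided; all names are taken from the diffs above, but the sketch is
illustrative only and not part of the patch.

    import org.apache.samza.checkpoint.Checkpoint;
    import org.apache.samza.checkpoint.kafka.KafkaCheckpointManager;
    import org.apache.samza.config.Config;
    import org.apache.samza.container.TaskName;
    import org.apache.samza.metrics.MetricsRegistry;
    import org.apache.samza.serializers.CheckpointSerde;
    import org.apache.samza.system.SystemFactory;
    import org.apache.samza.system.kafka.KafkaStreamSpec;

    public class CheckpointManagerLifecycle {
      static Checkpoint readLast(Config config, SystemFactory factory, MetricsRegistry registry) {
        // One-partition checkpoint stream: stream id, physical topic, system name, partition count.
        KafkaStreamSpec spec = new KafkaStreamSpec("checkpoint-id", "checkpoint-topic", "kafka", 1);
        KafkaCheckpointManager manager = new KafkaCheckpointManager(
            spec, factory, true /* failOnTopicValidation */, config, registry, new CheckpointSerde());

        TaskName task = new TaskName("Partition 0");
        manager.register(task);   // register tasks before start
        manager.start();          // creates/validates the topic and starts producer and consumer
        Checkpoint checkpoint = manager.readLastCheckpoint(task);
        manager.stop();
        return checkpoint;
      }
    }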