http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupByPartition.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupByPartition.scala b/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupByPartition.scala
new file mode 100644
index 0000000..223862f
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupByPartition.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.systemstreampartition.groupers
+
+import org.apache.samza.container.{TaskName, SystemStreamPartitionGrouperFactory, SystemStreamPartitionGrouper}
+import java.util
+import org.apache.samza.system.SystemStreamPartition
+import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
+import org.apache.samza.config.Config
+
+/**
+ * Group the {@link org.apache.samza.system.SystemStreamPartition}s by their Partition, with the key being
+ * the string representation of the Partition.
+ */
+class GroupByPartition extends SystemStreamPartitionGrouper {
+  override def group(ssps: util.Set[SystemStreamPartition]) = {
+    ssps.groupBy( s => new TaskName("Partition " + s.getPartition.getPartitionId) )
+      .map(r => r._1 -> r._2.asJava)
+  }
+}
+
+class GroupByPartitionFactory extends SystemStreamPartitionGrouperFactory {
+  override def getSystemStreamPartitionGrouper(config: Config): SystemStreamPartitionGrouper = new GroupByPartition
+}

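As a rough usage sketch of GroupByPartition (system and stream names invented for illustration), SSPs with the same partition id land in the same group regardless of stream:

    import scala.collection.JavaConverters._
    import org.apache.samza.Partition
    import org.apache.samza.system.SystemStreamPartition

    val ssps = Set(
      new SystemStreamPartition("kafka", "pageviews", new Partition(0)),
      new SystemStreamPartition("kafka", "clicks", new Partition(0)),
      new SystemStreamPartition("kafka", "pageviews", new Partition(1))).asJava

    val groups = new GroupByPartition().group(ssps)
    // groups should map TaskName("Partition 0") -> {pageviews/0, clicks/0}
    // and TaskName("Partition 1") -> {pageviews/1}
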
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupBySystemStreamPartition.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupBySystemStreamPartition.scala b/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupBySystemStreamPartition.scala
new file mode 100644
index 0000000..a2bcfee
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupBySystemStreamPartition.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.systemstreampartition.groupers
+
+import org.apache.samza.container.{TaskName, SystemStreamPartitionGrouperFactory, SystemStreamPartitionGrouper}
+import java.util
+import org.apache.samza.system.SystemStreamPartition
+import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
+import org.apache.samza.config.Config
+
+/**
+ * Group the {@link org.apache.samza.system.SystemStreamPartition}s by themselves, effectively putting each
+ * SystemStreamPartition into its own group, with the key being the string representation of the SystemStreamPartition.
+ */
+class GroupBySystemStreamPartition extends SystemStreamPartitionGrouper {
+  override def group(ssps: util.Set[SystemStreamPartition]) = ssps.groupBy(s => new TaskName(s.toString)).map(r => r._1 -> r._2.asJava)
+}
+
+class GroupBySystemStreamPartitionFactory extends SystemStreamPartitionGrouperFactory {
+  override def getSystemStreamPartitionGrouper(config: Config): SystemStreamPartitionGrouper = new GroupBySystemStreamPartition
+}

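By contrast, a quick sketch with GroupBySystemStreamPartition: feeding it the same three SSPs from the sketch above should yield three singleton groups, each keyed by a TaskName built from that SSP's toString:

    val groups = new GroupBySystemStreamPartition().group(ssps)
    // groups should have three entries, one per SSP, each group containing
    // exactly that SSP (the TaskName string is whatever SSP.toString yields)
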
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/SimpleSystemStreamPartitionTaskNameGrouper.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/SimpleSystemStreamPartitionTaskNameGrouper.scala b/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/SimpleSystemStreamPartitionTaskNameGrouper.scala
new file mode 100644
index 0000000..7913294
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/SimpleSystemStreamPartitionTaskNameGrouper.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.systemstreampartition.taskname.groupers
+
+import org.apache.samza.container.{TaskName, SystemStreamPartitionTaskNameGrouper, TaskNamesToSystemStreamPartitions}
+import org.apache.samza.system.SystemStreamPartition
+
+/**
+ * Group the SSP taskNames by dividing them among the number of containers (n), assigning taskNames to each
+ * container round-robin, in whatever order iterating over the keys in the map of taskNames happens to return them.
+ * No consideration is given towards locality, even distribution of aggregate SSPs within a container, etc.
+ */
+class SimpleSystemStreamPartitionTaskNameGrouper(numContainers: Int) extends SystemStreamPartitionTaskNameGrouper {
+  require(numContainers > 0, "Must have at least one container")
+
+  override def groupTaskNames(taskNames: TaskNamesToSystemStreamPartitions): Map[Int, TaskNamesToSystemStreamPartitions] = {
+    val keySize = taskNames.keySet.size
+    require(keySize > 0, "Must have some SSPs to group, but found none")
+
+    // Iterate through the taskNames, round-robining them per container
+    val byContainerNum = (0 until numContainers).map(_ -> scala.collection.mutable.Map[TaskName, Set[SystemStreamPartition]]()).toMap
+    var idx = 0
+    for(taskName <- taskNames.iterator) {
+      val currMap = byContainerNum.get(idx).get // safe to use simple get since we populated everybody above
+      idx = (idx + 1) % numContainers
+
+      currMap += taskName
+    }
+
+    byContainerNum.map(kv => kv._1 -> TaskNamesToSystemStreamPartitions(kv._2)).toMap
+  }
+}
+

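A hypothetical walk-through of the round-robin, assuming an sspTaskNames map with four taskNames built by one of the groupers above:

    val grouper = new SimpleSystemStreamPartitionTaskNameGrouper(2)
    val byContainer = grouper.groupTaskNames(sspTaskNames)
    // byContainer(0) holds the 1st and 3rd taskNames the iterator yielded,
    // byContainer(1) the 2nd and 4th; which taskName lands where depends on
    // the map's iteration order, as the javadoc warns.
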
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala b/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
index 4635bb2..8c5533c 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
@@ -19,17 +19,76 @@
 
 package org.apache.samza.job
 
-import scala.collection.JavaConversions._
+import java.util
+import org.apache.samza.Partition
 import org.apache.samza.config.ShellCommandConfig
 import org.apache.samza.config.ShellCommandConfig.Config2ShellCommand
 import org.apache.samza.config.serializers.JsonConfigSerializer
+import org.apache.samza.container.TaskName
+import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.util.Util
+import org.codehaus.jackson.`type`.TypeReference
+import org.codehaus.jackson.map.ObjectMapper
+import scala.collection.JavaConversions._
+import scala.reflect.BeanProperty
+
+object ShellCommandBuilder {
+  /**
+   * Jackson really hates Scala's classes, so we need to wrap up the SSP in a form Jackson will take
+   */
+  private class SSPWrapper(@BeanProperty var partition: java.lang.Integer = null,
+                           @BeanProperty var Stream: java.lang.String = null,
+                           @BeanProperty var System: java.lang.String = null) {
+    def this() { this(null, null, null) }
+    def this(ssp: SystemStreamPartition) { this(ssp.getPartition.getPartitionId, ssp.getSystemStream.getStream, ssp.getSystemStream.getSystem) }
+  }
+
+  def serializeSystemStreamPartitionSetToJSON(sspTaskNames: java.util.Map[TaskName, java.util.Set[SystemStreamPartition]]): String = {
+    val map = new util.HashMap[TaskName, util.ArrayList[SSPWrapper]]()
+    for((key, ssps) <- sspTaskNames) {
+      val al = new util.ArrayList[SSPWrapper](ssps.size)
+      for(ssp <- ssps) { al.add(new SSPWrapper(ssp)) }
+      map.put(key, al)
+    }
+
+    new ObjectMapper().writeValueAsString(map)
+  }
+
+  def deserializeSystemStreamPartitionSetFromJSON(sspsAsJSON: String): Map[TaskName, Set[SystemStreamPartition]] = {
+    val om = new ObjectMapper()
+
+    val asWrapper = om.readValue(sspsAsJSON, new TypeReference[util.HashMap[String, util.ArrayList[SSPWrapper]]]() { }).asInstanceOf[util.HashMap[String, util.ArrayList[SSPWrapper]]]
+
+    val taskName = for((key, sspsWrappers) <- asWrapper;
+                       taskName = new TaskName(key);
+                       ssps = sspsWrappers.map(w => new SystemStreamPartition(w.getSystem, w.getStream, new Partition(w.getPartition))).toSet
+    ) yield (taskName -> ssps)
 
+    taskName.toMap // to get an immutable map rather than mutable...
+  }
+
+  def serializeTaskNameToChangeLogPartitionMapping(mapping: Map[TaskName, Int]) = {
+    val javaMap = new util.HashMap[TaskName, java.lang.Integer]()
+
+    mapping.foreach(kv => javaMap.put(kv._1, Integer.valueOf(kv._2)))
+
+    new ObjectMapper().writeValueAsString(javaMap)
+  }
+
+  def deserializeTaskNameToChangeLogPartitionMapping(taskNamesAsJSON: String): Map[TaskName, Int] = {
+    val om = new ObjectMapper()
+
+    val asMap = om.readValue(taskNamesAsJSON, new TypeReference[util.HashMap[String, java.lang.Integer]] {}).asInstanceOf[util.HashMap[String, java.lang.Integer]]
+
+    asMap.map(kv => new TaskName(kv._1) -> kv._2.intValue()).toMap
+  }
+}
 class ShellCommandBuilder extends CommandBuilder {
   def buildCommand() = config.getCommand
 
   def buildEnvironment(): java.util.Map[String, String] = {
-    var streamsAndPartsString = Util.serializeSSPSetToJSON(systemStreamPartitions.toSet) // Java to Scala set conversion
+    var streamsAndPartsString = ShellCommandBuilder.serializeSystemStreamPartitionSetToJSON(taskNameToSystemStreamPartitionsMapping) // Java to Scala set conversion
+    var taskNameToChangeLogPartitionMappingString = ShellCommandBuilder.serializeTaskNameToChangeLogPartitionMapping(taskNameToChangeLogPartitionMapping.map(kv => kv._1 -> kv._2.toInt).toMap)
     var envConfig = JsonConfigSerializer.toJson(config)
     val isCompressed = if(config.isEnvConfigCompressed) "TRUE" else "FALSE"
 
@@ -40,14 +99,17 @@ class ShellCommandBuilder extends CommandBuilder {
       * of the environment variable(s) exported while starting a Samza container (SAMZA-337)
        */
       streamsAndPartsString = Util.compress(streamsAndPartsString)
+      taskNameToChangeLogPartitionMappingString = Util.compress(taskNameToChangeLogPartitionMappingString)
       envConfig = Util.compress(envConfig)
     }
 
     Map(
       ShellCommandConfig.ENV_CONTAINER_NAME -> name,
       ShellCommandConfig.ENV_SYSTEM_STREAMS -> streamsAndPartsString,
+      ShellCommandConfig.ENV_TASK_NAME_TO_CHANGELOG_PARTITION_MAPPING -> taskNameToChangeLogPartitionMappingString,
       ShellCommandConfig.ENV_CONFIG -> envConfig,
       ShellCommandConfig.ENV_JAVA_OPTS -> config.getTaskOpts.getOrElse(""),
       ShellCommandConfig.ENV_COMPRESS_CONFIG -> isCompressed)
+
   }
 }

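A round-trip sketch of the JSON helpers above (taskName, system and stream values invented):

    import scala.collection.JavaConverters._

    val asJava: java.util.Map[TaskName, java.util.Set[SystemStreamPartition]] =
      Map(new TaskName("Partition 0") ->
        Set(new SystemStreamPartition("kafka", "pageviews", new Partition(0))).asJava).asJava

    val json = ShellCommandBuilder.serializeSystemStreamPartitionSetToJSON(asJava)
    val back = ShellCommandBuilder.deserializeSystemStreamPartitionSetFromJSON(json)
    // back should be an immutable Scala Map[TaskName, Set[SystemStreamPartition]]
    // with the same content as the input
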
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala
index e20e7c1..713bded 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala
@@ -24,24 +24,32 @@ import org.apache.samza.config.ShellCommandConfig._
 import org.apache.samza.job.CommandBuilder
 import org.apache.samza.job.StreamJob
 import org.apache.samza.job.StreamJobFactory
-import scala.collection.JavaConversions._
 import grizzled.slf4j.Logging
 import org.apache.samza.SamzaException
-import org.apache.samza.container.SamzaContainer
+import org.apache.samza.container.{TaskNamesToSystemStreamPartitions, SamzaContainer}
 import org.apache.samza.util.Util
 import org.apache.samza.job.ShellCommandBuilder
+import scala.collection.JavaConversions._
 
 class LocalJobFactory extends StreamJobFactory with Logging {
   def getJob(config: Config): StreamJob = {
-    val taskName = "local-task"
-    val partitions = Util.getInputStreamPartitions(config)
+    val jobName = "local-container"
 
-    info("got partitions for job %s" format partitions)
+    // Since we're local, there will only be a single task into which all the SSPs will be processed
+    val taskToTaskNames: Map[Int, TaskNamesToSystemStreamPartitions] = Util.assignContainerToSSPTaskNames(config, 1)
+    if(taskToTaskNames.size != 1) {
+      throw new SamzaException("Should only have a single task count but somehow got more " + taskToTaskNames.size)
+    }
 
-    if (partitions.size <= 0) {
-      throw new SamzaException("No partitions were detected for your input streams. It's likely that the system(s) specified don't know about the input streams: %s" format config.getInputStreams)
+    // So pull out that single TaskNamesToSystemStreamPartitions
+    val sspTaskName: TaskNamesToSystemStreamPartitions = taskToTaskNames.getOrElse(0, throw new SamzaException("Should have a 0 task number for the SSPs but somehow do not: " + taskToTaskNames))
+    if (sspTaskName.size <= 0) {
+      throw new SamzaException("No SystemStreamPartitions to process were detected for your input streams. It's likely that the system(s) specified don't know about the input streams: %s" format config.getInputStreams)
     }
 
+    val taskNameToChangeLogPartitionMapping = Util.getTaskNameToChangeLogPartitionMapping(config, taskToTaskNames).map(kv => kv._1 -> Integer.valueOf(kv._2))
+    info("got taskName for job %s" format sspTaskName)
+
     config.getCommandClass match {
       case Some(cmdBuilderClassName) => {
         // A command class was specified, so we need to use a process job to
@@ -50,8 +58,8 @@ class LocalJobFactory extends StreamJobFactory with Logging {
         
         cmdBuilder
           .setConfig(config)
-          .setName(taskName)
-          .setStreamPartitions(partitions)
+          .setName(jobName)
+          .setTaskNameToSystemStreamPartitionsMapping(sspTaskName.getJavaFriendlyType)
 
         val processBuilder = new ProcessBuilder(cmdBuilder.buildCommand.split(" ").toList)
 
@@ -72,7 +80,7 @@ class LocalJobFactory extends StreamJobFactory with Logging {
 
         // No command class was specified, so execute the job in this process
         // using a threaded job.
-        new ThreadJob(SamzaContainer(taskName, partitions, config))
+        new ThreadJob(SamzaContainer(jobName, sspTaskName, taskNameToChangeLogPartitionMapping, config))
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
index 3d0a484..34c846c 100644
--- a/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
@@ -19,26 +19,59 @@
 
 package org.apache.samza.serializers
 
-import scala.collection.JavaConversions._
-import org.codehaus.jackson.map.ObjectMapper
-import org.apache.samza.system.SystemStream
-import org.apache.samza.checkpoint.Checkpoint
-import org.apache.samza.SamzaException
 import grizzled.slf4j.Logging
+import java.util
+import org.apache.samza.checkpoint.Checkpoint
+import org.apache.samza.container.TaskName
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.{SamzaException, Partition}
+import org.codehaus.jackson.map.ObjectMapper
+import scala.collection.JavaConversions._
+import org.codehaus.jackson.`type`.TypeReference
 
+/**
+ * Write out the Checkpoint object in JSON.  The underlying map of SSP => Offset cannot be stored directly because
+ * JSON only allows strings as map keys, so we would need to separately serialize the SSP to a string that doesn't
+ * then interfere with JSON's decoding of the overall map.  We'll sidestep the whole issue by turning the
+ * map into a map keyed by the SSP's string form, whose values hold the constituent (System, Stream, Partition,
+ * Offset) fields, and serializing that.
+ */
 class CheckpointSerde extends Serde[Checkpoint] with Logging {
+  import CheckpointSerde._
+  // TODO: Elucidate the CheckpointSerde relationship to Serde. Should Serde also have keyTo/FromBytes? Should
+  // we just take CheckpointSerde here as an interface and have this be JSONCheckpointSerde?
+  // TODO: Add more tests.  This class currently has no direct test and is mainly tested through the other checkpoint managers.
   val jsonMapper = new ObjectMapper()
 
+  // Jackson absolutely hates Scala types and hidden conversions hate you, so we're going to be very, very
+  // explicit about the Java (not Scala) types used here and never let Scala get its grubby little hands
+  // on any instance.
+
+  // Store the checkpoint as a map keyed off SSP.toString to another map of the constituent SSP components
+  // and offset.  Jackson can't automatically serialize the SSP since it's not a POJO, and this avoids
+  // having to wrap it in another class while maintaining readability.
+
   def fromBytes(bytes: Array[Byte]): Checkpoint = {
     try {
-      val checkpointMap = jsonMapper
-        .readValue(bytes, classOf[java.util.Map[String, java.util.Map[String, String]]])
-        .flatMap {
-          case (systemName, streamToOffsetMap) =>
-            streamToOffsetMap.map { case (streamName, offset) => (new SystemStream(systemName, streamName), offset) }
-        }
-      return new Checkpoint(checkpointMap)
-    } catch {
+      val jMap = jsonMapper.readValue(bytes, classOf[util.HashMap[String, util.HashMap[String, String]]])
+
+      def deserializeJSONMap(m: util.HashMap[String, String]) = {
+        require(m.size() == 4, "All JSON-encoded SystemStreamPartitions must have four keys")
+        val system = m.get("system")
+        require(system != null, "System must be present in JSON-encoded SystemStreamPartition")
+        val stream = m.get("stream")
+        require(stream != null, "Stream must be present in JSON-encoded SystemStreamPartition")
+        val partition = m.get("partition")
+        require(partition != null, "Partition must be present in JSON-encoded SystemStreamPartition")
+        val offset = m.get("offset")
+        require(offset != null, "Offset must be present in JSON-encoded SystemStreamPartition")
+
+        new SystemStreamPartition(system, stream, new Partition(partition.toInt)) -> offset
+      }
+
+      val cpMap = jMap.values.map(deserializeJSONMap).toMap
+
+      return new Checkpoint(cpMap)
+    } catch {
       case e : Exception =>
         warn("Exception while deserializing checkpoint: " + e)
         debug("Exception detail:", e)
@@ -46,34 +79,38 @@ class CheckpointSerde extends Serde[Checkpoint] with Logging {
     }
   }
 
-  def toBytes(checkpoint: Checkpoint) = {
-    val offsetMap = mapAsJavaMap(checkpoint
-      .getOffsets
-      // Convert Map[SystemStream, String] offset map to a iterable of tuples (system, stream, offset)
-      .map { case (systemStream, offset) => (systemStream.getSystem, systemStream.getStream, offset) }
-      // Group into a Map[String, (String, String, String)] by system
-      .groupBy(_._1)
-      // Group the tuples for each system into a Map[String, String] for stream to offsets
-      .map {
-        case (systemName, tuples) =>
-          val streamToOffestMap = mapAsJavaMap(tuples
-            // Group the tuples by stream name
-            .groupBy(_._2)
-            // There should only ever be one SystemStream to offset mapping, so just
-            // grab the first element from the tuple list for each stream.
-            .map {
-              case (streamName, tuples) => {
-                // If there's more than one offset, something is seriously wrong.
-                if (tuples.size != 1) {
-                  throw new SamzaException("Got %s offsets for %s. Expected only one offset, so failing." format (tuples.size, streamName))
-                }
-                (streamName, tuples.head._3)
-              }
-            }
-            .toMap)
-          (systemName, streamToOffestMap)
-      }.toMap)
-
-    jsonMapper.writeValueAsBytes(offsetMap)
+  def toBytes(checkpoint: Checkpoint): Array[Byte] = {
+    val offsets = checkpoint.getOffsets
+    val asMap = new util.HashMap[String, util.HashMap[String, String]](offsets.size())
+
+    offsets.foreach {
+      case (ssp, offset) =>
+        val jMap = new util.HashMap[String, String](4)
+        jMap.put("system", ssp.getSystemStream.getSystem)
+        jMap.put("stream", ssp.getSystemStream.getStream)
+        jMap.put("partition", ssp.getPartition.getPartitionId.toString)
+        jMap.put("offset", offset)
+
+        asMap.put(ssp.toString, jMap)
+    }
+
+    jsonMapper.writeValueAsBytes(asMap)
+  }
+
+  def changelogPartitionMappingFromBytes(bytes: Array[Byte]): util.Map[TaskName, java.lang.Integer] = {
+    try {
+      jsonMapper.readValue(bytes, PARTITION_MAPPING_TYPEREFERENCE)
+    } catch {
+      case e : Exception =>
+        throw new SamzaException("Exception while deserializing changelog partition mapping", e)
+    }
   }
+
+  def changelogPartitionMappingToBytes(mapping: util.Map[TaskName, java.lang.Integer]) = {
+    jsonMapper.writeValueAsBytes(new util.HashMap[TaskName, java.lang.Integer](mapping))
+  }
+}
+
+object CheckpointSerde {
+  val PARTITION_MAPPING_TYPEREFERENCE = new TypeReference[util.HashMap[TaskName, java.lang.Integer]]() {}
 }

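For reference, the JSON written by toBytes for a single-entry checkpoint would look roughly like this (the outer key is whatever SSP.toString yields; values invented):

    {
      "SystemStreamPartition [kafka, pageviews, 0]": {
        "system": "kafka",
        "stream": "pageviews",
        "partition": "0",
        "offset": "45"
      }
    }
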
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index 7214151..0cfdbb3 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -29,14 +29,16 @@ import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.system.SystemStreamPartitionIterator
 import org.apache.samza.util.Util
 import org.apache.samza.SamzaException
+import org.apache.samza.container.TaskName
 
 object TaskStorageManager {
   def getStoreDir(storeBaseDir: File, storeName: String) = {
     new File(storeBaseDir, storeName)
   }
 
-  def getStorePartitionDir(storeBaseDir: File, storeName: String, partition: Partition) = {
-    new File(storeBaseDir, storeName + File.separator + partition.getPartitionId)
+  def getStorePartitionDir(storeBaseDir: File, storeName: String, taskName: TaskName) = {
+    // TODO: Sanitize, check and clean taskName string as a valid value for a file
+    new File(storeBaseDir, storeName + File.separator + taskName)
   }
 }
 
@@ -44,12 +46,13 @@ object TaskStorageManager {
  * Manage all the storage engines for a given task
  */
 class TaskStorageManager(
-  partition: Partition,
+  taskName: TaskName,
   taskStores: Map[String, StorageEngine] = Map(),
   storeConsumers: Map[String, SystemConsumer] = Map(),
   changeLogSystemStreams: Map[String, SystemStream] = Map(),
   changeLogOldestOffsets: Map[SystemStream, String] = Map(),
-  storeBaseDir: File = new File(System.getProperty("user.dir"), "state")) extends Logging {
+  storeBaseDir: File = new File(System.getProperty("user.dir"), "state"),
+  partition: Partition) extends Logging {
 
   var taskStoresToRestore = taskStores
 
@@ -66,7 +69,7 @@ class TaskStorageManager(
     debug("Cleaning base directories for stores.")
 
     taskStores.keys.foreach(storeName => {
-      val storagePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, partition)
+      val storagePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName)
 
       debug("Cleaning %s for store %s." format (storagePartitionDir, storeName))
 

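To make the directory change concrete (paths invented, and assuming TaskName's toString yields the raw name; note the TODO above about sanitizing it):

    val dir = TaskStorageManager.getStorePartitionDir(new File("/tmp/state"), "my-store", new TaskName("Partition 3"))
    // dir.getPath should be "/tmp/state/my-store/Partition 3", where the old
    // partition-based version produced ".../my-store/3"
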
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala
index 4ccd604..6e1134d 100644
--- a/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala
@@ -20,12 +20,12 @@
 package org.apache.samza.task
 
 import org.apache.samza.task.TaskCoordinator.RequestScope
-import org.apache.samza.Partition
+import org.apache.samza.container.TaskName
 
 /**
  * An in-memory implementation of TaskCoordinator that is specific to a single TaskInstance.
  */
-class ReadableCoordinator(val partition: Partition) extends TaskCoordinator {
+class ReadableCoordinator(val taskName: TaskName) extends TaskCoordinator {
   var commitRequest: Option[RequestScope] = None
   var shutdownRequest: Option[RequestScope] = None
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 60d96c9..32c2647 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -19,23 +19,26 @@
 
 package org.apache.samza.util
 
+import grizzled.slf4j.Logging
 import java.io._
 import java.lang.management.ManagementFactory
+import java.util
 import java.util.Random
 import java.util.zip.{GZIPInputStream, GZIPOutputStream}
-import grizzled.slf4j.Logging
 import org.apache.commons.codec.binary.Base64
-import org.apache.samza.{ Partition, SamzaException }
+import org.apache.samza.SamzaException
+import org.apache.samza.checkpoint.CheckpointManagerFactory
 import org.apache.samza.config.Config
+import org.apache.samza.config.StorageConfig.Config2Storage
 import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.config.TaskConfig.Config2Task
-import scala.collection.JavaConversions._
+import org.apache.samza.container.systemstreampartition.groupers.GroupByPartitionFactory
+import org.apache.samza.container.systemstreampartition.taskname.groupers.SimpleSystemStreamPartitionTaskNameGrouper
+import org.apache.samza.container.{TaskName, SystemStreamPartitionTaskNameGrouper, TaskNamesToSystemStreamPartitions, SystemStreamPartitionGrouperFactory}
+import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.system.{SystemStreamPartition, SystemFactory, StreamMetadataCache, SystemStream}
-import org.codehaus.jackson.map.ObjectMapper
-import org.codehaus.jackson.`type`.TypeReference
-import java.util
-import scala.reflect.BeanProperty
-
+import scala.collection.JavaConversions._
+import scala.collection
 
 object Util extends Logging {
   val random = new Random
@@ -82,7 +85,7 @@ object Util extends Logging {
 
   /**
    * For each input stream specified in config, exactly determine its partitions, returning a set of SystemStreamPartitions
-   * corresponding to them all
+   * containing them all
    *
    * @param config Source of truth for systems and inputStreams
    * @return Set of SystemStreamPartitions, one for each unique system, stream and partition
@@ -112,6 +115,50 @@ object Util extends Logging {
   }
 
   /**
+   * Assign the mapping of which TaskNames go to which container
+   *
+   * @param config Supplies the factories for the Grouper and TaskNameGrouper
+   * @param containerCount How many containers we are working with
+   * @return Map of int (taskId) to the SSPTaskNameMap that taskId is responsible for
+   */
+  def assignContainerToSSPTaskNames(config: Config, containerCount: Int): Map[Int, TaskNamesToSystemStreamPartitions] = {
+    import org.apache.samza.config.JobConfig.Config2Job
+
+    val allSystemStreamPartitions: Set[SystemStreamPartition] = Util.getInputStreamPartitions(config)
+
+    val sspTaskNamesAsJava: util.Map[TaskName, util.Set[SystemStreamPartition]] = {
+      val factoryString = config.getSystemStreamPartitionGrouperFactory
+
+      info("Instantiating type " + factoryString + " to build SystemStreamPartition groupings")
+
+      val factory = Util.getObj[SystemStreamPartitionGrouperFactory](factoryString)
+
+      val grouper = factory.getSystemStreamPartitionGrouper(config)
+
+      val groups = grouper.group(allSystemStreamPartitions)
+
+      info("SystemStreamPartitionGrouper " + grouper + " has grouped the SystemStreamPartitions into the following taskNames:")
+      groups.foreach(g => info("TaskName: " + g._1 + " => " + g._2))
+
+      groups
+    }
+
+    val sspTaskNames = TaskNamesToSystemStreamPartitions(sspTaskNamesAsJava)
+
+    info("Assigning " + sspTaskNames.keySet.size + " SystemStreamPartition taskNames to " + containerCount + " containers.")
+
+    // Here is where we should put in a pluggable option for the SSPTaskNameGrouper for locality, load-balancing, etc.
+    val sspTaskNameGrouper = new SimpleSystemStreamPartitionTaskNameGrouper(containerCount)
+
+    val containersToTaskNames = sspTaskNameGrouper.groupTaskNames(sspTaskNames).toMap
+
+    info("Grouped SystemStreamPartition TaskNames (size = " + containersToTaskNames.size + "): ")
+    containersToTaskNames.foreach(t => info("Container number: " + t._1 + " => " + t._2))
+
+    containersToTaskNames
+  }
+
+  /**
    * Returns a SystemStream object based on the system stream name given. For
    * example, kafka.topic would return new SystemStream("kafka", "topic").
    */
@@ -131,40 +178,120 @@ object Util extends Logging {
   }
 
   /**
-   * For specified containerId, create a list of of the streams and partitions that task should handle,
-   * based on the number of tasks in the job
+   * Using previous taskName to partition mapping and current taskNames for this job run, create a new mapping that preserves
+   * the previous order and deterministically assigns any new taskNames to changelog partitions.  Be chatty about new or
+   * missing taskNames.
    *
-   * @param containerId TaskID to determine work for
-   * @param containerCount Total number of tasks in the job
-   * @param ssp All SystemStreamPartitions
-   * @return Collection of streams and partitions for this particular containerId
+   * @param currentTaskNames All the taskNames the current job is processing
+   * @param previousTaskNameMapping Previous mapping of taskNames to partition
+   * @return New mapping of taskNames to partitions for the changelog
    */
-  def getStreamsAndPartitionsForContainer(containerId: Int, containerCount: Int, ssp: Set[SystemStreamPartition]): Set[SystemStreamPartition] = {
-    ssp.filter(_.getPartition.getPartitionId % containerCount == containerId)
+  def resolveTaskNameToChangelogPartitionMapping(currentTaskNames: Set[TaskName],
+    previousTaskNameMapping: Map[TaskName, Int]): Map[TaskName, Int] = {
+    info("Previous mapping of taskNames to partition: " + previousTaskNameMapping.toList.sorted)
+    info("Current set of taskNames: " + currentTaskNames.toList.sorted)
+
+    val previousTaskNames: Set[TaskName] = previousTaskNameMapping.keySet
+
+    if(previousTaskNames.equals(currentTaskNames)) {
+      info("No change in TaskName sets from previous run. Returning previous mapping.")
+      return previousTaskNameMapping
+    }
+
+    if(previousTaskNames.isEmpty) {
+      warn("No previous taskName mapping defined.  This is OK if it's the first time the job is being run, otherwise data may have been lost.")
+    }
+
+    val missingTaskNames = previousTaskNames -- currentTaskNames
+
+    if(missingTaskNames.isEmpty) {
+      info("No taskNames are missing between this run and previous")
+    } else {
+      warn("The following taskNames were previously defined and are no longer present: " + missingTaskNames)
+    }
+
+    val newTaskNames = currentTaskNames -- previousTaskNames
+
+    if(newTaskNames.isEmpty) {
+      info("No new taskNames have been added between this run and the previous")
+      previousTaskNameMapping // Return the old mapping since there are no new taskNames for which to account
+
+    } else {
+      warn("The following new taskNames have been added in this job run: " + newTaskNames)
+
+      // Sort the new taskNames and assign them partitions (starting at 0 for now)
+      val sortedNewTaskNames = newTaskNames.toList.sortWith { (a,b) => a.getTaskName < b.getTaskName }.zipWithIndex.toMap
+
+      // Find next largest partition to use based on previous mapping
+      val nextPartitionToUse = if(previousTaskNameMapping.size == 0) 0
+                               else previousTaskNameMapping.foldLeft(0)((a,b) => math.max(a, b._2)) + 1
+
+      // Bump up the partition values
+      val newTaskNamesWithTheirPartitions = sortedNewTaskNames.map(c => c._1 -> (c._2 + nextPartitionToUse))
+      
+      // Merge old and new
+      val newMapping = previousTaskNameMapping ++ newTaskNamesWithTheirPartitions
+      
+      info("New taskName to partition mapping: " + newMapping.toList.sortWith{ (a,b) => a._2 < b._2})
+      
+      newMapping
+    }
   }
 
   /**
-   * Jackson really hates Scala's classes, so we need to wrap up the SSP in a form Jackson will take
+   * Read the TaskName to changelog partition mapping from the checkpoint manager, if one exists.
+   *
+   * @param config To pull out values for instantiating checkpoint manager
+   * @param tasksToSSPTaskNames Current TaskNames for the current job run
+   * @return Current mapping of TaskNames to changelog partitions
    */
-  private class SSPWrapper(@BeanProperty var partition:java.lang.Integer = null,
-                           @BeanProperty var Stream:java.lang.String = null,
-                           @BeanProperty var System:java.lang.String = null) {
-    def this() { this(null, null, null) }
-    def this(ssp:SystemStreamPartition) { this(ssp.getPartition.getPartitionId, ssp.getSystemStream.getStream, ssp.getSystemStream.getSystem)}
-  }
+  def getTaskNameToChangeLogPartitionMapping(config: Config, tasksToSSPTaskNames: Map[Int, TaskNamesToSystemStreamPartitions]) = {
+    val taskNameMaps: Set[TaskNamesToSystemStreamPartitions] = tasksToSSPTaskNames.map(_._2).toSet
+    val currentTaskNames: Set[TaskName] = taskNameMaps.map(_.keys).toSet.flatten
+
+    // We need to oh so quickly instantiate a checkpoint manager and grab the partition mapping from the log, then toss the manager aside
+    val checkpointManager = config.getCheckpointManagerFactory match {
+      case Some(checkpointFactoryClassName) =>
+        Util
+          .getObj[CheckpointManagerFactory](checkpointFactoryClassName)
+          .getCheckpointManager(config, new MetricsRegistryMap)
+      case _ => null
+    }
 
-  def serializeSSPSetToJSON(ssps: Set[SystemStreamPartition]): String = {
-    val al = new util.ArrayList[SSPWrapper](ssps.size)
-    for(ssp <- ssps) { al.add(new SSPWrapper(ssp)) }
+    if(checkpointManager == null) {
+      // Check if we have a changelog configured, which requires a checkpoint manager
 
-    new ObjectMapper().writeValueAsString(al)
-  }
+      if(!config.getStoreNames.isEmpty) {
+        throw new SamzaException("Storage factories configured, but no checkpoint manager has been specified.  " +
+          "Unable to start job as there would be no place to store changelog partition mapping.")
+      }
+      // No need to do any mapping, just use what has been provided
+      Util.resolveTaskNameToChangelogPartitionMapping(currentTaskNames, Map[TaskName, Int]())
+    } else {
+
+      info("Got checkpoint manager: %s" format checkpointManager)
+
+      // Always put in a call to create so the log is available for the tasks on startup.
+      // Reasonably lame to hide it in here.  TODO: Pull out to more visible location.
+      checkpointManager.start
 
-  def deserializeSSPSetFromJSON(ssp: String) = {
-    val om = new ObjectMapper()
+      val previousMapping: Map[TaskName, Int] = {
+        val fromCM = checkpointManager.readChangeLogPartitionMapping()
 
-    val asWrapper = om.readValue(ssp, new TypeReference[util.ArrayList[SSPWrapper]]() { }).asInstanceOf[util.ArrayList[SSPWrapper]]
-    asWrapper.map(w => new SystemStreamPartition(w.getSystem, w.getStream(), new Partition(w.getPartition()))).toSet
+        fromCM.map(kv => kv._1 -> kv._2.intValue()).toMap // Java to Scala interop!!!
+      }
+
+      checkpointManager.stop
+
+      val newMapping = Util.resolveTaskNameToChangelogPartitionMapping(currentTaskNames, previousMapping)
+
+      if (newMapping != null) {
+        info("Writing new changelog partition mapping to checkpoint manager.")
+        checkpointManager.writeChangeLogPartitionMapping(newMapping.map(kv => kv._1 -> new java.lang.Integer(kv._2))) //Java to Scala interop!!!
+      }
+
+      newMapping
+    }
   }
 
   /**

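A worked example of the changelog-partition resolution above, with invented taskNames:

    val previous = Map(new TaskName("Partition 0") -> 0, new TaskName("Partition 1") -> 1)
    val current  = previous.keySet ++ Set(new TaskName("Partition 3"), new TaskName("Partition 2"))

    val resolved = Util.resolveTaskNameToChangelogPartitionMapping(current, previous)
    // Existing entries are preserved (Partition 0 -> 0, Partition 1 -> 1); the new
    // taskNames are sorted and appended after the previous maximum partition:
    // Partition 2 -> 2, Partition 3 -> 3
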
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
index bc54f9e..1eb3995 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
@@ -19,18 +19,19 @@
 
 package org.apache.samza.checkpoint
 
+import org.apache.samza.Partition
+import org.apache.samza.checkpoint.TestCheckpointTool.{MockCheckpointManagerFactory, MockSystemFactory}
+import org.apache.samza.config.{Config, MapConfig, SystemConfig, TaskConfig}
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.system.{SystemAdmin, SystemConsumer, SystemFactory, SystemProducer, SystemStream, SystemStreamMetadata, SystemStreamPartition}
 import org.junit.{Before, Test}
 import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.scalatest.junit.AssertionsForJUnit
 import org.scalatest.mock.MockitoSugar
 import scala.collection.JavaConversions._
-import org.apache.samza.Partition
-import org.apache.samza.checkpoint.TestCheckpointTool.{MockCheckpointManagerFactory, MockSystemFactory}
-import org.apache.samza.config.{Config, MapConfig, SystemConfig, TaskConfig}
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.system.{SystemAdmin, SystemConsumer, SystemFactory, SystemProducer, SystemStream, SystemStreamMetadata, SystemStreamPartition}
-import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.container.TaskName
 
 object TestCheckpointTool {
   var checkpointManager: CheckpointManager = null
@@ -52,6 +53,11 @@ object TestCheckpointTool {
 class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
   var config: MapConfig = null
 
+  val tn0 = new TaskName("Partition 0")
+  val tn1 = new TaskName("Partition 1")
+  val p0 = new Partition(0)
+  val p1 = new Partition(1)
+
   @Before
   def setup {
     config = new MapConfig(Map(
@@ -68,29 +74,30 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
     TestCheckpointTool.systemAdmin = mock[SystemAdmin]
     when(TestCheckpointTool.systemAdmin.getSystemStreamMetadata(Set("foo")))
       .thenReturn(Map("foo" -> metadata))
-    when(TestCheckpointTool.checkpointManager.readLastCheckpoint(new Partition(0)))
-      .thenReturn(new Checkpoint(Map(new SystemStream("test", "foo") -> "1234")))
-    when(TestCheckpointTool.checkpointManager.readLastCheckpoint(new Partition(1)))
-      .thenReturn(new Checkpoint(Map(new SystemStream("test", "foo") -> "4321")))
+    when(TestCheckpointTool.checkpointManager.readLastCheckpoint(tn0))
+      .thenReturn(new Checkpoint(Map(new SystemStreamPartition("test", "foo", p0) -> "1234")))
+    when(TestCheckpointTool.checkpointManager.readLastCheckpoint(tn1))
+      .thenReturn(new Checkpoint(Map(new SystemStreamPartition("test", "foo", p1) -> "4321")))
+
   }
 
   @Test
   def testReadLatestCheckpoint {
     new CheckpointTool(config, null).run
-    verify(TestCheckpointTool.checkpointManager).readLastCheckpoint(new Partition(0))
-    verify(TestCheckpointTool.checkpointManager).readLastCheckpoint(new Partition(1))
+    verify(TestCheckpointTool.checkpointManager).readLastCheckpoint(tn0)
+    verify(TestCheckpointTool.checkpointManager).readLastCheckpoint(tn1)
     verify(TestCheckpointTool.checkpointManager, never()).writeCheckpoint(any(), any())
   }
 
   @Test
   def testOverwriteCheckpoint {
-    new CheckpointTool(config, Map(
-      new SystemStreamPartition("test", "foo", new Partition(0)) -> "42",
-      new SystemStreamPartition("test", "foo", new Partition(1)) -> "43"
-    )).run
+    val toOverwrite = Map(tn0 -> Map(new SystemStreamPartition("test", "foo", p0) -> "42"),
+      tn1 -> Map(new SystemStreamPartition("test", "foo", p1) -> "43"))
+
+    new CheckpointTool(config, toOverwrite).run
     verify(TestCheckpointTool.checkpointManager)
-      .writeCheckpoint(new Partition(0), new Checkpoint(Map(new SystemStream("test", "foo") -> "42")))
+      .writeCheckpoint(tn0, new Checkpoint(Map(new SystemStreamPartition("test", "foo", p0) -> "42")))
     verify(TestCheckpointTool.checkpointManager)
-      .writeCheckpoint(new Partition(1), new Checkpoint(Map(new SystemStream("test", "foo") -> "43")))
+      .writeCheckpoint(tn1, new Checkpoint(Map(new SystemStreamPartition("test", "foo", p1) -> "43")))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index 94f6f4c..44a98a5 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -23,19 +23,21 @@ import scala.collection.JavaConversions._
 import org.apache.samza.Partition
 import org.apache.samza.system.SystemStream
 import org.apache.samza.system.SystemStreamMetadata
-import org.apache.samza.system.SystemStreamMetadata.OffsetType
-import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.system.SystemStreamMetadata.{OffsetType, SystemStreamPartitionMetadata}
 import org.apache.samza.system.SystemStreamPartition
 import org.junit.Assert._
 import org.junit.{Ignore, Test}
 import org.apache.samza.SamzaException
 import org.apache.samza.config.MapConfig
 import org.apache.samza.system.SystemAdmin
+import java.util
+import org.apache.samza.container.TaskName
 import org.scalatest.Assertions.intercept
 
 class TestOffsetManager {
   @Test
   def testSystemShouldUseDefaults {
+    val taskName = new TaskName("c")
     val systemStream = new SystemStream("test-system", "test-stream")
     val partition = new Partition(0)
     val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
@@ -43,7 +45,7 @@ class TestOffsetManager {
     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
     val config = new MapConfig(Map("systems.test-system.samza.offset.default" -> "oldest"))
     val offsetManager = OffsetManager(systemStreamMetadata, config)
-    offsetManager.register(systemStreamPartition)
+    offsetManager.register(taskName, Set(systemStreamPartition))
     offsetManager.start
     assertTrue(!offsetManager.getLastProcessedOffset(systemStreamPartition).isDefined)
     assertTrue(offsetManager.getStartingOffset(systemStreamPartition).isDefined)
@@ -52,21 +54,22 @@ class TestOffsetManager {
 
   @Test
   def testShouldLoadFromAndSaveWithCheckpointManager {
+    val taskName = new TaskName("c")
     val systemStream = new SystemStream("test-system", "test-stream")
     val partition = new Partition(0)
     val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
     val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
     val config = new MapConfig
-    val checkpointManager = getCheckpointManager(systemStreamPartition)
+    val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
     val systemAdmins = Map("test-system" -> getSystemAdmin)
     val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins)
-    offsetManager.register(systemStreamPartition)
+    offsetManager.register(taskName, Set(systemStreamPartition))
     offsetManager.start
     assertTrue(checkpointManager.isStarted)
     assertEquals(1, checkpointManager.registered.size)
-    assertEquals(partition, checkpointManager.registered.head)
-    assertEquals(checkpointManager.checkpoints.head._2, checkpointManager.readLastCheckpoint(partition))
+    assertEquals(taskName, checkpointManager.registered.head)
+    assertEquals(checkpointManager.checkpoints.head._2, checkpointManager.readLastCheckpoint(taskName))
     // Should get offset 45 back from the checkpoint manager, which is last processed, and system admin should return 46 as starting offset.
     assertEquals("46", offsetManager.getStartingOffset(systemStreamPartition).get)
     assertEquals("45", offsetManager.getLastProcessedOffset(systemStreamPartition).get)
@@ -76,31 +79,31 @@ class TestOffsetManager {
     assertEquals("47", offsetManager.getLastProcessedOffset(systemStreamPartition).get)
     // Should never update starting offset.
     assertEquals("46", offsetManager.getStartingOffset(systemStreamPartition).get)
-    offsetManager.checkpoint(partition)
-    val expectedCheckpoint = new Checkpoint(Map(systemStream -> "47"))
-    assertEquals(expectedCheckpoint, checkpointManager.readLastCheckpoint(partition))
+    offsetManager.checkpoint(taskName)
+    val expectedCheckpoint = new Checkpoint(Map(systemStreamPartition -> "47"))
+    assertEquals(expectedCheckpoint, checkpointManager.readLastCheckpoint(taskName))
   }
 
   @Test
   def testShouldResetStreams {
+    val taskName = new TaskName("c")
     val systemStream = new SystemStream("test-system", "test-stream")
     val partition = new Partition(0)
     val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
     val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
-    val defaultOffsets = Map(systemStream -> OffsetType.OLDEST)
-    val checkpoint = new Checkpoint(Map(systemStream -> "45"))
-    val checkpointManager = getCheckpointManager(systemStreamPartition)
+    val checkpoint = new Checkpoint(Map(systemStreamPartition -> "45"))
+    val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
     val config = new MapConfig(Map(
       "systems.test-system.samza.offset.default" -> "oldest",
       "systems.test-system.streams.test-stream.samza.reset.offset" -> "true"))
     val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager)
-    offsetManager.register(systemStreamPartition)
+    offsetManager.register(taskName, Set(systemStreamPartition))
     offsetManager.start
     assertTrue(checkpointManager.isStarted)
     assertEquals(1, checkpointManager.registered.size)
-    assertEquals(partition, checkpointManager.registered.head)
-    assertEquals(checkpoint, checkpointManager.readLastCheckpoint(partition))
+    assertEquals(taskName, checkpointManager.registered.head)
+    assertEquals(checkpoint, checkpointManager.readLastCheckpoint(taskName))
     // Should be zero even though the checkpoint has an offset of 45, since we're forcing a reset.
     assertEquals("0", offsetManager.getStartingOffset(systemStreamPartition).get)
   }
@@ -110,36 +113,38 @@ class TestOffsetManager {
     val systemStream = new SystemStream("test-system", "test-stream")
     val partition1 = new Partition(0)
     val partition2 = new Partition(1)
+    val taskName1 = new TaskName("P0")
+    val taskName2 = new TaskName("P1")
     val systemStreamPartition1 = new SystemStreamPartition(systemStream, partition1)
     val systemStreamPartition2 = new SystemStreamPartition(systemStream, partition2)
     val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(
       partition1 -> new SystemStreamPartitionMetadata("0", "1", "2"),
       partition2 -> new SystemStreamPartitionMetadata("3", "4", "5")))
     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
-    val defaultOffsets = Map(systemStream -> OffsetType.OLDEST)
-    val checkpoint = new Checkpoint(Map(systemStream -> "45"))
+    val checkpoint = new Checkpoint(Map(systemStreamPartition1 -> "45"))
     // Checkpoint manager only has partition 1.
-    val checkpointManager = getCheckpointManager(systemStreamPartition1)
+    val checkpointManager = getCheckpointManager(systemStreamPartition1, taskName1)
     val systemAdmins = Map("test-system" -> getSystemAdmin)
     val config = new MapConfig
     val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins)
     // Register both partitions. Partition 2 shouldn't have a checkpoint.
-    offsetManager.register(systemStreamPartition1)
-    offsetManager.register(systemStreamPartition2)
+    offsetManager.register(taskName1, Set(systemStreamPartition1))
+    offsetManager.register(taskName2, Set(systemStreamPartition2))
     offsetManager.start
     assertTrue(checkpointManager.isStarted)
     assertEquals(2, checkpointManager.registered.size)
-    assertEquals(checkpoint, checkpointManager.readLastCheckpoint(partition1))
-    assertNull(checkpointManager.readLastCheckpoint(partition2))
+    assertEquals(checkpoint, checkpointManager.readLastCheckpoint(taskName1))
+    assertNull(checkpointManager.readLastCheckpoint(taskName2))
   }
 
   @Test
   def testShouldFailWhenMissingMetadata {
+    val taskName = new TaskName("c")
     val systemStream = new SystemStream("test-system", "test-stream")
     val partition = new Partition(0)
     val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
     val offsetManager = new OffsetManager
-    offsetManager.register(systemStreamPartition)
+    offsetManager.register(taskName, Set(systemStreamPartition))
 
     intercept[SamzaException] {
       offsetManager.start
@@ -148,13 +153,14 @@ class TestOffsetManager {
 
   @Ignore("OffsetManager.start is supposed to throw an exception - but it doesn't") @Test
   def testShouldFailWhenMissingDefault {
+    val taskName = new TaskName("c")
     val systemStream = new SystemStream("test-system", "test-stream")
     val partition = new Partition(0)
     val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
     val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
     val offsetManager = OffsetManager(systemStreamMetadata, new MapConfig(Map[String, String]()))
-    offsetManager.register(systemStreamPartition)
+    offsetManager.register(taskName, Set(systemStreamPartition))
 
     intercept[SamzaException] {
       offsetManager.start
@@ -190,6 +196,7 @@ class TestOffsetManager {
 
   @Test
   def testOutdatedStreamInCheckpoint {
+    val taskName = new TaskName("c")
     val systemStream0 = new SystemStream("test-system-0", "test-stream")
     val systemStream1 = new SystemStream("test-system-1", "test-stream")
     val partition0 = new Partition(0)
@@ -200,26 +207,32 @@ class TestOffsetManager {
     val offsetSettings = Map(systemStream0 -> OffsetSetting(testStreamMetadata, OffsetType.UPCOMING, false))
     val checkpointManager = getCheckpointManager(systemStreamPartition1)
     val offsetManager = new OffsetManager(offsetSettings, checkpointManager)
-    offsetManager.register(systemStreamPartition0)
+    offsetManager.register(taskName, Set(systemStreamPartition0))
     offsetManager.start
     assertTrue(checkpointManager.isStarted)
     assertEquals(1, checkpointManager.registered.size)
     assertNull(offsetManager.getLastProcessedOffset(systemStreamPartition1).getOrElse(null))
   }
 
-  private def getCheckpointManager(systemStreamPartition: SystemStreamPartition) = {
-    val checkpoint = new Checkpoint(Map(systemStreamPartition.getSystemStream -> "45"))
+  private def getCheckpointManager(systemStreamPartition: SystemStreamPartition, taskName: TaskName = new TaskName("taskName")) = {
+    val checkpoint = new Checkpoint(Map(systemStreamPartition -> "45"))
 
     new CheckpointManager {
       var isStarted = false
       var isStopped = false
-      var registered = Set[Partition]()
-      var checkpoints = Map(systemStreamPartition.getPartition -> checkpoint)
+      var registered = Set[TaskName]()
+      var checkpoints: Map[TaskName, Checkpoint] = Map(taskName -> checkpoint)
+      var taskNameToPartitionMapping: util.Map[TaskName, java.lang.Integer] = new util.HashMap[TaskName, java.lang.Integer]()
       def start { isStarted = true }
-      def register(partition: Partition) { registered += partition }
-      def writeCheckpoint(partition: Partition, checkpoint: Checkpoint) { checkpoints += partition -> checkpoint }
-      def readLastCheckpoint(partition: Partition) = checkpoints.getOrElse(partition, null)
+      def register(taskName: TaskName) { registered += taskName }
+      def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) { checkpoints += taskName -> checkpoint }
+      def readLastCheckpoint(taskName: TaskName) = checkpoints.getOrElse(taskName, null)
       def stop { isStopped = true }
+
+      override def writeChangeLogPartitionMapping(mapping: util.Map[TaskName, java.lang.Integer]): Unit = taskNameToPartitionMapping = mapping
+
+      override def readChangeLogPartitionMapping(): util.Map[TaskName, java.lang.Integer] = taskNameToPartitionMapping
+
     }
   }
 
@@ -232,4 +245,4 @@ class TestOffsetManager {
         Map[String, SystemStreamMetadata]()
     }
   }
-}
\ No newline at end of file
+}
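
A quick sketch of the TaskName-keyed checkpoint flow these tests now exercise, reusing the getCheckpointManager stub defined above (the task name and the "46" offset are illustrative values, and the implicit Scala-to-Java map conversion is assumed to be in scope as in the test file):

    val taskName = new TaskName("Partition 0")  // illustrative name
    val ssp = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
    val checkpointManager = getCheckpointManager(ssp, taskName)

    checkpointManager.start
    checkpointManager.register(taskName)
    checkpointManager.writeCheckpoint(taskName, new Checkpoint(Map(ssp -> "46")))
    // Checkpoints are looked up by TaskName rather than by Partition after this change:
    assertEquals("46", checkpointManager.readLastCheckpoint(taskName).getOffsets.get(ssp))
    checkpointManager.stop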

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
 
b/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
index 50d9a05..10ff1f4 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
@@ -23,34 +23,53 @@ import java.io.File
 import scala.collection.JavaConversions._
 import java.util.Random
 import org.junit.Assert._
-import org.junit.Test
+import org.junit.{After, Before, Test}
 import org.apache.samza.SamzaException
 import org.apache.samza.Partition
 import org.apache.samza.checkpoint.Checkpoint
-import org.apache.samza.system.SystemStream
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.container.TaskName
+import org.junit.rules.TemporaryFolder
 
-class TestFileSystemCheckpointManager {
-  val checkpointRoot = System.getProperty("java.io.tmpdir")
+class TestFileSystemCheckpointManager {
+  val checkpointRoot = System.getProperty("java.io.tmpdir") // TODO: Move this out of tmp, into our build dir
+  val taskName = new TaskName("Warwickshire")
+  val baseFileLocation = new File(checkpointRoot)
+
+  val tempFolder = new TemporaryFolder
+
+  @Before
+  def createTempFolder = tempFolder.create()
+
+  @After
+  def deleteTempFolder = tempFolder.delete()
 
   @Test
   def testReadForCheckpointFileThatDoesNotExistShouldReturnNull {
-    val cpm = new FileSystemCheckpointManager("some-job-name", new File(checkpointRoot))
-    assert(cpm.readLastCheckpoint(new Partition(1)) == null)
+    val cpm = new FileSystemCheckpointManager("some-job-name", tempFolder.getRoot)
+    assertNull(cpm.readLastCheckpoint(taskName))
   }
 
   @Test
   def testReadForCheckpointFileThatDoesExistShouldReturnProperCheckpoint {
-    val cpm = new FileSystemCheckpointManager("some-job-name", new File(checkpointRoot))
-    val partition = new Partition(2)
     val cp = new Checkpoint(Map(
-      new SystemStream("a", "b") -> "c",
-      new SystemStream("a", "c") -> "d",
-      new SystemStream("b", "d") -> "e"))
+          new SystemStreamPartition("a", "b", new Partition(0)) -> "c",
+          new SystemStreamPartition("a", "c", new Partition(1)) -> "d",
+          new SystemStreamPartition("b", "d", new Partition(2)) -> "e"))
+
+    var readCp: Checkpoint = null
+    val cpm = new FileSystemCheckpointManager("some-job-name", tempFolder.getRoot)
+
     cpm.start
-    cpm.writeCheckpoint(partition, cp)
-    val readCp = cpm.readLastCheckpoint(partition)
+    cpm.writeCheckpoint(taskName, cp)
+    readCp = cpm.readLastCheckpoint(taskName)
     cpm.stop
-    assert(readCp.equals(cp))
+
+    assertNotNull(readCp)
+    assertEquals(cp.getOffsets.keySet(), readCp.getOffsets.keySet())
+    assertEquals(cp.getOffsets, readCp.getOffsets)
+    assertEquals(cp, readCp)
   }
 
   @Test

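The move to JUnit's TemporaryFolder above gives each test run an isolated checkpoint directory instead of sharing java.io.tmpdir. A minimal sketch of the lifecycle the @Before/@After pair drives (assuming JUnit 4 on the classpath, as in these tests):

    import org.junit.rules.TemporaryFolder

    val tempFolder = new TemporaryFolder
    tempFolder.create()    // provision a fresh scratch directory
    val cpm = new FileSystemCheckpointManager("some-job-name", tempFolder.getRoot)
    // ... write and read checkpoints under tempFolder.getRoot ...
    tempFolder.delete()    // recursively remove the directory afterwards
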
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/container/SystemStreamPartitionGrouperTestBase.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/SystemStreamPartitionGrouperTestBase.scala
 
b/samza-core/src/test/scala/org/apache/samza/container/SystemStreamPartitionGrouperTestBase.scala
new file mode 100644
index 0000000..3032b00
--- /dev/null
+++ 
b/samza-core/src/test/scala/org/apache/samza/container/SystemStreamPartitionGrouperTestBase.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container
+
+import org.apache.samza.Partition
+import org.apache.samza.system.SystemStreamPartition
+import org.junit.Test
+import java.util.HashSet
+import java.util.Map
+import java.util.Set
+import org.junit.Assert.assertEquals
+import org.junit.Assert.assertTrue
+import java.util.Collections
+
+object SystemStreamPartitionGrouperTestBase {
+  val aa0 = new SystemStreamPartition("SystemA", "StreamA", new Partition(0))
+  val aa1 = new SystemStreamPartition("SystemA", "StreamA", new Partition(1))
+  val aa2 = new SystemStreamPartition("SystemA", "StreamA", new Partition(2))
+  val ab1 = new SystemStreamPartition("SystemA", "StreamB", new Partition(1))
+  val ab2 = new SystemStreamPartition("SystemA", "StreamB", new Partition(2))
+  val ac0 = new SystemStreamPartition("SystemA", "StreamB", new Partition(0))
+  val allSSPs = new HashSet[SystemStreamPartition]
+  Collections.addAll(allSSPs, aa0, aa1, aa2, ab1, ab2, ac0)
+}
+
+abstract class SystemStreamPartitionGrouperTestBase {
+  def getGrouper: SystemStreamPartitionGrouper
+
+  @Test
+  def emptySetReturnsEmptyMap {
+    val grouper: SystemStreamPartitionGrouper = getGrouper
+    val result: Map[TaskName, Set[SystemStreamPartition]] = grouper.group(new HashSet[SystemStreamPartition])
+    assertTrue(result.isEmpty)
+  }
+
+  def verifyGroupGroupsCorrectly(input: Set[SystemStreamPartition], output: Map[TaskName, Set[SystemStreamPartition]]) {
+    val grouper: SystemStreamPartitionGrouper = getGrouper
+    val result: Map[TaskName, Set[SystemStreamPartition]] = grouper.group(input)
+    assertEquals(output, result)
+  }
+}
\ No newline at end of file
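
Any new grouper can be verified the same way by extending this base class and overriding getGrouper, as the two tests that follow do. As a purely hypothetical illustration (GroupEverythingTogether is invented here, not part of this patch), a grouper that assigns every SSP to a single task would be tested like so:

    class TestGroupEverythingTogether extends SystemStreamPartitionGrouperTestBase {
      import SystemStreamPartitionGrouperTestBase._

      override def getGrouper: SystemStreamPartitionGrouper = new GroupEverythingTogether  // hypothetical grouper

      @Test def groupingWorks() {
        val expected = Collections.singletonMap[TaskName, Set[SystemStreamPartition]](new TaskName("everything"), allSSPs)
        verifyGroupGroupsCorrectly(allSSPs, expected)
      }
    }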

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 
b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
index fa10231..d4ceffc 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
@@ -37,29 +37,45 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ShouldMatche
 
   val p0 = new Partition(0)
   val p1 = new Partition(1)
+  val taskName0 = new TaskName(p0.toString)
+  val taskName1 = new TaskName(p1.toString)
   val ssp0 = new SystemStreamPartition("testSystem", "testStream", p0)
   val ssp1 = new SystemStreamPartition("testSystem", "testStream", p1)
   val envelope0 = new IncomingMessageEnvelope(ssp0, "0", "key0", "value0")
   val envelope1 = new IncomingMessageEnvelope(ssp1, "1", "key1", "value1")
 
+  def getMockTaskInstances: Map[TaskName, TaskInstance] = {
+    val ti0 = mock[TaskInstance]
+    when(ti0.systemStreamPartitions).thenReturn(Set(ssp0))
+    when(ti0.taskName).thenReturn(taskName0)
+
+    val ti1 = mock[TaskInstance]
+    when(ti1.systemStreamPartitions).thenReturn(Set(ssp1))
+    when(ti1.taskName).thenReturn(taskName1)
+
+    Map(taskName0 -> ti0, taskName1 -> ti1)
+  }
+
   @Test
   def testProcessMessageFromChooser {
-    val taskInstances = Map(p0 -> mock[TaskInstance], p1 -> mock[TaskInstance])
+    val taskInstances = getMockTaskInstances
     val consumers = mock[SystemConsumers]
     val runLoop = new RunLoop(taskInstances, consumers, new SamzaContainerMetrics)
 
     when(consumers.choose).thenReturn(envelope0).thenReturn(envelope1).thenThrow(new StopRunLoop)
     intercept[StopRunLoop] { runLoop.run }
-    verify(taskInstances(p0)).process(Matchers.eq(envelope0), anyObject)
-    verify(taskInstances(p1)).process(Matchers.eq(envelope1), anyObject)
+    verify(taskInstances(taskName0)).process(Matchers.eq(envelope0), anyObject)
+    verify(taskInstances(taskName1)).process(Matchers.eq(envelope1), anyObject)
     runLoop.metrics.envelopes.getCount should equal(2L)
     runLoop.metrics.nullEnvelopes.getCount should equal(0L)
   }
 
+
   @Test
   def testNullMessageFromChooser {
     val consumers = mock[SystemConsumers]
-    val runLoop = new RunLoop(Map(p0 -> mock[TaskInstance]), consumers, new SamzaContainerMetrics)
+    val map = getMockTaskInstances - taskName1 // This test only needs p0
+    val runLoop = new RunLoop(map, consumers, new SamzaContainerMetrics)
     when(consumers.choose).thenReturn(null).thenReturn(null).thenThrow(new StopRunLoop)
     intercept[StopRunLoop] { runLoop.run }
     runLoop.metrics.envelopes.getCount should equal(0L)
@@ -73,7 +89,7 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ShouldMatche
     when(consumers.choose).thenReturn(envelope0)
 
     val runLoop = new RunLoop(
-      taskInstances = Map(p0 -> mock[TaskInstance], p1 -> mock[TaskInstance]),
+      taskInstances = getMockTaskInstances,
       consumerMultiplexer = consumers,
       metrics = new SamzaContainerMetrics,
       windowMs = 60000, // call window once per minute
@@ -86,67 +102,67 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ShouldMatche
 
     intercept[StopRunLoop] { runLoop.run }
 
-    verify(runLoop.taskInstances(p0), times(5)).window(anyObject)
-    verify(runLoop.taskInstances(p1), times(5)).window(anyObject)
-    verify(runLoop.taskInstances(p0), times(10)).commit
-    verify(runLoop.taskInstances(p1), times(10)).commit
+    verify(runLoop.taskInstances(taskName0), times(5)).window(anyObject)
+    verify(runLoop.taskInstances(taskName1), times(5)).window(anyObject)
+    verify(runLoop.taskInstances(taskName0), times(10)).commit
+    verify(runLoop.taskInstances(taskName1), times(10)).commit
   }
 
   @Test
   def testCommitCurrentTaskManually {
-    val taskInstances = Map(p0 -> mock[TaskInstance], p1 -> mock[TaskInstance])
+    val taskInstances = getMockTaskInstances
     val consumers = mock[SystemConsumers]
     val runLoop = new RunLoop(taskInstances, consumers, new SamzaContainerMetrics, windowMs = -1, commitMs = -1)
 
     when(consumers.choose).thenReturn(envelope0).thenReturn(envelope1).thenThrow(new StopRunLoop)
-    stubProcess(taskInstances(p0), (envelope, coordinator) => coordinator.commit(RequestScope.CURRENT_TASK))
+    stubProcess(taskInstances(taskName0), (envelope, coordinator) => coordinator.commit(RequestScope.CURRENT_TASK))
 
     intercept[StopRunLoop] { runLoop.run }
-    verify(taskInstances(p0), times(1)).commit
-    verify(taskInstances(p1), times(0)).commit
+    verify(taskInstances(taskName0), times(1)).commit
+    verify(taskInstances(taskName1), times(0)).commit
   }
 
   @Test
   def testCommitAllTasksManually {
-    val taskInstances = Map(p0 -> mock[TaskInstance], p1 -> mock[TaskInstance])
+    val taskInstances = getMockTaskInstances
     val consumers = mock[SystemConsumers]
     val runLoop = new RunLoop(taskInstances, consumers, new SamzaContainerMetrics, windowMs = -1, commitMs = -1)
 
     when(consumers.choose).thenReturn(envelope0).thenThrow(new StopRunLoop)
-    stubProcess(taskInstances(p0), (envelope, coordinator) => coordinator.commit(RequestScope.ALL_TASKS_IN_CONTAINER))
+    stubProcess(taskInstances(taskName0), (envelope, coordinator) => coordinator.commit(RequestScope.ALL_TASKS_IN_CONTAINER))
 
     intercept[StopRunLoop] { runLoop.run }
-    verify(taskInstances(p0), times(1)).commit
-    verify(taskInstances(p1), times(1)).commit
+    verify(taskInstances(taskName0), times(1)).commit
+    verify(taskInstances(taskName1), times(1)).commit
   }
 
   @Test
   def testShutdownOnConsensus {
-    val taskInstances = Map(p0 -> mock[TaskInstance], p1 -> mock[TaskInstance])
+    val taskInstances = getMockTaskInstances
     val consumers = mock[SystemConsumers]
     val runLoop = new RunLoop(taskInstances, consumers, new SamzaContainerMetrics, windowMs = -1, commitMs = -1)
 
     when(consumers.choose).thenReturn(envelope0).thenReturn(envelope0).thenReturn(envelope1)
-    stubProcess(taskInstances(p0), (envelope, coordinator) => coordinator.shutdown(RequestScope.CURRENT_TASK))
-    stubProcess(taskInstances(p1), (envelope, coordinator) => coordinator.shutdown(RequestScope.CURRENT_TASK))
+    stubProcess(taskInstances(taskName0), (envelope, coordinator) => coordinator.shutdown(RequestScope.CURRENT_TASK))
+    stubProcess(taskInstances(taskName1), (envelope, coordinator) => coordinator.shutdown(RequestScope.CURRENT_TASK))
 
     runLoop.run
-    verify(taskInstances(p0), times(2)).process(Matchers.eq(envelope0), anyObject)
-    verify(taskInstances(p1), times(1)).process(Matchers.eq(envelope1), anyObject)
+    verify(taskInstances(taskName0), times(2)).process(Matchers.eq(envelope0), anyObject)
+    verify(taskInstances(taskName1), times(1)).process(Matchers.eq(envelope1), anyObject)
   }
 
   @Test
   def testShutdownNow {
-    val taskInstances = Map(p0 -> mock[TaskInstance], p1 -> mock[TaskInstance])
+    val taskInstances = getMockTaskInstances
     val consumers = mock[SystemConsumers]
     val runLoop = new RunLoop(taskInstances, consumers, new SamzaContainerMetrics, windowMs = -1, commitMs = -1)
 
     when(consumers.choose).thenReturn(envelope0).thenReturn(envelope1)
-    stubProcess(taskInstances(p0), (envelope, coordinator) => coordinator.shutdown(RequestScope.ALL_TASKS_IN_CONTAINER))
+    stubProcess(taskInstances(taskName0), (envelope, coordinator) => coordinator.shutdown(RequestScope.ALL_TASKS_IN_CONTAINER))
 
     runLoop.run
-    verify(taskInstances(p0), times(1)).process(anyObject, anyObject)
-    verify(taskInstances(p1), times(0)).process(anyObject, anyObject)
+    verify(taskInstances(taskName0), times(1)).process(anyObject, anyObject)
+    verify(taskInstances(taskName1), times(0)).process(anyObject, anyObject)
   }
 
   def anyObject[T] = Matchers.anyObject.asInstanceOf[T]

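The getMockTaskInstances helper above captures the new shape of the container: instances are keyed by TaskName, while envelopes arrive keyed by SystemStreamPartition. A sketch of inverting that map to route an envelope (an inference from what these tests verify, not a quote of RunLoop's internals):

    val taskInstances: Map[TaskName, TaskInstance] = getMockTaskInstances
    val sspToTask = (for ((_, ti) <- taskInstances; ssp <- ti.systemStreamPartitions) yield ssp -> ti).toMap
    // envelope0 was built on ssp0, so it should land on taskName0's instance:
    assert(sspToTask(envelope0.getSystemStreamPartition) == taskInstances(taskName0))
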
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 190bdfe..e3c7fe3 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -19,13 +19,11 @@
 
 package org.apache.samza.container
 
-import java.io.File
 import org.apache.samza.config.Config
 import org.junit.Assert._
 import org.junit.Test
 import org.apache.samza.Partition
 import org.apache.samza.config.MapConfig
-import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.system.SystemConsumers
 import org.apache.samza.system.chooser.RoundRobinChooser
 import org.apache.samza.system.SystemConsumer
@@ -81,8 +79,7 @@ class TestSamzaContainer {
       }
     }
     val config = new MapConfig
-    val partition = new Partition(0)
-    val containerName = "test-container"
+    val taskName = new TaskName("taskName")
     val consumerMultiplexer = new SystemConsumers(
       new RoundRobinChooser,
       Map[String, SystemConsumer]())
@@ -91,18 +88,18 @@ class TestSamzaContainer {
       new SerdeManager)
     val taskInstance: TaskInstance = new TaskInstance(
       task,
-      partition,
+      taskName,
       config,
       new TaskInstanceMetrics,
       consumerMultiplexer: SystemConsumers,
       producerMultiplexer: SystemProducers)
     val runLoop = new RunLoop(
-      taskInstances = Map(partition -> taskInstance),
+      taskInstances = Map(taskName -> taskInstance),
       consumerMultiplexer = consumerMultiplexer,
       metrics = new SamzaContainerMetrics
     )
     val container = new SamzaContainer(
-      Map(partition -> taskInstance),
+      Map(taskName -> taskInstance),
       runLoop,
       consumerMultiplexer,
       producerMultiplexer,

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 
b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 1f5e3bb..9d5ff13 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -50,7 +50,6 @@ class TestTaskInstance {
     }
     val config = new MapConfig
     val partition = new Partition(0)
-    val containerName = "test-container"
     val consumerMultiplexer = new SystemConsumers(
       new RoundRobinChooser,
       Map[String, SystemConsumer]())
@@ -62,16 +61,17 @@ class TestTaskInstance {
     // Pretend our last checkpointed (next) offset was 2.
     val testSystemStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
     val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
+    val taskName = new TaskName("taskName")
     val taskInstance: TaskInstance = new TaskInstance(
       task,
-      partition,
+      taskName,
       config,
       new TaskInstanceMetrics,
       consumerMultiplexer,
       producerMultiplexer,
       offsetManager)
     // Pretend we got a message with offset 2 and next offset 3.
-    val coordinator = new ReadableCoordinator(partition)
+    val coordinator = new ReadableCoordinator(taskName)
     taskInstance.process(new IncomingMessageEnvelope(systemStreamPartition, "2", null, null), coordinator)
     // Check to see if the offset manager has been properly updated with offset 3.
     val lastProcessedOffset = offsetManager.getLastProcessedOffset(systemStreamPartition)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/container/TestTaskNamesToSystemStreamPartitions.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/TestTaskNamesToSystemStreamPartitions.scala
 
b/samza-core/src/test/scala/org/apache/samza/container/TestTaskNamesToSystemStreamPartitions.scala
new file mode 100644
index 0000000..d680b20
--- /dev/null
+++ 
b/samza-core/src/test/scala/org/apache/samza/container/TestTaskNamesToSystemStreamPartitions.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container
+
+import org.junit.Test
+import org.junit.Assert._
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.{SamzaException, Partition}
+
+class TestTaskNamesToSystemStreamPartitions {
+  var sspCounter = 0
+  def makeSSP(stream: String) = new SystemStreamPartition("system", stream, new Partition(42))
+
+  @Test
+  def toSetWorksCorrectly() {
+    val map = Map(new TaskName("tn1") -> Set(makeSSP("tn1-1"), makeSSP("tn1-2")),
+                  new TaskName("tn2") -> Set(makeSSP("tn2-1"), makeSSP("tn2-2")))
+    val tntssp = TaskNamesToSystemStreamPartitions(map)
+
+    val asSet = tntssp.toSet
+    val expected = Set(new TaskName("tn1") -> Set(makeSSP("tn1-1"), makeSSP("tn1-2")),
+                       new TaskName("tn2") -> Set(makeSSP("tn2-1"), makeSSP("tn2-2")))
+    assertEquals(expected, asSet)
+  }
+
+  @Test
+  def validateMethodCatchesDuplicatedSSPs() {
+    val duplicatedSSP1 = new SystemStreamPartition("sys", "str", new Partition(42))
+    val duplicatedSSP2 = new SystemStreamPartition("sys", "str", new Partition(42))
+    val notDuplicatedSSP1 = new SystemStreamPartition("sys", "str2", new Partition(42))
+    val notDuplicatedSSP2 = new SystemStreamPartition("sys", "str3", new Partition(42))
+
+    val badMapping = Map(new TaskName("a") -> Set(notDuplicatedSSP1, duplicatedSSP1), new TaskName("b") -> Set(notDuplicatedSSP2, duplicatedSSP2))
+
+    var caughtException = false
+    try {
+      TaskNamesToSystemStreamPartitions(badMapping)
+    } catch {
+      case se: SamzaException => assertEquals("Assigning the same SystemStreamPartition to multiple " +
+        "TaskNames is not currently supported.  Out of compliance SystemStreamPartitions and counts: " +
+        "Map(SystemStreamPartition [sys, str, 42] -> 2)", se.getMessage)
+        caughtException = true
+      case _: Throwable       =>
+    }
+    assertTrue("TaskNamesToSystemStreamPartitions should have rejected this mapping but didn't", caughtException)
+  }
+
+  @Test
+  def validateMethodAllowsUniqueSSPs() {
+    val sspSet1 = (0 to 10).map(p => new SystemStreamPartition("sys", "str", new Partition(p))).toSet
+    val sspSet2 = (0 to 10).map(p => new SystemStreamPartition("sys", "str2", new Partition(p))).toSet
+
+    TaskNamesToSystemStreamPartitions(Map(new TaskName("set1") -> sspSet1, new TaskName("set2") -> sspSet2))
+  }
+}
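
The failure message asserted above reports each over-assigned SSP together with how many TaskNames claim it. A sketch of that bookkeeping (illustrative only; the real validation lives inside TaskNamesToSystemStreamPartitions):

    def duplicatedSSPs(mapping: Map[TaskName, Set[SystemStreamPartition]]): Map[SystemStreamPartition, Int] =
      mapping.values.toSeq.flatten                  // every SSP claimed by any task
        .groupBy(identity)                          // bucket identical SSPs together
        .collect { case (ssp, hits) if hits.size > 1 => ssp -> hits.size }

    // Applied to badMapping above this yields Map(SystemStreamPartition [sys, str, 42] -> 2),
    // matching the message carried by the SamzaException.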

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupByPartition.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupByPartition.scala
 
b/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupByPartition.scala
new file mode 100644
index 0000000..733be20
--- /dev/null
+++ 
b/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupByPartition.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.systemstreampartition.groupers
+
+import org.apache.samza.container.{TaskName, SystemStreamPartitionGrouperTestBase, SystemStreamPartitionGrouper}
+import scala.collection.JavaConverters._
+import org.junit.Test
+
+class TestGroupByPartition extends SystemStreamPartitionGrouperTestBase {
+  import SystemStreamPartitionGrouperTestBase._
+
+  // Expected grouping of the SSP set provided by the base class
+  val expected = Map(new TaskName("Partition 0") -> Set(aa0, ac0).asJava,
+                     new TaskName("Partition 1") -> Set(aa1, ab1).asJava,
+                     new TaskName("Partition 2") -> Set(aa2, ab2).asJava).asJava
+
+  override def getGrouper: SystemStreamPartitionGrouper = new GroupByPartition
+
+  @Test def groupingWorks() {
+    verifyGroupGroupsCorrectly(allSSPs, expected)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupBySystemStreamPartition.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupBySystemStreamPartition.scala
 
b/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupBySystemStreamPartition.scala
new file mode 100644
index 0000000..e9c15a5
--- /dev/null
+++ 
b/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupBySystemStreamPartition.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.systemstreampartition.groupers
+
+import org.apache.samza.container.{TaskName, SystemStreamPartitionGrouperTestBase, SystemStreamPartitionGrouper}
+import scala.collection.JavaConverters._
+import org.junit.Test
+
+class TestGroupBySystemStreamPartition extends SystemStreamPartitionGrouperTestBase {
+  import SystemStreamPartitionGrouperTestBase._
+
+  // Expected grouping of the SSP set provided by the base class, built by hand
+  // rather than in a loop to avoid duplicating a potential logic error from the grouper under test
+  val expected = Map(new TaskName(aa0.toString) -> Set(aa0).asJava,
+    new TaskName(aa1.toString) -> Set(aa1).asJava,
+    new TaskName(aa2.toString) -> Set(aa2).asJava,
+    new TaskName(ab1.toString) -> Set(ab1).asJava,
+    new TaskName(ab2.toString) -> Set(ab2).asJava,
+    new TaskName(ac0.toString) -> Set(ac0).asJava).asJava
+
+  override def getGrouper: SystemStreamPartitionGrouper = new GroupBySystemStreamPartition
+
+  @Test def groupingWorks() {
+    verifyGroupGroupsCorrectly(allSSPs, expected)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/TestSimpleSystemStreamPartitionTaskNameGrouper.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/TestSimpleSystemStreamPartitionTaskNameGrouper.scala
 
b/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/TestSimpleSystemStreamPartitionTaskNameGrouper.scala
new file mode 100644
index 0000000..7ea09cd
--- /dev/null
+++ 
b/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/TestSimpleSystemStreamPartitionTaskNameGrouper.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.systemstreampartition.taskname.groupers
+
+import org.apache.samza.container.{TaskName, TaskNamesToSystemStreamPartitions}
+import org.apache.samza.system.SystemStreamPartition
+import org.junit.Assert._
+import org.junit.Test
+
+class TestSimpleSystemStreamPartitionTaskNameGrouper {
+  val emptySSPSet = Set[SystemStreamPartition]()
+
+  @Test
+  def weGetAsExactlyManyGroupsAsWeAskFor() {
+    // memoize the maps used in the test to avoid an O(n^3) loop
+    val tntsspCache = scala.collection.mutable.Map[Int, TaskNamesToSystemStreamPartitions]()
+
+    def tntsspOfSize(size: Int) = {
+      def getMap(size: Int) = TaskNamesToSystemStreamPartitions((0 until size).map(z => new TaskName("tn" + z) -> emptySSPSet).toMap)
+
+      tntsspCache.getOrElseUpdate(size, getMap(size))
+    }
+
+    val maxTNTSSPSize = 1000
+    val maxNumGroups = 140
+    for(numGroups <- 1 to maxNumGroups) {
+      val grouper = new SimpleSystemStreamPartitionTaskNameGrouper(numGroups)
+
+      for (tntsspSize <- numGroups to maxTNTSSPSize) {
+        val map = tntsspOfSize(tntsspSize)
+        assertEquals(tntsspSize, map.size)
+
+        val grouped = grouper.groupTaskNames(map)
+        assertEquals("Asked for " + numGroups + " but got " + grouped.size, numGroups, grouped.size)
+      }
+    }
+  }
+}
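
The property checked above is that the grouper returns exactly numGroups buckets whenever it is given at least that many task names. Round-robin assignment by index is one strategy with that property; a sketch under that assumption (the grouper's actual distribution strategy may differ):

    def roundRobin[A](items: Seq[A], numGroups: Int): Map[Int, Seq[A]] =
      items.zipWithIndex
        .groupBy { case (_, i) => i % numGroups }   // bucket index = position mod numGroups
        .mapValues(_.map(_._1))

    // Ten task names into three groups -> exactly three buckets, sizes 4, 3, 3:
    assert(roundRobin((0 until 10).map(i => new TaskName("tn" + i)), 3).size == 3)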

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala 
b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
index 21d8a78..258ccc1 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
@@ -20,7 +20,6 @@
 package org.apache.samza.job
 import java.io.File
 import org.apache.samza.config.Config
-import org.junit.Assert._
 import org.junit.Test
 
 object TestJobRunner {
