http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupByPartition.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupByPartition.scala b/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupByPartition.scala
new file mode 100644
index 0000000..223862f
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupByPartition.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.systemstreampartition.groupers
+
+import org.apache.samza.container.{TaskName, SystemStreamPartitionGrouperFactory, SystemStreamPartitionGrouper}
+import java.util
+import org.apache.samza.system.SystemStreamPartition
+import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
+import org.apache.samza.config.Config
+
+/**
+ * Group the {@link org.apache.samza.system.SystemStreamPartition}s by their Partition, with the key being
+ * the string representation of the Partition.
+ */
+class GroupByPartition extends SystemStreamPartitionGrouper {
+  override def group(ssps: util.Set[SystemStreamPartition]) = {
+    ssps.groupBy( s => new TaskName("Partition " + s.getPartition.getPartitionId) )
+      .map(r => r._1 -> r._2.asJava)
+  }
+}
+
+class GroupByPartitionFactory extends SystemStreamPartitionGrouperFactory {
+  override def getSystemStreamPartitionGrouper(config: Config): SystemStreamPartitionGrouper = new GroupByPartition
+}
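
Aside, for illustration only (not part of the patch; the system and stream names are hypothetical): GroupByPartition keys each group by "Partition N", so SSPs from different streams that share a partition id land under the same TaskName:

    import java.util
    import org.apache.samza.Partition
    import org.apache.samza.container.systemstreampartition.groupers.GroupByPartition
    import org.apache.samza.system.SystemStreamPartition

    val ssps = new util.HashSet[SystemStreamPartition]()
    ssps.add(new SystemStreamPartition("kafka", "orders", new Partition(0)))    // hypothetical system/stream names
    ssps.add(new SystemStreamPartition("kafka", "shipments", new Partition(0)))
    ssps.add(new SystemStreamPartition("kafka", "orders", new Partition(1)))

    val groups = new GroupByPartition().group(ssps)
    // groups has two keys: TaskName("Partition 0") -> {orders/0, shipments/0}
    //                      TaskName("Partition 1") -> {orders/1}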
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupBySystemStreamPartition.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupBySystemStreamPartition.scala b/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupBySystemStreamPartition.scala
new file mode 100644
index 0000000..a2bcfee
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupBySystemStreamPartition.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.systemstreampartition.groupers
+
+import org.apache.samza.container.{TaskName, SystemStreamPartitionGrouperFactory, SystemStreamPartitionGrouper}
+import java.util
+import org.apache.samza.system.SystemStreamPartition
+import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
+import org.apache.samza.config.Config
+
+/**
+ * Group the {@link org.apache.samza.system.SystemStreamPartition}s by themselves, effectively putting each
+ * SystemStreamPartition into its own group, with the key being the string representation of the SystemStreamPartition.
+ */
+class GroupBySystemStreamPartition extends SystemStreamPartitionGrouper {
+  override def group(ssps: util.Set[SystemStreamPartition]) = ssps.groupBy({ s=> new TaskName(s.toString)}).map(r => r._1 -> r._2.asJava)
+}
+
+class GroupBySystemStreamPartitionFactory extends SystemStreamPartitionGrouperFactory {
+  override def getSystemStreamPartitionGrouper(config: Config): SystemStreamPartitionGrouper = new GroupBySystemStreamPartition
+}
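
For contrast, continuing the editorial illustration above (same hypothetical ssps set): GroupBySystemStreamPartition puts every SSP in a group of one, keyed by that SSP's own string representation:

    val isolated = new GroupBySystemStreamPartition().group(ssps)
    // isolated has three keys, one TaskName per SSP, each named by that SSP's toString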
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/SimpleSystemStreamPartitionTaskNameGrouper.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/SimpleSystemStreamPartitionTaskNameGrouper.scala b/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/SimpleSystemStreamPartitionTaskNameGrouper.scala
new file mode 100644
index 0000000..7913294
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/SimpleSystemStreamPartitionTaskNameGrouper.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.systemstreampartition.taskname.groupers
+
+import org.apache.samza.container.{TaskName, SystemStreamPartitionTaskNameGrouper, TaskNamesToSystemStreamPartitions}
+import org.apache.samza.system.SystemStreamPartition
+
+/**
+ * Group the taskNames by round-robining them across the number of containers (n), so each container receives
+ * roughly (number of taskNames / n) taskNames, in whatever order iterating over the keys in the map of taskNames
+ * happens to produce. No consideration is given towards locality, even distribution of aggregate SSPs within a
+ * container, even distribution of the number of taskNames between containers, etc.
+ */
+class SimpleSystemStreamPartitionTaskNameGrouper(numContainers:Int) extends SystemStreamPartitionTaskNameGrouper {
+  require(numContainers > 0, "Must have at least one container")
+
+  override def groupTaskNames(taskNames: TaskNamesToSystemStreamPartitions): Map[Int, TaskNamesToSystemStreamPartitions] = {
+    val keySize = taskNames.keySet.size
+    require(keySize > 0, "Must have some SSPs to group, but found none")
+
+    // Iterate through the taskNames, round-robining them per container
+    val byContainerNum = (0 until numContainers).map(_ -> scala.collection.mutable.Map[TaskName, Set[SystemStreamPartition]]()).toMap
+    var idx = 0
+    for(taskName <- taskNames.iterator) {
+      val currMap = byContainerNum.get(idx).get // safe to use simple get since we populated everybody above
+      idx = (idx + 1) % numContainers
+
+      currMap += taskName
+    }
+
+    byContainerNum.map(kv => kv._1 -> TaskNamesToSystemStreamPartitions(kv._2)).toMap
+  }
+}
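
A sketch of the round-robin assignment (editorial; taskNames t0..t4 are hypothetical, and a real TaskNamesToSystemStreamPartitions would come from one of the groupers above):

    import org.apache.samza.container.systemstreampartition.taskname.groupers.SimpleSystemStreamPartitionTaskNameGrouper

    // Given a TaskNamesToSystemStreamPartitions `taskNames` holding five taskNames t0..t4:
    val grouper = new SimpleSystemStreamPartitionTaskNameGrouper(2)
    val byContainer = grouper.groupTaskNames(taskNames)
    // Modulo map iteration order: byContainer(0) holds t0, t2, t4 and byContainer(1) holds t1, t3,
    // so container counts differ by at most one, even though the scaladoc promises nothing stronger.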
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala b/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
index 4635bb2..8c5533c 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
@@ -19,17 +19,76 @@
 package org.apache.samza.job
 
-import scala.collection.JavaConversions._
+import java.util
+import org.apache.samza.Partition
 import org.apache.samza.config.ShellCommandConfig
 import org.apache.samza.config.ShellCommandConfig.Config2ShellCommand
 import org.apache.samza.config.serializers.JsonConfigSerializer
+import org.apache.samza.container.TaskName
+import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.util.Util
+import org.codehaus.jackson.`type`.TypeReference
+import org.codehaus.jackson.map.ObjectMapper
+import scala.collection.JavaConversions._
+import scala.reflect.BeanProperty
+
+object ShellCommandBuilder {
+  /**
+   * Jackson really hates Scala's classes, so we need to wrap up the SSP in a form Jackson will take
+   */
+  private class SSPWrapper(@BeanProperty var partition:java.lang.Integer = null,
+                           @BeanProperty var Stream:java.lang.String = null,
+                           @BeanProperty var System:java.lang.String = null) {
+    def this() { this(null, null, null) }
+    def this(ssp:SystemStreamPartition) { this(ssp.getPartition.getPartitionId, ssp.getSystemStream.getStream, ssp.getSystemStream.getSystem)}
+  }
+
+  def serializeSystemStreamPartitionSetToJSON(sspTaskNames: java.util.Map[TaskName,java.util.Set[SystemStreamPartition]]): String = {
+    val map = new util.HashMap[TaskName, util.ArrayList[SSPWrapper]]()
+    for((key, ssps) <- sspTaskNames) {
+      val al = new util.ArrayList[SSPWrapper](ssps.size)
+      for(ssp <- ssps) { al.add(new SSPWrapper(ssp)) }
+      map.put(key, al)
+    }
+
+    new ObjectMapper().writeValueAsString(map)
+  }
+
+  def deserializeSystemStreamPartitionSetFromJSON(sspsAsJSON: String): Map[TaskName, Set[SystemStreamPartition]] = {
+    val om = new ObjectMapper()
+
+    val asWrapper = om.readValue(sspsAsJSON, new TypeReference[util.HashMap[String, util.ArrayList[SSPWrapper]]]() { }).asInstanceOf[util.HashMap[String, util.ArrayList[SSPWrapper]]]
+
+    val taskName = for((key, sspsWrappers) <- asWrapper;
+                       taskName = new TaskName(key);
+                       ssps = sspsWrappers.map(w => new SystemStreamPartition(w.getSystem, w.getStream, new Partition(w.getPartition))).toSet
+    ) yield(taskName -> ssps)
+
+    taskName.toMap // to get an immutable map rather than mutable...
+  }
+
+  def serializeTaskNameToChangeLogPartitionMapping(mapping:Map[TaskName, Int]) = {
+    val javaMap = new util.HashMap[TaskName, java.lang.Integer]()
+
+    mapping.foreach(kv => javaMap.put(kv._1, Integer.valueOf(kv._2)))
+
+    new ObjectMapper().writeValueAsString(javaMap)
+  }
+
+  def deserializeTaskNameToChangeLogPartitionMapping(taskNamesAsJSON: String): Map[TaskName, Int] = {
+    val om = new ObjectMapper()
+
+    val asMap = om.readValue(taskNamesAsJSON, new TypeReference[util.HashMap[String, java.lang.Integer]] {}).asInstanceOf[util.HashMap[String, java.lang.Integer]]
+
+    asMap.map(kv => new TaskName(kv._1) -> kv._2.intValue()).toMap
+  }
+}
 class ShellCommandBuilder extends CommandBuilder {
   def buildCommand() = config.getCommand
 
   def buildEnvironment(): java.util.Map[String, String] = {
-    var streamsAndPartsString = Util.serializeSSPSetToJSON(systemStreamPartitions.toSet) // Java to Scala set conversion
+    var streamsAndPartsString = ShellCommandBuilder.serializeSystemStreamPartitionSetToJSON(taskNameToSystemStreamPartitionsMapping) // Java to Scala set conversion
+    var taskNameToChangeLogPartitionMappingString = ShellCommandBuilder.serializeTaskNameToChangeLogPartitionMapping(taskNameToChangeLogPartitionMapping.map(kv => kv._1 -> kv._2.toInt).toMap)
     var envConfig = JsonConfigSerializer.toJson(config)
     val isCompressed = if(config.isEnvConfigCompressed) "TRUE" else "FALSE"
@@ -40,14 +99,17 @@ class ShellCommandBuilder extends CommandBuilder {
      * of the environment variable(s) exported while starting a Samza container (SAMZA-337)
      */
     streamsAndPartsString = Util.compress(streamsAndPartsString)
+    taskNameToChangeLogPartitionMappingString = Util.compress(taskNameToChangeLogPartitionMappingString)
     envConfig = Util.compress(envConfig)
   }
 
   Map(
     ShellCommandConfig.ENV_CONTAINER_NAME -> name,
     ShellCommandConfig.ENV_SYSTEM_STREAMS -> streamsAndPartsString,
+    ShellCommandConfig.ENV_TASK_NAME_TO_CHANGELOG_PARTITION_MAPPING -> taskNameToChangeLogPartitionMappingString,
     ShellCommandConfig.ENV_CONFIG -> envConfig,
     ShellCommandConfig.ENV_JAVA_OPTS -> config.getTaskOpts.getOrElse(""),
     ShellCommandConfig.ENV_COMPRESS_CONFIG -> isCompressed)
+  }
 }
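
A usage sketch (editorial, with hypothetical names; the helpers live on the ShellCommandBuilder companion object shown above): the serialize/deserialize pairs are inverses, which is what lets a container rebuild its TaskName -> SSPs assignment from an environment variable:

    import java.util
    import org.apache.samza.Partition
    import org.apache.samza.container.TaskName
    import org.apache.samza.job.ShellCommandBuilder
    import org.apache.samza.system.SystemStreamPartition

    val ssps = new util.HashSet[SystemStreamPartition]()
    ssps.add(new SystemStreamPartition("kafka", "orders", new Partition(0)))  // hypothetical names
    val mapping = new util.HashMap[TaskName, util.Set[SystemStreamPartition]]()
    mapping.put(new TaskName("Partition 0"), ssps)

    val json = ShellCommandBuilder.serializeSystemStreamPartitionSetToJSON(mapping)
    val back = ShellCommandBuilder.deserializeSystemStreamPartitionSetFromJSON(json)
    // back is an immutable Scala Map carrying the same TaskName -> SSP contents as the input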
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala
index e20e7c1..713bded 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala
@@ -24,24 +24,32 @@
 import org.apache.samza.config.ShellCommandConfig._
 import org.apache.samza.job.CommandBuilder
 import org.apache.samza.job.StreamJob
 import org.apache.samza.job.StreamJobFactory
-import scala.collection.JavaConversions._
 import grizzled.slf4j.Logging
 import org.apache.samza.SamzaException
-import org.apache.samza.container.SamzaContainer
+import org.apache.samza.container.{TaskNamesToSystemStreamPartitions, SamzaContainer}
 import org.apache.samza.util.Util
 import org.apache.samza.job.ShellCommandBuilder
+import scala.collection.JavaConversions._
 
 class LocalJobFactory extends StreamJobFactory with Logging {
   def getJob(config: Config): StreamJob = {
-    val taskName = "local-task"
-    val partitions = Util.getInputStreamPartitions(config)
+    val jobName = "local-container"
 
-    info("got partitions for job %s" format partitions)
+    // Since we're local, there will only be a single task into which all the SSPs will be processed
+    val taskToTaskNames: Map[Int, TaskNamesToSystemStreamPartitions] = Util.assignContainerToSSPTaskNames(config, 1)
+    if(taskToTaskNames.size != 1) {
+      throw new SamzaException("Should only have a single task count but somehow got more " + taskToTaskNames.size)
+    }
 
-    if (partitions.size <= 0) {
-      throw new SamzaException("No partitions were detected for your input streams. It's likely that the system(s) specified don't know about the input streams: %s" format config.getInputStreams)
+    // So pull out that single TaskNamesToSystemStreamPartitions
+    val sspTaskName: TaskNamesToSystemStreamPartitions = taskToTaskNames.getOrElse(0, throw new SamzaException("Should have a 0 task number for the SSPs but somehow do not: " + taskToTaskNames))
+    if (sspTaskName.size <= 0) {
+      throw new SamzaException("No SystemStreamPartitions to process were detected for your input streams. It's likely that the system(s) specified don't know about the input streams: %s" format config.getInputStreams)
     }
 
+    val taskNameToChangeLogPartitionMapping = Util.getTaskNameToChangeLogPartitionMapping(config, taskToTaskNames).map(kv => kv._1 -> Integer.valueOf(kv._2))
+
+    info("got taskName for job %s" format sspTaskName)
+
     config.getCommandClass match {
       case Some(cmdBuilderClassName) => {
         // A command class was specified, so we need to use a process job to
@@ -50,8 +58,8 @@ class LocalJobFactory extends StreamJobFactory with Logging {
         cmdBuilder
           .setConfig(config)
-          .setName(taskName)
-          .setStreamPartitions(partitions)
+          .setName(jobName)
+          .setTaskNameToSystemStreamPartitionsMapping(sspTaskName.getJavaFriendlyType)
 
         val processBuilder = new ProcessBuilder(cmdBuilder.buildCommand.split(" ").toList)
@@ -72,7 +80,7 @@ class LocalJobFactory extends StreamJobFactory with Logging {
         // No command class was specified, so execute the job in this process
         // using a threaded job.
-        new ThreadJob(SamzaContainer(taskName, partitions, config))
+        new ThreadJob(SamzaContainer(jobName, sspTaskName, taskNameToChangeLogPartitionMapping, config))
       }
     }
   }
 }
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
index 3d0a484..34c846c 100644
--- a/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
@@ -19,26 +19,59 @@
 package org.apache.samza.serializers
 
-import scala.collection.JavaConversions._
-import org.codehaus.jackson.map.ObjectMapper
-import org.apache.samza.system.SystemStream
-import org.apache.samza.checkpoint.Checkpoint
-import org.apache.samza.SamzaException
 import grizzled.slf4j.Logging
+import java.util
+import org.apache.samza.checkpoint.Checkpoint
+import org.apache.samza.container.TaskName
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.{SamzaException, Partition}
+import org.codehaus.jackson.map.ObjectMapper
+import scala.collection.JavaConversions._
+import org.codehaus.jackson.`type`.TypeReference
 
+/**
+ * Write out the Checkpoint object in JSON. The underlying map of SSP => Offset cannot be stored directly because
+ * JSON only allows strings as map keys, so we would need to separately serialize the SSP to a string that doesn't
+ * then interfere with JSON's decoding of the overall map. We'll sidestep the whole issue by turning the
+ * map into a list[String] of (System, Stream, Partition, Offset) and serializing that.
+ */
 class CheckpointSerde extends Serde[Checkpoint] with Logging {
+  import CheckpointSerde._
+  // TODO: Elucidate the CheckpointSerde relationship to Serde. Should Serde also have keyTo/FromBytes? Should
+  //       we just take CheckpointSerde here as interface and have this be JSONCheckpointSerde?
+  // TODO: Add more tests. This class currently only has a direct test and is mainly tested by the other checkpoint managers
   val jsonMapper = new ObjectMapper()
 
+  // Jackson absolutely hates Scala types and hidden conversions hate you, so we're going to be very, very
+  // explicit about the Java (not Scala) types used here and never let Scala get its grubby little hands
+  // on any instance.
+
+  // Store checkpoint as maps keyed off of the SSP.toString to another map of the constituent SSP components
+  // and offset. Jackson can't automatically serialize the SSP since it's not a POJO and this avoids
+  // having to wrap it in another class while maintaining readability.
+
   def fromBytes(bytes: Array[Byte]): Checkpoint = {
     try {
-      val checkpointMap = jsonMapper
-        .readValue(bytes, classOf[java.util.Map[String, java.util.Map[String, String]]])
-        .flatMap {
-          case (systemName, streamToOffsetMap) =>
-            streamToOffsetMap.map { case (streamName, offset) => (new SystemStream(systemName, streamName), offset) }
-        }
-      return new Checkpoint(checkpointMap)
-    } catch {
+      val jMap = jsonMapper.readValue(bytes, classOf[util.HashMap[String, util.HashMap[String, String]]])
+
+      def deserializeJSONMap(m:util.HashMap[String, String]) = {
+        require(m.size() == 4, "All JSON-encoded SystemStreamPartitions must have four keys")
+        val system = m.get("system")
+        require(system != null, "System must be present in JSON-encoded SystemStreamPartition")
+        val stream = m.get("stream")
+        require(stream != null, "Stream must be present in JSON-encoded SystemStreamPartition")
+        val partition = m.get("partition")
+        require(partition != null, "Partition must be present in JSON-encoded SystemStreamPartition")
+        val offset = m.get("offset")
+        require(offset != null, "Offset must be present in JSON-encoded SystemStreamPartition")
+
+        new SystemStreamPartition(system, stream, new Partition(partition.toInt)) -> offset
+      }
+
+      val cpMap = jMap.values.map(deserializeJSONMap).toMap
+
+      return new Checkpoint(cpMap)
+    } catch {
       case e : Exception =>
         warn("Exception while deserializing checkpoint: " + e)
         debug("Exception detail:", e)
@@ -46,34 +79,38 @@ class CheckpointSerde extends Serde[Checkpoint] with Logging {
     }
   }
 
-  def toBytes(checkpoint: Checkpoint) = {
-    val offsetMap = mapAsJavaMap(checkpoint
-      .getOffsets
-      // Convert Map[SystemStream, String] offset map to a iterable of tuples (system, stream, offset)
-      .map { case (systemStream, offset) => (systemStream.getSystem, systemStream.getStream, offset) }
-      // Group into a Map[String, (String, String, String)] by system
-      .groupBy(_._1)
-      // Group the tuples for each system into a Map[String, String] for stream to offsets
-      .map {
-        case (systemName, tuples) =>
-          val streamToOffestMap = mapAsJavaMap(tuples
-            // Group the tuples by stream name
-            .groupBy(_._2)
-            // There should only ever be one SystemStream to offset mapping, so just
-            // grab the first element from the tuple list for each stream.
-            .map {
-              case (streamName, tuples) => {
-                // If there's more than one offset, something is seriously wrong.
-                if (tuples.size != 1) {
-                  throw new SamzaException("Got %s offsets for %s. Expected only one offset, so failing." format (tuples.size, streamName))
-                }
-                (streamName, tuples.head._3)
-              }
-            }
-            .toMap)
-          (systemName, streamToOffestMap)
-      }.toMap)
-
-    jsonMapper.writeValueAsBytes(offsetMap)
+  def toBytes(checkpoint: Checkpoint): Array[Byte] = {
+    val offsets = checkpoint.getOffsets
+    val asMap = new util.HashMap[String, util.HashMap[String, String]](offsets.size())
+
+    offsets.foreach {
+      case (ssp, offset) =>
+        val jMap = new util.HashMap[String, String](4)
+        jMap.put("system", ssp.getSystemStream.getSystem)
+        jMap.put("stream", ssp.getSystemStream.getStream)
+        jMap.put("partition", ssp.getPartition.getPartitionId.toString)
+        jMap.put("offset", offset)
+
+        asMap.put(ssp.toString, jMap)
+    }
+
+    jsonMapper.writeValueAsBytes(asMap)
+  }
+
+  def changelogPartitionMappingFromBytes(bytes: Array[Byte]): util.Map[TaskName, java.lang.Integer] = {
+    try {
+      jsonMapper.readValue(bytes, PARTITION_MAPPING_TYPEREFERENCE)
+    } catch {
+      case e : Exception =>
+        throw new SamzaException("Exception while deserializing changelog partition mapping", e)
+    }
   }
+
+  def changelogPartitionMappingToBytes(mapping: util.Map[TaskName, java.lang.Integer]) = {
+    jsonMapper.writeValueAsBytes(new util.HashMap[TaskName, java.lang.Integer](mapping))
+  }
+}
+
+object CheckpointSerde {
+  val PARTITION_MAPPING_TYPEREFERENCE = new TypeReference[util.HashMap[TaskName, java.lang.Integer]]() {}
+}
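
For orientation (editorial; system, stream, and offset values are hypothetical): toBytes renders a Checkpoint as JSON keyed by each SSP's string form, with the four component fields spelled out so fromBytes can rebuild the SSP without custom Jackson bindings:

    import scala.collection.JavaConversions._
    import org.apache.samza.Partition
    import org.apache.samza.checkpoint.Checkpoint
    import org.apache.samza.serializers.CheckpointSerde
    import org.apache.samza.system.SystemStreamPartition

    val serde = new CheckpointSerde
    val cp = new Checkpoint(Map(new SystemStreamPartition("kafka", "orders", new Partition(0)) -> "42"))
    val bytes = serde.toBytes(cp)
    // bytes decode to JSON shaped roughly like:
    //   { "<ssp.toString>": { "system": "kafka", "stream": "orders", "partition": "0", "offset": "42" } }
    assert(serde.fromBytes(bytes) == cp)  // the round trip is lossless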
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index 7214151..0cfdbb3 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -29,14 +29,16 @@
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.system.SystemStreamPartitionIterator
 import org.apache.samza.util.Util
 import org.apache.samza.SamzaException
+import org.apache.samza.container.TaskName
 
 object TaskStorageManager {
   def getStoreDir(storeBaseDir: File, storeName: String) = {
     new File(storeBaseDir, storeName)
   }
 
-  def getStorePartitionDir(storeBaseDir: File, storeName: String, partition: Partition) = {
-    new File(storeBaseDir, storeName + File.separator + partition.getPartitionId)
+  def getStorePartitionDir(storeBaseDir: File, storeName: String, taskName: TaskName) = {
+    // TODO: Sanitize, check and clean taskName string as a valid value for a file
+    new File(storeBaseDir, storeName + File.separator + taskName)
   }
 }
 
@@ -44,12 +46,13 @@
 /**
  * Manage all the storage engines for a given task
  */
 class TaskStorageManager(
-  partition: Partition,
+  taskName: TaskName,
   taskStores: Map[String, StorageEngine] = Map(),
   storeConsumers: Map[String, SystemConsumer] = Map(),
   changeLogSystemStreams: Map[String, SystemStream] = Map(),
   changeLogOldestOffsets: Map[SystemStream, String] = Map(),
-  storeBaseDir: File = new File(System.getProperty("user.dir"), "state")) extends Logging {
+  storeBaseDir: File = new File(System.getProperty("user.dir"), "state"),
+  partition: Partition) extends Logging {
 
   var taskStoresToRestore = taskStores
 
@@ -66,7 +69,7 @@
     debug("Cleaning base directories for stores.")
 
     taskStores.keys.foreach(storeName => {
-      val storagePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, partition)
+      val storagePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName)
 
       debug("Cleaning %s for store %s." format (storagePartitionDir, storeName))
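
One concrete consequence (editorial; paths hypothetical, and assuming TaskName.toString yields the raw name): state directories are now keyed by taskName rather than partition id, which is why the TODO above flags sanitization — taskNames like "Partition 0" contain a space:

    import java.io.File
    import org.apache.samza.container.TaskName
    import org.apache.samza.storage.TaskStorageManager

    val dir = TaskStorageManager.getStorePartitionDir(new File("/tmp/state"), "my-store", new TaskName("Partition 0"))
    // dir.getPath == "/tmp/state/my-store/Partition 0"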
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala
index 4ccd604..6e1134d 100644
--- a/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala
@@ -20,12 +20,12 @@
 package org.apache.samza.task
 
 import org.apache.samza.task.TaskCoordinator.RequestScope
-import org.apache.samza.Partition
+import org.apache.samza.container.TaskName
 
 /**
  * An in-memory implementation of TaskCoordinator that is specific to a single TaskInstance.
  */
-class ReadableCoordinator(val partition: Partition) extends TaskCoordinator {
+class ReadableCoordinator(val taskName: TaskName) extends TaskCoordinator {
 
   var commitRequest: Option[RequestScope] = None
   var shutdownRequest: Option[RequestScope] = None
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 60d96c9..32c2647 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -19,23 +19,26 @@
 package org.apache.samza.util
 
+import grizzled.slf4j.Logging
 import java.io._
 import java.lang.management.ManagementFactory
+import java.util
 import java.util.Random
 import java.util.zip.{GZIPInputStream, GZIPOutputStream}
-import grizzled.slf4j.Logging
 import org.apache.commons.codec.binary.Base64
-import org.apache.samza.{ Partition, SamzaException }
+import org.apache.samza.SamzaException
+import org.apache.samza.checkpoint.CheckpointManagerFactory
 import org.apache.samza.config.Config
+import org.apache.samza.config.StorageConfig.Config2Storage
 import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.config.TaskConfig.Config2Task
-import scala.collection.JavaConversions._
+import org.apache.samza.container.systemstreampartition.groupers.GroupByPartitionFactory
+import org.apache.samza.container.systemstreampartition.taskname.groupers.SimpleSystemStreamPartitionTaskNameGrouper
+import org.apache.samza.container.{TaskName, SystemStreamPartitionTaskNameGrouper, TaskNamesToSystemStreamPartitions, SystemStreamPartitionGrouperFactory}
+import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.system.{SystemStreamPartition, SystemFactory, StreamMetadataCache, SystemStream}
-import org.codehaus.jackson.map.ObjectMapper
-import org.codehaus.jackson.`type`.TypeReference
-import java.util
-import scala.reflect.BeanProperty
-
+import scala.collection.JavaConversions._
+import scala.collection
 
 object Util extends Logging {
   val random = new Random
@@ -82,7 +85,7 @@ object Util extends Logging {
   /**
    * For each input stream specified in config, exactly determine its partitions, returning a set of SystemStreamPartitions
-   * corresponding to them all
+   * containing them all
    *
    * @param config Source of truth for systems and inputStreams
    * @return Set of SystemStreamPartitions, one for each unique system, stream and partition
@@ -112,6 +115,50 @@ object Util extends Logging {
   }
 
   /**
+   * Assign mapping of which TaskNames go to which container
+   *
+   * @param config For factories for Grouper and TaskNameGrouper
+   * @param containerCount How many containers we are working with
+   * @return Map of container ID to the TaskNamesToSystemStreamPartitions that container is responsible for
+   */
+  def assignContainerToSSPTaskNames(config:Config, containerCount:Int): Map[Int, TaskNamesToSystemStreamPartitions] = {
+    import org.apache.samza.config.JobConfig.Config2Job
+
+    val allSystemStreamPartitions: Set[SystemStreamPartition] = Util.getInputStreamPartitions(config)
+
+    val sspTaskNamesAsJava: util.Map[TaskName, util.Set[SystemStreamPartition]] = {
+      val factoryString = config.getSystemStreamPartitionGrouperFactory
+
+      info("Instantiating type " + factoryString + " to build SystemStreamPartition groupings")
+
+      val factory = Util.getObj[SystemStreamPartitionGrouperFactory](factoryString)
+
+      val grouper = factory.getSystemStreamPartitionGrouper(config)
+
+      val groups = grouper.group(allSystemStreamPartitions)
+
+      info("SystemStreamPartitionGrouper " + grouper + " has grouped the SystemStreamPartitions into the following taskNames:")
+      groups.foreach(g => info("TaskName: " + g._1 + " => " + g._2))
+
+      groups
+    }
+
+    val sspTaskNames = TaskNamesToSystemStreamPartitions(sspTaskNamesAsJava)
+
+    info("Assigning " + sspTaskNames.keySet.size + " SystemStreamPartitions taskNames to " + containerCount + " containers.")
+
+    // Here is where we should put in a pluggable option for the SSPTaskNameGrouper for locality, load-balancing, etc.
+    val sspTaskNameGrouper = new SimpleSystemStreamPartitionTaskNameGrouper(containerCount)
+
+    val containersToTaskNames = sspTaskNameGrouper.groupTaskNames(sspTaskNames).toMap
+
+    info("Grouped SystemStreamPartition TaskNames (size = " + containersToTaskNames.size + "): ")
+    containersToTaskNames.foreach(t => info("Container number: " + t._1 + " => " + t._2))
+
+    containersToTaskNames
+  }
+
+  /**
    * Returns a SystemStream object based on the system stream name given. For
    * example, kafka.topic would return new SystemStream("kafka", "topic").
    */
@@ -131,40 +178,120 @@ object Util extends Logging {
   }
 
   /**
-   * For specified containerId, create a list of of the streams and partitions that task should handle,
-   * based on the number of tasks in the job
+   * Using previous taskName to partition mapping and current taskNames for this job run, create a new mapping that preserves
+   * the previous order and deterministically assigns any new taskNames to changelog partitions. Be chatty about new or
+   * missing taskNames.
    *
-   * @param containerId TaskID to determine work for
-   * @param containerCount Total number of tasks in the job
-   * @param ssp All SystemStreamPartitions
-   * @return Collection of streams and partitions for this particular containerId
+   * @param currentTaskNames All the taskNames the current job is processing
+   * @param previousTaskNameMapping Previous mapping of taskNames to partition
+   * @return New mapping of taskNames to partitions for the changelog
    */
-  def getStreamsAndPartitionsForContainer(containerId: Int, containerCount: Int, ssp: Set[SystemStreamPartition]): Set[SystemStreamPartition] = {
-    ssp.filter(_.getPartition.getPartitionId % containerCount == containerId)
+  def resolveTaskNameToChangelogPartitionMapping(currentTaskNames:Set[TaskName],
+                                                 previousTaskNameMapping:Map[TaskName, Int]): Map[TaskName, Int] = {
+    info("Previous mapping of taskNames to partition: " + previousTaskNameMapping.toList.sorted)
+    info("Current set of taskNames: " + currentTaskNames.toList.sorted)
+
+    val previousTaskNames: Set[TaskName] = previousTaskNameMapping.keySet
+
+    if(previousTaskNames.equals(currentTaskNames)) {
+      info("No change in TaskName sets from previous run. Returning previous mapping.")
+      return previousTaskNameMapping
+    }
+
+    if(previousTaskNames.isEmpty) {
+      warn("No previous taskName mapping defined. This is OK if it's the first time the job is being run, otherwise data may have been lost.")
+    }
+
+    val missingTaskNames = previousTaskNames -- currentTaskNames
+
+    if(missingTaskNames.isEmpty) {
+      info("No taskNames are missing between this run and previous")
+    } else {
+      warn("The following taskNames were previously defined and are no longer present: " + missingTaskNames)
+    }
+
+    val newTaskNames = currentTaskNames -- previousTaskNames
+
+    if(newTaskNames.isEmpty) {
+      info("No new taskNames have been added between this run and the previous")
+      previousTaskNameMapping // Return the old mapping since there are no new taskNames for which to account
+
+    } else {
+      warn("The following new taskNames have been added in this job run: " + newTaskNames)
+
+      // Sort the new taskNames and assign them partitions (starting at 0 for now)
+      val sortedNewTaskNames = newTaskNames.toList.sortWith { (a,b) => a.getTaskName < b.getTaskName }.zipWithIndex.toMap
+
+      // Find next largest partition to use based on previous mapping
+      val nextPartitionToUse = if(previousTaskNameMapping.size == 0) 0
+                               else previousTaskNameMapping.foldLeft(0)((a,b) => math.max(a, b._2)) + 1
+
+      // Bump up the partition values
+      val newTaskNamesWithTheirPartitions = sortedNewTaskNames.map(c => c._1 -> (c._2 + nextPartitionToUse))
+
+      // Merge old and new
+      val newMapping = previousTaskNameMapping ++ newTaskNamesWithTheirPartitions
+
+      info("New taskName to partition mapping: " + newMapping.toList.sortWith{ (a,b) => a._2 < b._2})
+
+      newMapping
+    }
   }
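
A worked illustration of the resolution rules above (editorial; the taskNames are hypothetical): surviving assignments are never renumbered, missing taskNames are retained (with a warning) so their changelog partitions stay reserved, and new taskNames are sorted and appended after the highest previously used partition:

    import org.apache.samza.container.TaskName
    import org.apache.samza.util.Util

    val previous = Map(new TaskName("t0") -> 0, new TaskName("t1") -> 1)
    val current  = Set(new TaskName("t0"), new TaskName("t2"))   // t1 gone, t2 new
    val resolved = Util.resolveTaskNameToChangelogPartitionMapping(current, previous)
    // resolved == Map(t0 -> 0, t1 -> 1, t2 -> 2): t2 gets max(previous partitions) + 1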
  /**
-   * Jackson really hates Scala's classes, so we need to wrap up the SSP in a form Jackson will take
+   * Read the TaskName to changelog partition mapping from the checkpoint manager, if one exists.
+   *
+   * @param config To pull out values for instantiating checkpoint manager
+   * @param tasksToSSPTaskNames Current TaskNames for the current job run
+   * @return Current mapping of TaskNames to changelog partitions
    */
-  private class SSPWrapper(@BeanProperty var partition:java.lang.Integer = null,
-                           @BeanProperty var Stream:java.lang.String = null,
-                           @BeanProperty var System:java.lang.String = null) {
-    def this() { this(null, null, null) }
-    def this(ssp:SystemStreamPartition) { this(ssp.getPartition.getPartitionId, ssp.getSystemStream.getStream, ssp.getSystemStream.getSystem)}
-  }
+  def getTaskNameToChangeLogPartitionMapping(config: Config, tasksToSSPTaskNames: Map[Int, TaskNamesToSystemStreamPartitions]) = {
+    val taskNameMaps: Set[TaskNamesToSystemStreamPartitions] = tasksToSSPTaskNames.map(_._2).toSet
+    val currentTaskNames: Set[TaskName] = taskNameMaps.map(_.keys).toSet.flatten
+
+    // We need to oh so quickly instantiate a checkpoint manager and grab the partition mapping from the log, then toss the manager aside
+    val checkpointManager = config.getCheckpointManagerFactory match {
+      case Some(checkpointFactoryClassName) =>
+        Util
+          .getObj[CheckpointManagerFactory](checkpointFactoryClassName)
+          .getCheckpointManager(config, new MetricsRegistryMap)
+      case _ => null
+    }
 
-  def serializeSSPSetToJSON(ssps: Set[SystemStreamPartition]): String = {
-    val al = new util.ArrayList[SSPWrapper](ssps.size)
-    for(ssp <- ssps) { al.add(new SSPWrapper(ssp)) }
+    if(checkpointManager == null) {
+      // Check if we have a changelog configured, which requires a checkpoint manager
+      if(!config.getStoreNames.isEmpty) {
+        throw new SamzaException("Storage factories configured, but no checkpoint manager has been specified. " +
+          "Unable to start job as there would be no place to store changelog partition mapping.")
+      }
 
-    new ObjectMapper().writeValueAsString(al)
-  }
+      // No need to do any mapping, just use what has been provided
+      Util.resolveTaskNameToChangelogPartitionMapping(currentTaskNames, Map[TaskName, Int]())
+    } else {
+      info("Got checkpoint manager: %s" format checkpointManager)
 
-  def deserializeSSPSetFromJSON(ssp: String) = {
-    val om = new ObjectMapper()
+      // Always put in a call to create so the log is available for the tasks on startup.
+      // Reasonably lame to hide it in here. TODO: Pull out to more visible location.
+      checkpointManager.start
 
-    val asWrapper = om.readValue(ssp, new TypeReference[util.ArrayList[SSPWrapper]]() { }).asInstanceOf[util.ArrayList[SSPWrapper]]
-    asWrapper.map(w => new SystemStreamPartition(w.getSystem, w.getStream(), new Partition(w.getPartition()))).toSet
+      val previousMapping: Map[TaskName, Int] = {
+        val fromCM = checkpointManager.readChangeLogPartitionMapping()
+
+        fromCM.map(kv => kv._1 -> kv._2.intValue()).toMap // Java to Scala interop!!!
+      }
+
+      checkpointManager.stop
+
+      val newMapping = Util.resolveTaskNameToChangelogPartitionMapping(currentTaskNames, previousMapping)
+
+      if (newMapping != null) {
+        info("Writing new changelog partition mapping to checkpoint manager.")
+        checkpointManager.writeChangeLogPartitionMapping(newMapping.map(kv => kv._1 -> new java.lang.Integer(kv._2))) // Java to Scala interop!!!
+      }
+
+      newMapping
+    }
  }

  /**
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
index bc54f9e..1eb3995 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
@@ -19,18 +19,19 @@
 package org.apache.samza.checkpoint
 
+import org.apache.samza.Partition
+import org.apache.samza.checkpoint.TestCheckpointTool.{MockCheckpointManagerFactory, MockSystemFactory}
+import org.apache.samza.config.{Config, MapConfig, SystemConfig, TaskConfig}
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.system.{SystemAdmin, SystemConsumer, SystemFactory, SystemProducer, SystemStream, SystemStreamMetadata, SystemStreamPartition}
 import org.junit.{Before, Test}
 import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.scalatest.junit.AssertionsForJUnit
 import org.scalatest.mock.MockitoSugar
 import scala.collection.JavaConversions._
-import org.apache.samza.Partition
-import org.apache.samza.checkpoint.TestCheckpointTool.{MockCheckpointManagerFactory, MockSystemFactory}
-import org.apache.samza.config.{Config, MapConfig, SystemConfig, TaskConfig}
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.system.{SystemAdmin, SystemConsumer, SystemFactory, SystemProducer, SystemStream, SystemStreamMetadata, SystemStreamPartition}
-import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.container.TaskName
 
 object TestCheckpointTool {
   var checkpointManager: CheckpointManager = null
@@ -52,6 +53,11 @@
 class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
   var config: MapConfig = null
 
+  val tn0 = new TaskName("Partition 0")
+  val tn1 = new TaskName("Partition 1")
+  val p0 = new Partition(0)
+  val p1 = new Partition(1)
+
   @Before
   def setup {
     config = new MapConfig(Map(
@@ -68,29 +74,30 @@
     TestCheckpointTool.systemAdmin = mock[SystemAdmin]
     when(TestCheckpointTool.systemAdmin.getSystemStreamMetadata(Set("foo")))
       .thenReturn(Map("foo" -> metadata))
-    when(TestCheckpointTool.checkpointManager.readLastCheckpoint(new Partition(0)))
-      .thenReturn(new Checkpoint(Map(new SystemStream("test", "foo") -> "1234")))
-    when(TestCheckpointTool.checkpointManager.readLastCheckpoint(new Partition(1)))
-      .thenReturn(new Checkpoint(Map(new SystemStream("test", "foo") -> "4321")))
+    when(TestCheckpointTool.checkpointManager.readLastCheckpoint(tn0))
+      .thenReturn(new Checkpoint(Map(new SystemStreamPartition("test", "foo", p0) -> "1234")))
+    when(TestCheckpointTool.checkpointManager.readLastCheckpoint(tn1))
+      .thenReturn(new Checkpoint(Map(new SystemStreamPartition("test", "foo", p1) -> "4321")))
   }
 
   @Test
   def testReadLatestCheckpoint {
     new CheckpointTool(config, null).run
-    verify(TestCheckpointTool.checkpointManager).readLastCheckpoint(new Partition(0))
-    verify(TestCheckpointTool.checkpointManager).readLastCheckpoint(new Partition(1))
+    verify(TestCheckpointTool.checkpointManager).readLastCheckpoint(tn0)
+    verify(TestCheckpointTool.checkpointManager).readLastCheckpoint(tn1)
     verify(TestCheckpointTool.checkpointManager, never()).writeCheckpoint(any(), any())
   }
 
   @Test
   def testOverwriteCheckpoint {
-    new CheckpointTool(config, Map(
-      new SystemStreamPartition("test", "foo", new Partition(0)) -> "42",
-      new SystemStreamPartition("test", "foo", new Partition(1)) -> "43"
-    )).run
+    val toOverwrite = Map(tn0 -> Map(new SystemStreamPartition("test", "foo", p0) -> "42"),
+                          tn1 -> Map(new SystemStreamPartition("test", "foo", p1) -> "43"))
+
+    new CheckpointTool(config, toOverwrite).run
     verify(TestCheckpointTool.checkpointManager)
-      .writeCheckpoint(new Partition(0), new Checkpoint(Map(new SystemStream("test", "foo") -> "42")))
+      .writeCheckpoint(tn0, new Checkpoint(Map(new SystemStreamPartition("test", "foo", p0) -> "42")))
     verify(TestCheckpointTool.checkpointManager)
-      .writeCheckpoint(new Partition(1), new Checkpoint(Map(new SystemStream("test", "foo") -> "43")))
+      .writeCheckpoint(tn1, new Checkpoint(Map(new SystemStreamPartition("test", "foo", p1) -> "43")))
   }
 }
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index 94f6f4c..44a98a5 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -23,19 +23,21 @@
 import scala.collection.JavaConversions._
 import org.apache.samza.Partition
 import org.apache.samza.system.SystemStream
 import org.apache.samza.system.SystemStreamMetadata
-import org.apache.samza.system.SystemStreamMetadata.OffsetType
-import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.system.SystemStreamMetadata.{OffsetType, SystemStreamPartitionMetadata}
 import org.apache.samza.system.SystemStreamPartition
 import org.junit.Assert._
 import org.junit.{Ignore, Test}
 import org.apache.samza.SamzaException
 import org.apache.samza.config.MapConfig
 import org.apache.samza.system.SystemAdmin
+import java.util
+import org.apache.samza.container.TaskName
 import org.scalatest.Assertions.intercept
 
 class TestOffsetManager {
   @Test
   def testSystemShouldUseDefaults {
+    val taskName = new TaskName("c")
     val systemStream = new SystemStream("test-system", "test-stream")
     val partition = new Partition(0)
     val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
@@ -43,7 +45,7 @@
     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
     val config = new MapConfig(Map("systems.test-system.samza.offset.default" -> "oldest"))
     val offsetManager = OffsetManager(systemStreamMetadata, config)
-    offsetManager.register(systemStreamPartition)
+    offsetManager.register(taskName, Set(systemStreamPartition))
     offsetManager.start
     assertTrue(!offsetManager.getLastProcessedOffset(systemStreamPartition).isDefined)
     assertTrue(offsetManager.getStartingOffset(systemStreamPartition).isDefined)
@@ -52,21 +54,22 @@
   @Test
   def testShouldLoadFromAndSaveWithCheckpointManager {
+    val taskName = new TaskName("c")
     val systemStream = new SystemStream("test-system", "test-stream")
     val partition = new Partition(0)
     val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
     val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
     val config = new MapConfig
-    val checkpointManager = getCheckpointManager(systemStreamPartition)
+    val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
     val systemAdmins = Map("test-system" -> getSystemAdmin)
     val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins)
-    offsetManager.register(systemStreamPartition)
+    offsetManager.register(taskName, Set(systemStreamPartition))
     offsetManager.start
     assertTrue(checkpointManager.isStarted)
     assertEquals(1, checkpointManager.registered.size)
-    assertEquals(partition, checkpointManager.registered.head)
-    assertEquals(checkpointManager.checkpoints.head._2, checkpointManager.readLastCheckpoint(partition))
+    assertEquals(taskName, checkpointManager.registered.head)
+    assertEquals(checkpointManager.checkpoints.head._2, checkpointManager.readLastCheckpoint(taskName))
     // Should get offset 45 back from the checkpoint manager, which is last processed, and system admin should return 46 as starting offset.
     assertEquals("46", offsetManager.getStartingOffset(systemStreamPartition).get)
     assertEquals("45", offsetManager.getLastProcessedOffset(systemStreamPartition).get)
@@ -76,31 +79,31 @@
     assertEquals("47", offsetManager.getLastProcessedOffset(systemStreamPartition).get)
     // Should never update starting offset.
     assertEquals("46", offsetManager.getStartingOffset(systemStreamPartition).get)
-    offsetManager.checkpoint(partition)
-    val expectedCheckpoint = new Checkpoint(Map(systemStream -> "47"))
-    assertEquals(expectedCheckpoint, checkpointManager.readLastCheckpoint(partition))
+    offsetManager.checkpoint(taskName)
+    val expectedCheckpoint = new Checkpoint(Map(systemStreamPartition -> "47"))
+    assertEquals(expectedCheckpoint, checkpointManager.readLastCheckpoint(taskName))
   }
 
   @Test
   def testShouldResetStreams {
+    val taskName = new TaskName("c")
     val systemStream = new SystemStream("test-system", "test-stream")
     val partition = new Partition(0)
     val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
     val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
-    val defaultOffsets = Map(systemStream -> OffsetType.OLDEST)
-    val checkpoint = new Checkpoint(Map(systemStream -> "45"))
-    val checkpointManager = getCheckpointManager(systemStreamPartition)
+    val checkpoint = new Checkpoint(Map(systemStreamPartition -> "45"))
+    val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
     val config = new MapConfig(Map(
       "systems.test-system.samza.offset.default" -> "oldest",
       "systems.test-system.streams.test-stream.samza.reset.offset" -> "true"))
     val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager)
-    offsetManager.register(systemStreamPartition)
+    offsetManager.register(taskName, Set(systemStreamPartition))
     offsetManager.start
     assertTrue(checkpointManager.isStarted)
     assertEquals(1, checkpointManager.registered.size)
-    assertEquals(partition, checkpointManager.registered.head)
-    assertEquals(checkpoint, checkpointManager.readLastCheckpoint(partition))
+    assertEquals(taskName, checkpointManager.registered.head)
+    assertEquals(checkpoint, checkpointManager.readLastCheckpoint(taskName))
     // Should be zero even though the checkpoint has an offset of 45, since we're forcing a reset.
     assertEquals("0", offsetManager.getStartingOffset(systemStreamPartition).get)
   }
 
@@ -110,36 +113,38 @@
     val systemStream = new SystemStream("test-system", "test-stream")
     val partition1 = new Partition(0)
     val partition2 = new Partition(1)
+    val taskName1 = new TaskName("P0")
+    val taskName2 = new TaskName("P1")
     val systemStreamPartition1 = new SystemStreamPartition(systemStream, partition1)
     val systemStreamPartition2 = new SystemStreamPartition(systemStream, partition2)
     val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(
       partition1 -> new SystemStreamPartitionMetadata("0", "1", "2"),
       partition2 -> new SystemStreamPartitionMetadata("3", "4", "5")))
     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
-    val defaultOffsets = Map(systemStream -> OffsetType.OLDEST)
-    val checkpoint = new Checkpoint(Map(systemStream -> "45"))
+    val checkpoint = new Checkpoint(Map(systemStreamPartition1 -> "45"))
     // Checkpoint manager only has partition 1.
-    val checkpointManager = getCheckpointManager(systemStreamPartition1)
+    val checkpointManager = getCheckpointManager(systemStreamPartition1, taskName1)
     val systemAdmins = Map("test-system" -> getSystemAdmin)
     val config = new MapConfig
     val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins)
     // Register both partitions. Partition 2 shouldn't have a checkpoint.
-    offsetManager.register(systemStreamPartition1)
-    offsetManager.register(systemStreamPartition2)
+    offsetManager.register(taskName1, Set(systemStreamPartition1))
+    offsetManager.register(taskName2, Set(systemStreamPartition2))
     offsetManager.start
     assertTrue(checkpointManager.isStarted)
     assertEquals(2, checkpointManager.registered.size)
-    assertEquals(checkpoint, checkpointManager.readLastCheckpoint(partition1))
-    assertNull(checkpointManager.readLastCheckpoint(partition2))
+    assertEquals(checkpoint, checkpointManager.readLastCheckpoint(taskName1))
+    assertNull(checkpointManager.readLastCheckpoint(taskName2))
   }
 
   @Test
   def testShouldFailWhenMissingMetadata {
+    val taskName = new TaskName("c")
     val systemStream = new SystemStream("test-system", "test-stream")
     val partition = new Partition(0)
     val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
     val offsetManager = new OffsetManager
-    offsetManager.register(systemStreamPartition)
+    offsetManager.register(taskName, Set(systemStreamPartition))
 
     intercept[SamzaException] {
       offsetManager.start
 
   @Ignore("OffsetManager.start is supposed to throw an exception - but it doesn't")
   @Test
   def testShouldFailWhenMissingDefault {
+    val taskName = new TaskName("c")
     val systemStream = new SystemStream("test-system", "test-stream")
     val partition = new Partition(0)
     val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
     val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
     val offsetManager = OffsetManager(systemStreamMetadata, new MapConfig(Map[String, String]()))
-    offsetManager.register(systemStreamPartition)
+    offsetManager.register(taskName, Set(systemStreamPartition))
 
     intercept[SamzaException] {
       offsetManager.start
@@ -190,6 +196,7 @@ class TestOffsetManager {
   @Test
   def testOutdatedStreamInCheckpoint {
+    val taskName = new TaskName("c")
     val systemStream0 = new SystemStream("test-system-0", "test-stream")
     val systemStream1 = new SystemStream("test-system-1", "test-stream")
     val partition0 = new Partition(0)
@@ -200,26 +207,32 @@
     val offsetSettings = Map(systemStream0 -> OffsetSetting(testStreamMetadata, OffsetType.UPCOMING, false))
     val checkpointManager = getCheckpointManager(systemStreamPartition1)
     val offsetManager = new OffsetManager(offsetSettings, checkpointManager)
-    offsetManager.register(systemStreamPartition0)
+    offsetManager.register(taskName, Set(systemStreamPartition0))
     offsetManager.start
     assertTrue(checkpointManager.isStarted)
     assertEquals(1, checkpointManager.registered.size)
     assertNull(offsetManager.getLastProcessedOffset(systemStreamPartition1).getOrElse(null))
   }
 
-  private def getCheckpointManager(systemStreamPartition: SystemStreamPartition) = {
-    val checkpoint = new Checkpoint(Map(systemStreamPartition.getSystemStream -> "45"))
+  private def getCheckpointManager(systemStreamPartition: SystemStreamPartition, taskName:TaskName = new TaskName("taskName")) = {
+    val checkpoint = new Checkpoint(Map(systemStreamPartition -> "45"))
 
     new CheckpointManager {
       var isStarted = false
       var isStopped = false
-      var registered = Set[Partition]()
-      var checkpoints = Map(systemStreamPartition.getPartition -> checkpoint)
+      var registered = Set[TaskName]()
+      var checkpoints: Map[TaskName, Checkpoint] = Map(taskName -> checkpoint)
+      var taskNameToPartitionMapping: util.Map[TaskName, java.lang.Integer] = new util.HashMap[TaskName, java.lang.Integer]()
       def start { isStarted = true }
-      def register(partition: Partition) { registered += partition }
-      def writeCheckpoint(partition: Partition, checkpoint: Checkpoint) { checkpoints += partition -> checkpoint }
-      def readLastCheckpoint(partition: Partition) = checkpoints.getOrElse(partition, null)
+      def register(taskName: TaskName) { registered += taskName }
+      def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) { checkpoints += taskName -> checkpoint }
+      def readLastCheckpoint(taskName: TaskName) = checkpoints.getOrElse(taskName, null)
       def stop { isStopped = true }
+
+      override def writeChangeLogPartitionMapping(mapping: util.Map[TaskName, java.lang.Integer]): Unit = taskNameToPartitionMapping = mapping
+
+      override def readChangeLogPartitionMapping(): util.Map[TaskName, java.lang.Integer] = taskNameToPartitionMapping
     }
   }
 
@@ -232,4 +245,4 @@
     Map[String, SystemStreamMetadata]()
   }
 }
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
index 50d9a05..10ff1f4 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
@@ -23,34 +23,53 @@
 import java.io.File
 import scala.collection.JavaConversions._
 import java.util.Random
 import org.junit.Assert._
-import org.junit.Test
+import org.junit.{After, Before, Test}
 import org.apache.samza.SamzaException
 import org.apache.samza.Partition
 import org.apache.samza.checkpoint.Checkpoint
-import org.apache.samza.system.SystemStream
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.container.TaskName
+import org.junit.rules.TemporaryFolder
 
-class TestFileSystemCheckpointManager {
-  val checkpointRoot = System.getProperty("java.io.tmpdir")
+class TestFileSystemCheckpointManager {
+  val checkpointRoot = System.getProperty("java.io.tmpdir") // TODO: Move this out of tmp, into our build dir
+  val taskName = new TaskName("Warwickshire")
+  val baseFileLocation = new File(checkpointRoot)
+
+  val tempFolder = new TemporaryFolder
+
+  @Before
+  def createTempFolder = tempFolder.create()
+
+  @After
+  def deleteTempFolder = tempFolder.delete()
 
   @Test
   def testReadForCheckpointFileThatDoesNotExistShouldReturnNull {
-    val cpm = new FileSystemCheckpointManager("some-job-name", new File(checkpointRoot))
-    assert(cpm.readLastCheckpoint(new Partition(1)) == null)
+    val cpm = new FileSystemCheckpointManager("some-job-name", tempFolder.getRoot)
+    assertNull(cpm.readLastCheckpoint(taskName))
   }
 
   @Test
   def testReadForCheckpointFileThatDoesExistShouldReturnProperCheckpoint {
-    val cpm = new FileSystemCheckpointManager("some-job-name", new File(checkpointRoot))
-    val partition = new Partition(2)
     val cp = new Checkpoint(Map(
-      new SystemStream("a", "b") -> "c",
-      new SystemStream("a", "c") -> "d",
-      new SystemStream("b", "d") -> "e"))
+      new SystemStreamPartition("a", "b", new Partition(0)) -> "c",
+      new SystemStreamPartition("a", "c", new Partition(1)) -> "d",
+      new SystemStreamPartition("b", "d", new Partition(2)) -> "e"))
+
+    var readCp:Checkpoint = null
+    val cpm = new FileSystemCheckpointManager("some-job-name", tempFolder.getRoot)
+
     cpm.start
-    cpm.writeCheckpoint(partition, cp)
-    val readCp = cpm.readLastCheckpoint(partition)
+    cpm.writeCheckpoint(taskName, cp)
+    readCp = cpm.readLastCheckpoint(taskName)
     cpm.stop
-    assert(readCp.equals(cp))
+
+    assertNotNull(readCp)
+    cp.equals(readCp)
+    assertEquals(cp.getOffsets.keySet(), readCp.getOffsets.keySet())
+    assertEquals(cp.getOffsets, readCp.getOffsets)
+    assertEquals(cp, readCp)
   }
 
   @Test
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/container/SystemStreamPartitionGrouperTestBase.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/SystemStreamPartitionGrouperTestBase.scala b/samza-core/src/test/scala/org/apache/samza/container/SystemStreamPartitionGrouperTestBase.scala
new file mode 100644
index 0000000..3032b00
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/container/SystemStreamPartitionGrouperTestBase.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/container/SystemStreamPartitionGrouperTestBase.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/SystemStreamPartitionGrouperTestBase.scala b/samza-core/src/test/scala/org/apache/samza/container/SystemStreamPartitionGrouperTestBase.scala
new file mode 100644
index 0000000..3032b00
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/container/SystemStreamPartitionGrouperTestBase.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container
+
+import org.apache.samza.Partition
+import org.apache.samza.system.SystemStreamPartition
+import org.junit.Test
+import java.util.HashSet
+import java.util.Map
+import java.util.Set
+import org.junit.Assert.assertEquals
+import org.junit.Assert.assertTrue
+import java.util.Collections
+
+object SystemStreamPartitionGrouperTestBase {
+  val aa0 = new SystemStreamPartition("SystemA", "StreamA", new Partition(0))
+  val aa1 = new SystemStreamPartition("SystemA", "StreamA", new Partition(1))
+  val aa2 = new SystemStreamPartition("SystemA", "StreamA", new Partition(2))
+  val ab1 = new SystemStreamPartition("SystemA", "StreamB", new Partition(1))
+  val ab2 = new SystemStreamPartition("SystemA", "StreamB", new Partition(2))
+  val ac0 = new SystemStreamPartition("SystemA", "StreamB", new Partition(0))
+  val allSSPs = new HashSet[SystemStreamPartition]
+  Collections.addAll(allSSPs, aa0, aa1, aa2, ab1, ab2, ac0)
+}
+
+abstract class SystemStreamPartitionGrouperTestBase {
+  def getGrouper: SystemStreamPartitionGrouper
+
+  @Test
+  def emptySetReturnsEmptyMap {
+    val grouper: SystemStreamPartitionGrouper = getGrouper
+    val result: Map[TaskName, Set[SystemStreamPartition]] = grouper.group(new HashSet[SystemStreamPartition])
+    assertTrue(result.isEmpty)
+  }
+
+  def verifyGroupGroupsCorrectly(input: Set[SystemStreamPartition], output: Map[TaskName, Set[SystemStreamPartition]]) {
+    val grouper: SystemStreamPartitionGrouper = getGrouper
+    val result: Map[TaskName, Set[SystemStreamPartition]] = grouper.group(input)
+    assertEquals(output, result)
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
index fa10231..d4ceffc 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
@@ -37,29 +37,45 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ShouldMatche
   val p0 = new Partition(0)
   val p1 = new Partition(1)
+  val taskName0 = new TaskName(p0.toString)
+  val taskName1 = new TaskName(p1.toString)
   val ssp0 = new SystemStreamPartition("testSystem", "testStream", p0)
   val ssp1 = new SystemStreamPartition("testSystem", "testStream", p1)
   val envelope0 = new IncomingMessageEnvelope(ssp0, "0", "key0", "value0")
   val envelope1 = new IncomingMessageEnvelope(ssp1, "1", "key1", "value1")
 
+  def getMockTaskInstances: Map[TaskName, TaskInstance] = {
+    val ti0 = mock[TaskInstance]
+    when(ti0.systemStreamPartitions).thenReturn(Set(ssp0))
+    when(ti0.taskName).thenReturn(taskName0)
+
+    val ti1 = mock[TaskInstance]
+    when(ti1.systemStreamPartitions).thenReturn(Set(ssp1))
+    when(ti1.taskName).thenReturn(taskName1)
+
+    Map(taskName0 -> ti0, taskName1 -> ti1)
+  }
+
   @Test
   def testProcessMessageFromChooser {
-    val taskInstances = Map(p0 -> mock[TaskInstance], p1 -> mock[TaskInstance])
+    val taskInstances = getMockTaskInstances
     val consumers = mock[SystemConsumers]
     val runLoop = new RunLoop(taskInstances, consumers, new SamzaContainerMetrics)
     when(consumers.choose).thenReturn(envelope0).thenReturn(envelope1).thenThrow(new StopRunLoop)
     intercept[StopRunLoop] { runLoop.run }
-    verify(taskInstances(p0)).process(Matchers.eq(envelope0), anyObject)
-    verify(taskInstances(p1)).process(Matchers.eq(envelope1), anyObject)
+    verify(taskInstances(taskName0)).process(Matchers.eq(envelope0), anyObject)
+    verify(taskInstances(taskName1)).process(Matchers.eq(envelope1), anyObject)
     runLoop.metrics.envelopes.getCount should equal(2L)
     runLoop.metrics.nullEnvelopes.getCount should equal(0L)
   }
 
+  @Test
   def testNullMessageFromChooser {
     val consumers = mock[SystemConsumers]
-    val runLoop = new RunLoop(Map(p0 -> mock[TaskInstance]), consumers, new SamzaContainerMetrics)
+    val map = getMockTaskInstances - taskName1 // This test only needs p0
+    val runLoop = new RunLoop(map, consumers, new SamzaContainerMetrics)
     when(consumers.choose).thenReturn(null).thenReturn(null).thenThrow(new StopRunLoop)
     intercept[StopRunLoop] { runLoop.run }
     runLoop.metrics.envelopes.getCount should equal(0L)
@@ -73,7 +89,7 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ShouldMatche
     when(consumers.choose).thenReturn(envelope0)
     val runLoop = new RunLoop(
-      taskInstances = Map(p0 -> mock[TaskInstance], p1 -> mock[TaskInstance]),
+      taskInstances = getMockTaskInstances,
       consumerMultiplexer = consumers,
       metrics = new SamzaContainerMetrics,
       windowMs = 60000, // call window once per minute
@@ -86,67 +102,67 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ShouldMatche
     intercept[StopRunLoop] { runLoop.run }
-    verify(runLoop.taskInstances(p0), times(5)).window(anyObject)
-    verify(runLoop.taskInstances(p1), times(5)).window(anyObject)
-    verify(runLoop.taskInstances(p0), times(10)).commit
-    verify(runLoop.taskInstances(p1), times(10)).commit
+    verify(runLoop.taskInstances(taskName0), times(5)).window(anyObject)
+    verify(runLoop.taskInstances(taskName1), times(5)).window(anyObject)
+    verify(runLoop.taskInstances(taskName0), times(10)).commit
+    verify(runLoop.taskInstances(taskName1), times(10)).commit
   }
 
   @Test
   def testCommitCurrentTaskManually {
-    val taskInstances = Map(p0 -> mock[TaskInstance], p1 -> mock[TaskInstance])
+    val taskInstances = getMockTaskInstances
     val consumers = mock[SystemConsumers]
     val runLoop = new RunLoop(taskInstances, consumers, new SamzaContainerMetrics, windowMs = -1, commitMs = -1)
     when(consumers.choose).thenReturn(envelope0).thenReturn(envelope1).thenThrow(new StopRunLoop)
-    stubProcess(taskInstances(p0), (envelope, coordinator) => coordinator.commit(RequestScope.CURRENT_TASK))
+    stubProcess(taskInstances(taskName0), (envelope, coordinator) => coordinator.commit(RequestScope.CURRENT_TASK))
     intercept[StopRunLoop] { runLoop.run }
-    verify(taskInstances(p0), times(1)).commit
-    verify(taskInstances(p1), times(0)).commit
+    verify(taskInstances(taskName0), times(1)).commit
+    verify(taskInstances(taskName1), times(0)).commit
   }
 
   @Test
   def testCommitAllTasksManually {
-    val taskInstances = Map(p0 -> mock[TaskInstance], p1 -> mock[TaskInstance])
+    val taskInstances = getMockTaskInstances
     val consumers = mock[SystemConsumers]
     val runLoop = new RunLoop(taskInstances, consumers, new SamzaContainerMetrics, windowMs = -1, commitMs = -1)
     when(consumers.choose).thenReturn(envelope0).thenThrow(new StopRunLoop)
-    stubProcess(taskInstances(p0), (envelope, coordinator) => coordinator.commit(RequestScope.ALL_TASKS_IN_CONTAINER))
+    stubProcess(taskInstances(taskName0), (envelope, coordinator) => coordinator.commit(RequestScope.ALL_TASKS_IN_CONTAINER))
     intercept[StopRunLoop] { runLoop.run }
-    verify(taskInstances(p0), times(1)).commit
-    verify(taskInstances(p1), times(1)).commit
+    verify(taskInstances(taskName0), times(1)).commit
+    verify(taskInstances(taskName1), times(1)).commit
   }
 
   @Test
   def testShutdownOnConsensus {
-    val taskInstances = Map(p0 -> mock[TaskInstance], p1 -> mock[TaskInstance])
+    val taskInstances = getMockTaskInstances
     val consumers = mock[SystemConsumers]
     val runLoop = new RunLoop(taskInstances, consumers, new SamzaContainerMetrics, windowMs = -1, commitMs = -1)
     when(consumers.choose).thenReturn(envelope0).thenReturn(envelope0).thenReturn(envelope1)
-    stubProcess(taskInstances(p0), (envelope, coordinator) => coordinator.shutdown(RequestScope.CURRENT_TASK))
-    stubProcess(taskInstances(p1), (envelope, coordinator) => coordinator.shutdown(RequestScope.CURRENT_TASK))
+    stubProcess(taskInstances(taskName0), (envelope, coordinator) => coordinator.shutdown(RequestScope.CURRENT_TASK))
+    stubProcess(taskInstances(taskName1), (envelope, coordinator) => coordinator.shutdown(RequestScope.CURRENT_TASK))
     runLoop.run
-    verify(taskInstances(p0), times(2)).process(Matchers.eq(envelope0), anyObject)
-    verify(taskInstances(p1), times(1)).process(Matchers.eq(envelope1), anyObject)
+    verify(taskInstances(taskName0), times(2)).process(Matchers.eq(envelope0), anyObject)
+    verify(taskInstances(taskName1), times(1)).process(Matchers.eq(envelope1), anyObject)
   }
 
   @Test
   def testShutdownNow {
-    val taskInstances = Map(p0 -> mock[TaskInstance], p1 -> mock[TaskInstance])
+    val taskInstances = getMockTaskInstances
     val consumers = mock[SystemConsumers]
     val runLoop = new RunLoop(taskInstances, consumers, new SamzaContainerMetrics, windowMs = -1, commitMs = -1)
     when(consumers.choose).thenReturn(envelope0).thenReturn(envelope1)
-    stubProcess(taskInstances(p0), (envelope, coordinator) => coordinator.shutdown(RequestScope.ALL_TASKS_IN_CONTAINER))
+    stubProcess(taskInstances(taskName0), (envelope, coordinator) => coordinator.shutdown(RequestScope.ALL_TASKS_IN_CONTAINER))
     runLoop.run
-    verify(taskInstances(p0), times(1)).process(anyObject, anyObject)
-    verify(taskInstances(p1), times(0)).process(anyObject, anyObject)
+    verify(taskInstances(taskName0), times(1)).process(anyObject, anyObject)
+    verify(taskInstances(taskName1), times(0)).process(anyObject, anyObject)
   }
 
   def anyObject[T] = Matchers.anyObject.asInstanceOf[T]
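Note the shape these tests enforce: RunLoop is now constructed from a Map[TaskName, TaskInstance], and every mock advertises both its taskName and its systemStreamPartitions, which is what allows an incoming envelope to be routed to the task owning its SystemStreamPartition. A sketch of such a routing table, as an illustration of the idea only (buildRoutingTable is hypothetical, not RunLoop's actual internals):

import org.apache.samza.container.{TaskInstance, TaskName}
import org.apache.samza.system.SystemStreamPartition

// Invert the TaskName -> TaskInstance map into SSP -> TaskName, using the
// systemStreamPartitions each TaskInstance advertises (as the mocks above do).
def buildRoutingTable(taskInstances: Map[TaskName, TaskInstance]): Map[SystemStreamPartition, TaskName] =
  taskInstances.flatMap { case (taskName, instance) =>
    instance.systemStreamPartitions.map(ssp => ssp -> taskName)
  }

// A chosen envelope would then be dispatched along the lines of:
//   val owner = routingTable(envelope.getSystemStreamPartition)
//   taskInstances(owner).process(envelope, coordinator)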
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 190bdfe..e3c7fe3 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -19,13 +19,11 @@
 package org.apache.samza.container
 
-import java.io.File
 import org.apache.samza.config.Config
 import org.junit.Assert._
 import org.junit.Test
 import org.apache.samza.Partition
 import org.apache.samza.config.MapConfig
-import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.system.SystemConsumers
 import org.apache.samza.system.chooser.RoundRobinChooser
 import org.apache.samza.system.SystemConsumer
@@ -81,8 +79,7 @@ class TestSamzaContainer {
     }
   }
   val config = new MapConfig
-  val partition = new Partition(0)
-  val containerName = "test-container"
+  val taskName = new TaskName("taskName")
   val consumerMultiplexer = new SystemConsumers(
     new RoundRobinChooser,
     Map[String, SystemConsumer]())
@@ -91,18 +88,18 @@ class TestSamzaContainer {
     new SerdeManager)
   val taskInstance: TaskInstance = new TaskInstance(
     task,
-    partition,
+    taskName,
     config,
     new TaskInstanceMetrics,
     consumerMultiplexer: SystemConsumers,
     producerMultiplexer: SystemProducers)
   val runLoop = new RunLoop(
-    taskInstances = Map(partition -> taskInstance),
+    taskInstances = Map(taskName -> taskInstance),
     consumerMultiplexer = consumerMultiplexer,
     metrics = new SamzaContainerMetrics
   )
   val container = new SamzaContainer(
-    Map(partition -> taskInstance),
+    Map(taskName -> taskInstance),
     runLoop,
     consumerMultiplexer,
     producerMultiplexer,
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 1f5e3bb..9d5ff13 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -50,7 +50,6 @@ class TestTaskInstance {
   }
   val config = new MapConfig
   val partition = new Partition(0)
-  val containerName = "test-container"
   val consumerMultiplexer = new SystemConsumers(
     new RoundRobinChooser,
     Map[String, SystemConsumer]())
@@ -62,16 +61,17 @@ class TestTaskInstance {
   // Pretend our last checkpointed (next) offset was 2.
   val testSystemStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
   val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
+  val taskName = new TaskName("taskName")
   val taskInstance: TaskInstance = new TaskInstance(
     task,
-    partition,
+    taskName,
     config,
     new TaskInstanceMetrics,
     consumerMultiplexer,
     producerMultiplexer,
     offsetManager)
   // Pretend we got a message with offset 2 and next offset 3.
-  val coordinator = new ReadableCoordinator(partition)
+  val coordinator = new ReadableCoordinator(taskName)
   taskInstance.process(new IncomingMessageEnvelope(systemStreamPartition, "2", null, null), coordinator)
   // Check to see if the offset manager has been properly updated with offset 3.
   val lastProcessedOffset = offsetManager.getLastProcessedOffset(systemStreamPartition)
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/container/TestTaskNamesToSystemStreamPartitions.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskNamesToSystemStreamPartitions.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskNamesToSystemStreamPartitions.scala
new file mode 100644
index 0000000..d680b20
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskNamesToSystemStreamPartitions.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container
+
+import org.junit.Test
+import org.junit.Assert._
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.{SamzaException, Partition}
+
+class TestTaskNamesToSystemStreamPartitions {
+  var sspCounter = 0
+  def makeSSP(stream:String) = new SystemStreamPartition("system", stream, new Partition(42))
+
+  @Test
+  def toSetWorksCorrectly() {
+    val map = Map(new TaskName("tn1") -> Set(makeSSP("tn1-1"), makeSSP("tn1-2")),
+      new TaskName("tn2") -> Set(makeSSP("tn2-1"), makeSSP("tn2-2")))
+    val tntssp = TaskNamesToSystemStreamPartitions(map)
+
+    val asSet = tntssp.toSet
+    val expected = Set(new TaskName("tn1") -> Set(makeSSP("tn1-1"), makeSSP("tn1-2")),
+      (new TaskName("tn2") -> Set(makeSSP("tn2-1"), makeSSP("tn2-2"))))
+    assertEquals(expected, asSet)
+  }
+
+  @Test
+  def validateMethodCatchesDuplicatedSSPs() {
+    val duplicatedSSP1 = new SystemStreamPartition("sys", "str", new Partition(42))
+    val duplicatedSSP2 = new SystemStreamPartition("sys", "str", new Partition(42))
+    val notDuplicatedSSP1 = new SystemStreamPartition("sys", "str2", new Partition(42))
+    val notDuplicatedSSP2 = new SystemStreamPartition("sys", "str3", new Partition(42))
+
+    val badMapping = Map(new TaskName("a") -> Set(notDuplicatedSSP1, duplicatedSSP1), new TaskName("b") -> Set(notDuplicatedSSP2, duplicatedSSP2))
+
+    var caughtException = false
+    try {
+      TaskNamesToSystemStreamPartitions(badMapping)
+    } catch {
+      case se: SamzaException => assertEquals("Assigning the same SystemStreamPartition to multiple " +
+        "TaskNames is not currently supported. Out of compliance SystemStreamPartitions and counts: " +
+        "Map(SystemStreamPartition [sys, str, 42] -> 2)", se.getMessage)
+        caughtException = true
+      case _: Throwable =>
+    }
+    assertTrue("TaskNamesToSystemStreamPartitions should have rejected this mapping but didn't", caughtException)
+  }
+
+  @Test
+  def validateMethodAllowsUniqueSSPs() {
+    val sspSet1 = (0 to 10).map(p => new SystemStreamPartition("sys", "str", new Partition(p))).toSet
+    val sspSet2 = (0 to 10).map(p => new SystemStreamPartition("sys", "str2", new Partition(p))).toSet
+
+    TaskNamesToSystemStreamPartitions(Map(new TaskName("set1") -> sspSet1, new TaskName("set2") -> sspSet2))
+  }
+}
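The asserted SamzaException message shows what validation must report: every SystemStreamPartition claimed by more than one TaskName, together with its occurrence count. One plausible way to compute that report (validateNoSharedSSPs is a hypothetical sketch; the real validation inside TaskNamesToSystemStreamPartitions is not part of this diff):

import org.apache.samza.SamzaException
import org.apache.samza.container.TaskName
import org.apache.samza.system.SystemStreamPartition

// Count how many TaskNames claim each SSP and reject any mapping where a
// count exceeds one, mirroring the message asserted in the test above.
def validateNoSharedSSPs(mapping: Map[TaskName, Set[SystemStreamPartition]]) {
  val counts = mapping.values.toSeq.flatten
    .groupBy(identity)
    .map { case (ssp, occurrences) => ssp -> occurrences.size }
  val shared = counts.filter { case (_, count) => count > 1 }
  if (shared.nonEmpty) {
    throw new SamzaException("Assigning the same SystemStreamPartition to multiple " +
      "TaskNames is not currently supported. Out of compliance SystemStreamPartitions and counts: " + shared)
  }
}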
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupByPartition.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupByPartition.scala b/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupByPartition.scala
new file mode 100644
index 0000000..733be20
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupByPartition.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.systemstreampartition.groupers
+
+import org.apache.samza.container.{TaskName, SystemStreamPartitionGrouperTestBase, SystemStreamPartitionGrouper}
+import scala.collection.JavaConverters._
+import org.junit.Test
+
+class TestGroupByPartition extends SystemStreamPartitionGrouperTestBase {
+  import SystemStreamPartitionGrouperTestBase._
+
+  val expected /* for the base class's fixture set */ = Map(new TaskName("Partition 0") -> Set(aa0, ac0).asJava,
+    new TaskName("Partition 1") -> Set(aa1, ab1).asJava,
+    new TaskName("Partition 2") -> Set(aa2, ab2).asJava).asJava
+
+  override def getGrouper: SystemStreamPartitionGrouper = new GroupByPartition
+
+  @Test def groupingWorks() {
+    verifyGroupGroupsCorrectly(allSSPs, expected)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupBySystemStreamPartition.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupBySystemStreamPartition.scala b/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupBySystemStreamPartition.scala
new file mode 100644
index 0000000..e9c15a5
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupBySystemStreamPartition.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.systemstreampartition.groupers
+
+import org.apache.samza.container.{TaskName, SystemStreamPartitionGrouperTestBase, SystemStreamPartitionGrouper}
+import scala.collection.JavaConverters._
+import org.junit.Test
+
+class TestGroupBySystemStreamPartition extends SystemStreamPartitionGrouperTestBase {
+  import SystemStreamPartitionGrouperTestBase._
+
+  // Built by hand rather than in a loop, to avoid duplicating a potential logic error between test and implementation
+  val expected /* for the base class's fixture set */ = Map(new TaskName(aa0.toString) -> Set(aa0).asJava,
+    new TaskName(aa1.toString) -> Set(aa1).asJava,
+    new TaskName(aa2.toString) -> Set(aa2).asJava,
+    new TaskName(ab1.toString) -> Set(ab1).asJava,
+    new TaskName(ab2.toString) -> Set(ab2).asJava,
+    new TaskName(ac0.toString) -> Set(ac0).asJava).asJava
+
+  override def getGrouper: SystemStreamPartitionGrouper = new GroupBySystemStreamPartition
+
+  @Test def groupingWorks() {
+    verifyGroupGroupsCorrectly(allSSPs, expected)
+  }
+}
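Taken together, the two grouper tests pin down the behavior on the shared fixture set: GroupByPartition collapses the six SSPs into three groups keyed "Partition 0" through "Partition 2", while GroupBySystemStreamPartition produces six singleton groups keyed by each SSP's string form. A condensed usage sketch, reusing the fixtures and groupers introduced elsewhere in this commit:

import org.apache.samza.container.SystemStreamPartitionGrouperTestBase._
import org.apache.samza.container.systemstreampartition.groupers.{GroupByPartition, GroupBySystemStreamPartition}

// Three partition-keyed groups: "Partition 0" -> {aa0, ac0}, "Partition 1" -> {aa1, ab1}, "Partition 2" -> {aa2, ab2}
val byPartition = (new GroupByPartition).group(allSSPs)

// Six singleton groups, one TaskName per SystemStreamPartition
val bySsp = (new GroupBySystemStreamPartition).group(allSSPs)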
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/TestSimpleSystemStreamPartitionTaskNameGrouper.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/TestSimpleSystemStreamPartitionTaskNameGrouper.scala b/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/TestSimpleSystemStreamPartitionTaskNameGrouper.scala
new file mode 100644
index 0000000..7ea09cd
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/TestSimpleSystemStreamPartitionTaskNameGrouper.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.systemstreampartition.taskname.groupers
+
+import org.apache.samza.container.{TaskName, TaskNamesToSystemStreamPartitions}
+import org.apache.samza.system.SystemStreamPartition
+import org.junit.Assert._
+import org.junit.Test
+
+class TestSimpleSystemStreamPartitionTaskNameGrouper {
+  val emptySSPSet = Set[SystemStreamPartition]()
+
+  @Test
+  def weGetAsExactlyManyGroupsAsWeAskFor() {
+    // memoize the maps used in the test to avoid an O(n^3) loop
+    val tntsspCache = scala.collection.mutable.Map[Int, TaskNamesToSystemStreamPartitions]()
+
+    def tntsspOfSize(size:Int) = {
+      def getMap(size:Int) = TaskNamesToSystemStreamPartitions((0 until size).map(z => new TaskName("tn" + z) -> emptySSPSet).toMap)
+
+      tntsspCache.getOrElseUpdate(size, getMap(size))
+    }
+
+    val maxTNTSSPSize = 1000
+    val maxNumGroups = 140
+    for(numGroups <- 1 to maxNumGroups) {
+      val grouper = new SimpleSystemStreamPartitionTaskNameGrouper(numGroups)
+
+      for (tntsspSize <- numGroups to maxTNTSSPSize) {
+        val map = tntsspOfSize(tntsspSize)
+        assertEquals(tntsspSize, map.size)
+
+        val grouped = grouper.groupTaskNames(map)
+        assertEquals("Asked for " + numGroups + " but got " + grouped.size, numGroups, grouped.size)
+      }
+    }
  }
}
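The loop above fixes a single invariant for SimpleSystemStreamPartitionTaskNameGrouper: asking for numGroups buckets over at least numGroups TaskNames must yield exactly numGroups groups. The grouper's implementation is not shown in this excerpt; round-robin assignment is one strategy that satisfies the invariant (roundRobinGroups is a hypothetical sketch, not necessarily the shipped logic):

import org.apache.samza.container.TaskName

// Deal TaskNames into numGroups buckets round-robin. With at least one
// TaskName per bucket, every residue class 0 until numGroups is hit, so
// exactly numGroups non-empty groups come back.
def roundRobinGroups(taskNames: Seq[TaskName], numGroups: Int): Seq[Set[TaskName]] = {
  require(numGroups > 0 && taskNames.size >= numGroups)
  taskNames.zipWithIndex
    .groupBy { case (_, index) => index % numGroups }
    .values
    .map(_.map { case (taskName, _) => taskName }.toSet)
    .toSeq
}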
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
index 21d8a78..258ccc1 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
@@ -20,7 +20,6 @@ package org.apache.samza.job
 
 import java.io.File
 import org.apache.samza.config.Config
-import org.junit.Assert._
 import org.junit.Test
 
 object TestJobRunner {