http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala b/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala new file mode 100644 index 0000000..f8a535a --- /dev/null +++ b/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.job + +import org.junit.Test +import org.apache.samza.system.SystemStreamPartition +import org.apache.samza.Partition +import org.apache.samza.util.Util._ +import org.apache.samza.container.{TaskName, TaskNamesToSystemStreamPartitions} +import org.junit.Assert._ + +class TestShellCommandBuilder { + + @Test + def testJsonCreateStreamPartitionStringRoundTrip() { + val getPartitions: Set[SystemStreamPartition] = { + // Build a heavily skewed set of partitions. + def partitionSet(max:Int) = (0 until max).map(new Partition(_)).toSet + val system = "all-same-system." + val lotsOfParts = Map(system + "topic-with-many-parts-a" -> partitionSet(128), + system + "topic-with-many-parts-b" -> partitionSet(128), system + "topic-with-many-parts-c" -> partitionSet(64)) + val fewParts = ('c' to 'z').map(l => system + l.toString -> partitionSet(4)).toMap + val streamsMap = (lotsOfParts ++ fewParts) + (for(s <- streamsMap.keys; + part <- streamsMap.getOrElse(s, Set.empty)) yield new SystemStreamPartition(getSystemStreamFromNames(s), part)).toSet + } + + // Group by partition... + val sspTaskNameMap = TaskNamesToSystemStreamPartitions(getPartitions.groupBy(p => new TaskName(p.getPartition.toString)).toMap) + + val asString = ShellCommandBuilder.serializeSystemStreamPartitionSetToJSON(sspTaskNameMap.getJavaFriendlyType) + + val backFromSSPTaskNameMap = TaskNamesToSystemStreamPartitions(ShellCommandBuilder.deserializeSystemStreamPartitionSetFromJSON(asString)) + assertEquals(sspTaskNameMap, backFromSSPTaskNameMap) + } +}
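The round-trip test above depends on grouping every SystemStreamPartition under a per-partition TaskName before serializing. The following standalone sketch shows that groupBy pattern in isolation; it uses only plain Scala collections, and the Ssp case class and names are illustrative stand-ins, not Samza APIs:

    object GroupByPartitionSketch extends App {
      // Stand-in for org.apache.samza.system.SystemStreamPartition.
      case class Ssp(system: String, stream: String, partition: Int)

      // Two streams with four partitions each, a small version of getPartitions above.
      val ssps: Set[Ssp] =
        (for (stream <- Seq("topic-a", "topic-b"); p <- 0 until 4)
          yield Ssp("all-same-system", stream, p)).toSet

      // One task per partition id: every SSP with that partition lands in the same group,
      // mirroring getPartitions.groupBy(p => new TaskName(p.getPartition.toString)).
      val byTask: Map[String, Set[Ssp]] = ssps.groupBy(s => s.partition.toString)

      byTask.toSeq.sortBy(_._1).foreach { case (task, group) =>
        println(s"TaskName $task -> ${group.map(_.stream).mkString(", ")}")
      }
    }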
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala b/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala index 4f7ddcd..d425e86 100644 --- a/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala +++ b/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala @@ -21,7 +21,6 @@ package org.apache.samza.metrics import org.junit.Assert._ import org.junit.Test -import org.apache.samza.config.MapConfig import grizzled.slf4j.Logging import javax.management.remote.{JMXConnector, JMXConnectorFactory, JMXServiceURL} import java.io.IOException http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala index 70d8c80..0d07314 100644 --- a/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala +++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala @@ -19,40 +19,40 @@ package org.apache.samza.serializers +import java.util +import org.apache.samza.Partition +import org.apache.samza.checkpoint.Checkpoint +import org.apache.samza.container.TaskName +import org.apache.samza.system.SystemStreamPartition import org.junit.Assert._ import org.junit.Test -import org.apache.samza.system.SystemStream -import org.apache.samza.checkpoint.Checkpoint import scala.collection.JavaConversions._ -import org.apache.samza.system.SystemStreamPartition -import org.apache.samza.SamzaException -import org.apache.samza.Partition class TestCheckpointSerde { @Test def testExactlyOneOffset { val serde = new CheckpointSerde - var offsets = Map[SystemStream, String]() - val systemStream = new SystemStream("test-system", "test-stream") - offsets += systemStream -> "1" + var offsets = Map[SystemStreamPartition, String]() + val systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(777)) + offsets += systemStreamPartition -> "1" val deserializedOffsets = serde.fromBytes(serde.toBytes(new Checkpoint(offsets))) - assertEquals("1", deserializedOffsets.getOffsets.get(systemStream)) + assertEquals("1", deserializedOffsets.getOffsets.get(systemStreamPartition)) assertEquals(1, deserializedOffsets.getOffsets.size) } @Test - def testMoreThanOneOffsetShouldFail { - val serde = new CheckpointSerde - var offsets = Map[SystemStream, String]() - // Since SS != SSP with same system and stream name, this should result in - // two offsets for one system stream in the serde. 
- offsets += new SystemStream("test-system", "test-stream") -> "1" - offsets += new SystemStreamPartition("test-system", "test-stream", new Partition(0)) -> "2" - try { - serde.toBytes(new Checkpoint(offsets)) - fail("Expected to fail with more than one offset for a single SystemStream.") - } catch { - case e: SamzaException => // expected this - } + def testChangelogPartitionMappingRoundTrip { + val mapping = new util.HashMap[TaskName, java.lang.Integer]() + mapping.put(new TaskName("Ted"), 0) + mapping.put(new TaskName("Dougal"), 1) + mapping.put(new TaskName("Jack"), 2) + + val checkpointSerde = new CheckpointSerde + val asBytes = checkpointSerde.changelogPartitionMappingToBytes(mapping) + val backToMap = checkpointSerde.changelogPartitionMappingFromBytes(asBytes) + + assertEquals(mapping, backToMap) + assertNotSame(mapping, backToMap) } -} \ No newline at end of file + +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala index d31c3ce..f505eb1 100644 --- a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala +++ b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala @@ -19,17 +19,17 @@ package org.apache.samza.system.filereader -import org.junit.Test -import org.junit.Assert._ -import org.apache.samza.system.SystemStreamPartition -import org.junit.AfterClass -import java.io.PrintWriter import java.io.File +import java.io.FileWriter +import java.io.PrintWriter import org.apache.samza.Partition +import org.apache.samza.system.SystemStreamPartition +import org.junit.AfterClass +import org.junit.Assert._ +import org.junit.BeforeClass +import org.junit.Test import scala.collection.JavaConversions._ import scala.collection.mutable.HashMap -import org.junit.BeforeClass -import java.io.FileWriter object TestFileReaderSystemConsumer { val consumer = new FileReaderSystemConsumer("file-reader", null) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala index 12f1e03..7cfeb5a 100644 --- a/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala +++ b/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala @@ -21,13 +21,15 @@ package org.apache.samza.task import org.junit.Assert._ import org.junit.Test -import org.apache.samza.Partition import org.apache.samza.task.TaskCoordinator.RequestScope +import org.apache.samza.container.TaskName class TestReadableCoordinator { + val taskName = new TaskName("P0") + @Test def testCommitTask { - val coord = new ReadableCoordinator(new Partition(0)) + val coord = new ReadableCoordinator(taskName) assertFalse(coord.requestedCommitTask) assertFalse(coord.requestedCommitAll) coord.commit(RequestScope.CURRENT_TASK) @@ -37,7 +39,7 @@ class TestReadableCoordinator { @Test def testCommitAll { - val coord = new 
ReadableCoordinator(new Partition(0)) + val coord = new ReadableCoordinator(taskName) assertFalse(coord.requestedCommitTask) assertFalse(coord.requestedCommitAll) coord.commit(RequestScope.ALL_TASKS_IN_CONTAINER) @@ -47,7 +49,7 @@ class TestReadableCoordinator { @Test def testShutdownNow { - val coord = new ReadableCoordinator(new Partition(0)) + val coord = new ReadableCoordinator(taskName) assertFalse(coord.requestedShutdownOnConsensus) assertFalse(coord.requestedShutdownNow) coord.shutdown(RequestScope.ALL_TASKS_IN_CONTAINER) @@ -57,7 +59,7 @@ class TestReadableCoordinator { @Test def testShutdownRequest { - val coord = new ReadableCoordinator(new Partition(0)) + val coord = new ReadableCoordinator(taskName) assertFalse(coord.requestedShutdownOnConsensus) assertFalse(coord.requestedShutdownNow) coord.shutdown(RequestScope.CURRENT_TASK) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala index ad6d2da..7c314ce 100644 --- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala +++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala @@ -20,9 +20,13 @@ package org.apache.samza.util import org.apache.samza.Partition import org.apache.samza.config.Config +import org.apache.samza.config.Config import org.apache.samza.config.MapConfig +import org.apache.samza.container.{TaskName, TaskNamesToSystemStreamPartitions} +import org.apache.samza.metrics.MetricsRegistry import org.apache.samza.metrics.MetricsRegistry import org.apache.samza.system.SystemFactory +import org.apache.samza.system.SystemFactory import org.apache.samza.system.SystemStreamPartition import org.apache.samza.util.Util._ import org.junit.Assert._ @@ -54,48 +58,32 @@ class TestUtil { } @Test - def testGetTopicPartitionsForTask() { - def partitionSet(max:Int) = (0 until max).map(new Partition(_)).toSet - - val taskCount = 4 - val streamsMap = Map("kafka.a" -> partitionSet(4), "kafka.b" -> partitionSet(18), "timestream.c" -> partitionSet(24)) - val streamsAndParts = (for(s <- streamsMap.keys; - part <- streamsMap.getOrElse(s, Set.empty)) - yield new SystemStreamPartition(getSystemStreamFromNames(s), part)).toSet + def testResolveTaskNameToChangelogPartitionMapping { + def testRunner(description:String, currentTaskNames:Set[TaskName], previousTaskNameMapping:Map[TaskName, Int], + result:Map[TaskName, Int]) { + assertEquals("Failed: " + description, result, + Util.resolveTaskNameToChangelogPartitionMapping(currentTaskNames, previousTaskNameMapping)) + } - for(i <- 0 until taskCount) { - val result: Set[SystemStreamPartition] = Util.getStreamsAndPartitionsForContainer(i, taskCount, streamsAndParts) - // b -> 18 % 4 = 2 therefore first two results should have an extra element - if(i < 2) { - assertEquals(12, result.size) - } else { - assertEquals(11, result.size) - } + testRunner("No change between runs", + Set(new TaskName("Partition 0")), + Map(new TaskName("Partition 0") -> 0), + Map(new TaskName("Partition 0") -> 0)) - result.foreach(r => assertEquals(i, r.getPartition.getPartitionId % taskCount)) - } - } - - @Test - def testJsonCreateStreamPartitionStringRoundTrip() { - val getPartitions: Set[SystemStreamPartition] = { - // Build a heavily skewed set of partitions. 
- def partitionSet(max:Int) = (0 until max).map(new Partition(_)).toSet - val system = "all-same-system." - val lotsOfParts = Map(system + "topic-with-many-parts-a" -> partitionSet(128), - system + "topic-with-many-parts-b" -> partitionSet(128), system + "topic-with-many-parts-c" -> partitionSet(64)) - val fewParts = ('c' to 'z').map(l => system + l.toString -> partitionSet(4)).toMap - val streamsMap = (lotsOfParts ++ fewParts) - (for(s <- streamsMap.keys; - part <- streamsMap.getOrElse(s, Set.empty)) yield new SystemStreamPartition(getSystemStreamFromNames(s), part)).toSet - } + testRunner("New TaskName added, none missing this run", + Set(new TaskName("Partition 0"), new TaskName("Partition 1")), + Map(new TaskName("Partition 0") -> 0), + Map(new TaskName("Partition 0") -> 0, new TaskName("Partition 1") -> 1)) - val streamsAndParts: Set[SystemStreamPartition] = getStreamsAndPartitionsForContainer(0, 4, getPartitions).toSet - println(streamsAndParts) - val asString = serializeSSPSetToJSON(streamsAndParts) + testRunner("New TaskName added, one missing this run", + Set(new TaskName("Partition 0"), new TaskName("Partition 2")), + Map(new TaskName("Partition 0") -> 0, new TaskName("Partition 1") -> 1), + Map(new TaskName("Partition 0") -> 0, new TaskName("Partition 1") -> 1, new TaskName("Partition 2") -> 2)) - val backToStreamsAndParts = deserializeSSPSetFromJSON(asString) - assertEquals(streamsAndParts, backToStreamsAndParts) + testRunner("New TaskName added, all previous missing this run", + Set(new TaskName("Partition 3"), new TaskName("Partition 4")), + Map(new TaskName("Partition 0") -> 0, new TaskName("Partition 1") -> 1, new TaskName("Partition 2") -> 2), + Map(new TaskName("Partition 0") -> 0, new TaskName("Partition 1") -> 1, new TaskName("Partition 2") -> 2, new TaskName("Partition 3") -> 3, new TaskName("Partition 4") -> 4)) } /** http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala new file mode 100644 index 0000000..5d8ee4f --- /dev/null +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.checkpoint.kafka + +import java.util +import org.apache.samza.SamzaException +import org.apache.samza.container.TaskName +import org.codehaus.jackson.`type`.TypeReference +import org.codehaus.jackson.map.ObjectMapper +import scala.collection.JavaConversions._ + +/** + * Kafka Checkpoint Log-specific key used to identify what type of entry is + * written for any particular log entry. + * + * @param map Backing map to hold key values + */ +class KafkaCheckpointLogKey private (val map: Map[String, String]) { + // This might be better as a case class... + import KafkaCheckpointLogKey._ + + /** + * Serialize this key to bytes + * @return Key as bytes + */ + def toBytes(): Array[Byte] = { + val jMap = new util.HashMap[String, String](map.size) + jMap.putAll(map) + + JSON_MAPPER.writeValueAsBytes(jMap) + } + + private def getKey = map.getOrElse(CHECKPOINT_KEY_KEY, throw new SamzaException("No " + CHECKPOINT_KEY_KEY + " in map for Kafka Checkpoint log key")) + + /** + * Is this key for a checkpoint entry? + * + * @return true iff this key's entry is for a checkpoint + */ + def isCheckpointKey = getKey.equals(CHECKPOINT_KEY_TYPE) + + /** + * Is this key for a changelog partition mapping? + * + * @return true iff this key's entry is for a changelog partition mapping + */ + def isChangelogPartitionMapping = getKey.equals(CHANGELOG_PARTITION_KEY_TYPE) + + /** + * If this Key is for a checkpoint entry, return its associated TaskName. + * + * @return TaskName for this checkpoint or throw an exception if this key does not have a TaskName entry + */ + def getCheckpointTaskName = { + val asString = map.getOrElse(CHECKPOINT_TASKNAME_KEY, throw new SamzaException("No TaskName in checkpoint key: " + this)) + new TaskName(asString) + } + + def canEqual(other: Any): Boolean = other.isInstanceOf[KafkaCheckpointLogKey] + + override def equals(other: Any): Boolean = other match { + case that: KafkaCheckpointLogKey => + (that canEqual this) && + map == that.map + case _ => false + } + + override def hashCode(): Int = { + val state = Seq(map) + state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b) + } +} + +object KafkaCheckpointLogKey { + /** + * Messages in the checkpoint log have keys associated with them. These keys are maps that describe the message's + * type, either a checkpoint or a changelog-partition-mapping. + */ + val CHECKPOINT_KEY_KEY = "type" + val CHECKPOINT_KEY_TYPE = "checkpoint" + val CHANGELOG_PARTITION_KEY_TYPE = "changelog-partition-mapping" + val CHECKPOINT_TASKNAME_KEY = "taskName" + val SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY = "systemstreampartition-grouper-factory" + + /** + * Partition mapping keys have no dynamic values, so we just need one instance. + */ + val CHANGELOG_PARTITION_MAPPING_KEY = new KafkaCheckpointLogKey(Map(CHECKPOINT_KEY_KEY -> CHANGELOG_PARTITION_KEY_TYPE)) + + private val JSON_MAPPER = new ObjectMapper() + val KEY_TYPEREFERENCE = new TypeReference[util.HashMap[String, String]]() {} + + var systemStreamPartitionGrouperFactoryString:Option[String] = None + + /** + * Set the name of the factory configured to provide the SystemStreamPartition grouping + * so it can be included in the key.
+ * + * @param str Config value of SystemStreamPartition Grouper Factory + */ + def setSystemStreamPartitionGrouperFactoryString(str:String) = { + systemStreamPartitionGrouperFactoryString = Some(str) + } + + /** + * Get the name of the factory configured to provide the SystemStreamPartition grouping + * so it can be included in the key + */ + def getSystemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString.getOrElse(throw new SamzaException("No SystemStreamPartition grouping factory string has been set.")) + + /** + * Build a key for a checkpoint log entry for a particular TaskName + * @param taskName TaskName for which to build this checkpoint entry + * + * @return Key for checkpoint log entry + */ + def getCheckpointKey(taskName:TaskName) = { + val map = Map(CHECKPOINT_KEY_KEY -> CHECKPOINT_KEY_TYPE, + CHECKPOINT_TASKNAME_KEY -> taskName.getTaskName, + SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY -> getSystemStreamPartitionGrouperFactoryString) + + new KafkaCheckpointLogKey(map) + } + + /** + * Build a key for a changelog partition mapping entry + * + * @return Key for changelog partition mapping entry + */ + def getChangelogPartitionMappingKey() = CHANGELOG_PARTITION_MAPPING_KEY + + /** + * Deserialize a Kafka checkpoint log key + * @param bytes Serialized (via JSON) Kafka checkpoint log key + * @return Checkpoint log key + */ + def fromBytes(bytes: Array[Byte]): KafkaCheckpointLogKey = { + try { + val jmap: util.HashMap[String, String] = JSON_MAPPER.readValue(bytes, KEY_TYPEREFERENCE) + + if(!jmap.containsKey(CHECKPOINT_KEY_KEY)) { + throw new SamzaException("No type entry in checkpoint key: " + jmap) + } + + // Only checkpoint keys have ssp grouper factory keys + if(jmap.get(CHECKPOINT_KEY_KEY).equals(CHECKPOINT_KEY_TYPE)) { + val sspGrouperFactory = jmap.get(SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY) + + if (sspGrouperFactory == null) { + throw new SamzaException("No SystemStreamPartition Grouper factory entry in checkpoint key: " + jmap) + } + + if (!sspGrouperFactory.equals(getSystemStreamPartitionGrouperFactoryString)) { + throw new DifferingSystemStreamPartitionGrouperFactoryValues(sspGrouperFactory, getSystemStreamPartitionGrouperFactoryString) + } + } + + new KafkaCheckpointLogKey(jmap.toMap) + } catch { + case e: Exception => + throw new SamzaException("Exception while deserializing checkpoint key", e) + } + } +} + +class DifferingSystemStreamPartitionGrouperFactoryValues(inKey:String, inConfig:String) extends SamzaException { + override def getMessage() = "Checkpoint key's SystemStreamPartition Grouper factory (" + inKey + + ") does not match value from current configuration (" + inConfig + "). " + + "This likely means the SystemStreamPartitionGrouper was changed between job runs, which is not supported."
+} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala index 15245d4..fff62e4 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala @@ -19,76 +19,112 @@ package org.apache.samza.checkpoint.kafka -import org.I0Itec.zkclient.ZkClient import grizzled.slf4j.Logging +import java.nio.ByteBuffer +import java.util import kafka.admin.AdminUtils -import kafka.api.FetchRequestBuilder -import kafka.api.OffsetRequest -import kafka.api.PartitionOffsetRequestInfo +import kafka.api._ import kafka.common.ErrorMapping +import kafka.common.InvalidMessageSizeException import kafka.common.TopicAndPartition import kafka.common.TopicExistsException -import kafka.common.InvalidMessageSizeException import kafka.common.UnknownTopicOrPartitionException import kafka.consumer.SimpleConsumer +import kafka.message.InvalidMessageException import kafka.producer.KeyedMessage -import kafka.producer.Partitioner import kafka.producer.Producer -import kafka.serializer.Decoder -import kafka.serializer.Encoder import kafka.utils.Utils -import kafka.utils.VerifiableProperties -import kafka.message.InvalidMessageException -import org.apache.samza.Partition +import org.I0Itec.zkclient.ZkClient import org.apache.samza.SamzaException import org.apache.samza.checkpoint.Checkpoint import org.apache.samza.checkpoint.CheckpointManager +import org.apache.samza.container.TaskName import org.apache.samza.serializers.CheckpointSerde -import org.apache.samza.serializers.Serde import org.apache.samza.system.kafka.TopicMetadataCache -import org.apache.samza.util.TopicMetadataStore import org.apache.samza.util.ExponentialSleepStrategy +import org.apache.samza.util.TopicMetadataStore +import scala.collection.mutable /** - * Kafka checkpoint manager is used to store checkpoints in a Kafka topic that - * is uniquely identified by a job/partition combination. To read a checkpoint - * for a given job and partition combination (e.g. my-job, partition 1), we - * simply read the last message from the topic: __samza_checkpoint_my-job_1. If - * the topic does not yet exist, we assume that there is not yet any state for - * this job/partition pair, and return an empty checkpoint. + * Kafka checkpoint manager is used to store checkpoints in a Kafka topic. + * To read a checkpoint for a specific taskName, we find the newest message + * keyed to that taskName. If there is no such message, no checkpoint data + * exists. The underlying log has a single partition into which all + * checkpoints and TaskName to changelog partition mappings are written. 
*/ class KafkaCheckpointManager( clientId: String, checkpointTopic: String, systemName: String, - totalPartitions: Int, replicationFactor: Int, socketTimeout: Int, bufferSize: Int, fetchSize: Int, metadataStore: TopicMetadataStore, - connectProducer: () => Producer[Partition, Array[Byte]], + connectProducer: () => Producer[Array[Byte], Array[Byte]], connectZk: () => ZkClient, + systemStreamPartitionGrouperFactoryString: String, retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy, - serde: Serde[Checkpoint] = new CheckpointSerde) extends CheckpointManager with Logging { + serde: CheckpointSerde = new CheckpointSerde) extends CheckpointManager with Logging { + import KafkaCheckpointManager._ + + var taskNames = Set[TaskName]() + var producer: Producer[Array[Byte], Array[Byte]] = null + var taskNamesToOffsets: Map[TaskName, Checkpoint] = null - var partitions = Set[Partition]() - var producer: Producer[Partition, Array[Byte]] = null + var startingOffset: Option[Long] = None // Where to start reading for each subsequent call of readCheckpoint - info("Creating KafkaCheckpointManager with: clientId=%s, checkpointTopic=%s, systemName=%s" format (clientId, checkpointTopic, systemName)) + KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString(systemStreamPartitionGrouperFactoryString) - def writeCheckpoint(partition: Partition, checkpoint: Checkpoint) { + info("Creating KafkaCheckpointManager with: clientId=%s, checkpointTopic=%s, systemName=%s" format(clientId, checkpointTopic, systemName)) + + /** + * Write Checkpoint for specified taskName to log + * + * @param taskName Specific Samza taskName for which to write a checkpoint. + * @param checkpoint Reference to a Checkpoint object to store offset data in. + **/ + override def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) { + val key = KafkaCheckpointLogKey.getCheckpointKey(taskName) + val keyBytes = key.toBytes() + val msgBytes = serde.toBytes(checkpoint) + + writeLog(CHECKPOINT_LOG4J_ENTRY, keyBytes, msgBytes) + } + + /** + * Write the taskName-to-partition mapping that is being maintained by this CheckpointManager + * + * @param changelogPartitionMapping Each TaskName's partition within the changelog + */ + override def writeChangeLogPartitionMapping(changelogPartitionMapping: util.Map[TaskName, java.lang.Integer]) { + val key = KafkaCheckpointLogKey.getChangelogPartitionMappingKey() + val keyBytes = key.toBytes() + val msgBytes = serde.changelogPartitionMappingToBytes(changelogPartitionMapping) + + writeLog(CHANGELOG_PARTITION_MAPPING_LOG4j, keyBytes, msgBytes) + } + + /** + * Common code for writing either checkpoints or changelog-partition-mappings to the log + * + * @param logType Type of entry that is being written, for logging + * @param key pre-serialized key for message + * @param msg pre-serialized message to write to log + */ + private def writeLog(logType:String, key: Array[Byte], msg: Array[Byte]) { retryBackoff.run( loop => { if (producer == null) { producer = connectProducer() } - producer.send(new KeyedMessage(checkpointTopic, null, partition, serde.toBytes(checkpoint))) + + producer.send(new KeyedMessage(checkpointTopic, key, 0, msg)) loop.done }, (exception, loop) => { - warn("Failed to send checkpoint %s for partition %s: %s. Retrying." format (checkpoint, partition, exception)) + warn("Failed to write %s partition entry %s: %s. Retrying."
format(logType, key, exception)) debug("Exception detail:", exception) if (producer != null) { producer.close @@ -98,124 +134,219 @@ class KafkaCheckpointManager( ) } - def readLastCheckpoint(partition: Partition): Checkpoint = { - info("Reading checkpoint for partition %s." format partition.getPartitionId) + private def getConsumer(): SimpleConsumer = { + val metadataMap = TopicMetadataCache.getTopicMetadata(Set(checkpointTopic), systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics)) + val metadata = metadataMap(checkpointTopic) + val partitionMetadata = metadata.partitionsMetadata + .filter(_.partitionId == 0) + .headOption + .getOrElse(throw new KafkaCheckpointException("Tried to find partition information for partition 0 for checkpoint topic, but it didn't exist in Kafka.")) + val leader = partitionMetadata + .leader + .getOrElse(throw new SamzaException("No leader available for topic %s" format checkpointTopic)) + + info("Connecting to leader %s:%d for topic %s to fetch all checkpoint messages." format(leader.host, leader.port, checkpointTopic)) + + new SimpleConsumer(leader.host, leader.port, socketTimeout, bufferSize, clientId) + } + + private def getEarliestOffset(consumer: SimpleConsumer, topicAndPartition: TopicAndPartition): Long = consumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, -1) + + private def getOffset(consumer: SimpleConsumer, topicAndPartition: TopicAndPartition, earliestOrLatest: Long): Long = { + val offsetResponse = consumer.getOffsetsBefore(new OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)))) + .partitionErrorAndOffsets + .get(topicAndPartition) + .getOrElse(throw new KafkaCheckpointException("Unable to find offset information for %s:0" format checkpointTopic)) + // Fail or retry if there was an issue with the offset request. + ErrorMapping.maybeThrowException(offsetResponse.error) + + val offset: Long = offsetResponse + .offsets + .headOption + .getOrElse(throw new KafkaCheckpointException("Got response, but no offsets defined for %s:0" format checkpointTopic)) + + offset + } + + /** + * Read the last checkpoint for specified TaskName + * + * @param taskName Specific Samza taskName for which to get the last checkpoint. + **/ + override def readLastCheckpoint(taskName: TaskName): Checkpoint = { + if (!taskNames.contains(taskName)) { + throw new SamzaException(taskName + " not registered with this CheckpointManager") + } + + info("Reading checkpoint for taskName " + taskName) + + if (taskNamesToOffsets == null) { + info("No TaskName to checkpoint mapping provided. Reading for first time.") + taskNamesToOffsets = readCheckpointsFromLog() + } else { + info("Already existing checkpoint mapping.
Merging new offsets") + taskNamesToOffsets ++= readCheckpointsFromLog() + } + + val checkpoint = taskNamesToOffsets.get(taskName).getOrElse(null) + + info("Got checkpoint state for taskName %s: %s" format(taskName, checkpoint)) + + checkpoint + } + + /** + * Read through the entire log, discarding changelog mappings, and build a map of TaskNames to Checkpoints + */ + private def readCheckpointsFromLog(): Map[TaskName, Checkpoint] = { + val checkpoints = mutable.Map[TaskName, Checkpoint]() + + def shouldHandleEntry(key: KafkaCheckpointLogKey) = key.isCheckpointKey + + def handleCheckpoint(payload: ByteBuffer, checkpointKey:KafkaCheckpointLogKey): Unit = { + val taskName = checkpointKey.getCheckpointTaskName + + if (taskNames.contains(taskName)) { + val checkpoint = serde.fromBytes(Utils.readBytes(payload)) + + debug("Adding checkpoint " + checkpoint + " for taskName " + taskName) + + checkpoints.put(taskName, checkpoint) // replacing any existing, older checkpoints as we go + } + } + + readLog(CHECKPOINT_LOG4J_ENTRY, shouldHandleEntry, handleCheckpoint) + + checkpoints.toMap /* of the immutable kind */ + } + + /** + * Read through the entire log, discarding checkpoints, to find the latest changelogPartitionMapping + * + * Lots of duplicated code from the checkpoint method, but it will be better to refactor this code into AM-based + * checkpoint log reading + */ + override def readChangeLogPartitionMapping(): util.Map[TaskName, java.lang.Integer] = { + var changelogPartitionMapping: util.Map[TaskName, java.lang.Integer] = new util.HashMap[TaskName, java.lang.Integer]() + + def shouldHandleEntry(key: KafkaCheckpointLogKey) = key.isChangelogPartitionMapping + + def handleCheckpoint(payload: ByteBuffer, checkpointKey:KafkaCheckpointLogKey): Unit = { + changelogPartitionMapping = serde.changelogPartitionMappingFromBytes(Utils.readBytes(payload)) + + debug("Adding changelog partition mapping " + changelogPartitionMapping) + } - val checkpoint = retryBackoff.run( + readLog(CHANGELOG_PARTITION_MAPPING_LOG4j, shouldHandleEntry, handleCheckpoint) + + changelogPartitionMapping + } + + /** + * Common code for reading both the changelog partition mapping and checkpoints from the log + * + * @param entryType What type of entry to look for within the log keys + * @param shouldHandleEntry Predicate that selects which log keys this read should handle + * @param handleEntry Code to handle an entry in the log once it's found + */ + private def readLog(entryType:String, shouldHandleEntry: (KafkaCheckpointLogKey) => Boolean, + handleEntry: (ByteBuffer, KafkaCheckpointLogKey) => Unit): Unit = { + retryBackoff.run[Unit]( loop => { - // Assume checkpoint topic exists with correct partitions, since it should be verified on start. - // Fetch the metadata for this checkpoint topic/partition pair. - val metadataMap = TopicMetadataCache.getTopicMetadata(Set(checkpointTopic), systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics)) - val metadata = metadataMap(checkpointTopic) - val partitionMetadata = metadata.partitionsMetadata - .filter(_.partitionId == partition.getPartitionId) - .headOption - .getOrElse(throw new KafkaCheckpointException("Tried to find partition information for partition %d, but it didn't exist in Kafka." format partition.getPartitionId)) - val partitionId = partitionMetadata.partitionId - val leader = partitionMetadata - .leader - .getOrElse(throw new SamzaException("No leader available for topic %s" format checkpointTopic)) - - info("Connecting to leader %s:%d for topic %s and partition %s to fetch last checkpoint message."
format (leader.host, leader.port, checkpointTopic, partitionId)) - - val consumer = new SimpleConsumer( - leader.host, - leader.port, - socketTimeout, - bufferSize, - clientId) + val consumer = getConsumer() + + val topicAndPartition = new TopicAndPartition(checkpointTopic, 0) + try { - val topicAndPartition = new TopicAndPartition(checkpointTopic, partitionId) - val offsetResponse = consumer.getOffsetsBefore(new OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))) - .partitionErrorAndOffsets - .get(topicAndPartition) - .getOrElse(throw new KafkaCheckpointException("Unable to find offset information for %s:%d" format (checkpointTopic, partitionId))) + var offset = startingOffset.getOrElse(getEarliestOffset(consumer, topicAndPartition)) - // Fail or retry if there was an an issue with the offset request. - ErrorMapping.maybeThrowException(offsetResponse.error) + info("Got offset %s for topic %s and partition 0. Attempting to fetch messages for %s." format(offset, checkpointTopic, entryType)) - val offset = offsetResponse - .offsets - .headOption - .getOrElse(throw new KafkaCheckpointException("Got response, but no offsets defined for %s:%d" format (checkpointTopic, partitionId))) + val latestOffset = getOffset(consumer, topicAndPartition, OffsetRequest.LatestTime) - info("Got offset %s for topic %s and partition %s. Attempting to fetch message." format (offset, checkpointTopic, partitionId)) + info("Get latest offset %s for topic %s and partition 0." format(latestOffset, checkpointTopic)) - if (offset <= 0) { - info("Got offset 0 (no messages in checkpoint topic) for topic %s and partition %s, so returning null. If you expected the checkpoint topic to have messages, you're probably going to lose data." format (checkpointTopic, partition)) - return null + if (offset < 0) { + info("Got offset 0 (no messages in %s) for topic %s and partition 0, so returning empty collection. If you expected the checkpoint topic to have messages, you're probably going to lose data." format (entryType, checkpointTopic)) + return } - val request = new FetchRequestBuilder() - // Kafka returns 1 greater than the offset of the last message in - // the topic, so subtract one to fetch the last message. - .addFetch(checkpointTopic, partitionId, offset - 1, fetchSize) - .maxWait(500) - .minBytes(1) - .clientId(clientId) - .build - val messageSet = consumer.fetch(request) - if (messageSet.hasError) { - warn("Got error code from broker for %s: %s" format (checkpointTopic, messageSet.errorCode(checkpointTopic, partitionId))) - val errorCode = messageSet.errorCode(checkpointTopic, partitionId) - if (ErrorMapping.OffsetOutOfRangeCode.equals(errorCode)) { - warn("Got an offset out of range exception while getting last checkpoint for topic %s and partition %s, so returning a null offset to the KafkaConsumer. Let it decide what to do based on its autooffset.reset setting." 
format (checkpointTopic, partitionId)) - return null + while (offset < latestOffset) { + val request = new FetchRequestBuilder() + .addFetch(checkpointTopic, 0, offset, fetchSize) + .maxWait(500) + .minBytes(1) + .clientId(clientId) + .build + + val fetchResponse = consumer.fetch(request) + if (fetchResponse.hasError) { + warn("Got error code from broker for %s: %s" format(checkpointTopic, fetchResponse.errorCode(checkpointTopic, 0))) + val errorCode = fetchResponse.errorCode(checkpointTopic, 0) + if (ErrorMapping.OffsetOutOfRangeCode.equals(errorCode)) { + warn("Got an offset out of range exception while getting last entry in %s for topic %s and partition 0, so returning a null offset to the KafkaConsumer. Let it decide what to do based on its autooffset.reset setting." format (entryType, checkpointTopic)) + return + } + ErrorMapping.maybeThrowException(errorCode) } - ErrorMapping.maybeThrowException(errorCode) - } - val messages = messageSet.messageSet(checkpointTopic, partitionId).toList - if (messages.length != 1) { - throw new KafkaCheckpointException("Something really unexpected happened. Got %s " - + "messages back when fetching from checkpoint topic %s and partition %s. " - + "Expected one message. It would be unsafe to go on without the latest checkpoint, " - + "so failing." format (messages.length, checkpointTopic, partition)) - } + for (response <- fetchResponse.messageSet(checkpointTopic, 0)) { + offset = response.nextOffset + startingOffset = Some(offset) // For next time we call + + if (!response.message.hasKey) { + throw new KafkaCheckpointException("Encountered message without key.") + } - // Some back bending to go from message to checkpoint. - val checkpoint = serde.fromBytes(Utils.readBytes(messages(0).message.payload)) - loop.done - checkpoint + val checkpointKey = KafkaCheckpointLogKey.fromBytes(Utils.readBytes(response.message.key)) + + if (!shouldHandleEntry(checkpointKey)) { + debug("Skipping " + entryType + " entry with key " + checkpointKey) + } else { + handleEntry(response.message.payload, checkpointKey) + } + } + } } finally { - consumer.close + consumer.close() } + + loop.done + Unit }, (exception, loop) => { exception match { - case e: InvalidMessageException => throw new KafkaCheckpointException ("Got InvalidMessageException from Kafka, which is unrecoverable, so fail the samza job", e) - case e: InvalidMessageSizeException => throw new KafkaCheckpointException ("Got InvalidMessageSizeException from Kafka, which is unrecoverable, so fail the samza job", e) - case e: UnknownTopicOrPartitionException => throw new KafkaCheckpointException ("Got UnknownTopicOrPartitionException from Kafka, which is unrecoverable, so fail the samza job", e) + case e: InvalidMessageException => throw new KafkaCheckpointException("Got InvalidMessageException from Kafka, which is unrecoverable, so fail the samza job", e) + case e: InvalidMessageSizeException => throw new KafkaCheckpointException("Got InvalidMessageSizeException from Kafka, which is unrecoverable, so fail the samza job", e) + case e: UnknownTopicOrPartitionException => throw new KafkaCheckpointException("Got UnknownTopicOrPartitionException from Kafka, which is unrecoverable, so fail the samza job", e) case e: KafkaCheckpointException => throw e case e: Exception => - warn("While trying to read last checkpoint for topic %s and partition %s: %s. Retrying." format (checkpointTopic, partition, e)) + warn("While trying to read last %s entry for topic %s and partition 0: %s. Retrying." 
format(entryType, checkpointTopic, e)) debug("Exception detail:", e) } } - ).getOrElse(throw new SamzaException("Failed to get checkpoint for partition %s" format partition.getPartitionId)) + ).getOrElse(throw new SamzaException("Failed to get entries for " + entryType + " from topic " + checkpointTopic)) - info("Got checkpoint state for partition %s: %s" format (partition.getPartitionId, checkpoint)) - checkpoint } def start { - createTopic + create validateTopic } - def register(partition: Partition) { - partitions += partition + def register(taskName: TaskName) { + debug("Adding taskName " + taskName + " to " + this) + taskNames += taskName } def stop = { - if(producer != null) { + if (producer != null) { producer.close } } - private def createTopic { - info("Attempting to create checkpoint topic %s with %s partitions." format (checkpointTopic, totalPartitions)) + def create { + info("Attempting to create checkpoint topic %s." format checkpointTopic) retryBackoff.run( loop => { val zkClient = connectZk() @@ -223,7 +354,7 @@ class KafkaCheckpointManager( AdminUtils.createTopic( zkClient, checkpointTopic, - totalPartitions, + 1, replicationFactor) } finally { zkClient.close @@ -239,7 +370,7 @@ class KafkaCheckpointManager( info("Checkpoint topic %s already exists." format checkpointTopic) loop.done case e: Exception => - warn("Failed to create topic %s: %s. Retrying." format (checkpointTopic, e)) + warn("Failed to create topic %s: %s. Retrying." format(checkpointTopic, e)) debug("Exception detail:", e) } } @@ -255,8 +386,8 @@ class KafkaCheckpointManager( ErrorMapping.maybeThrowException(topicMetadata.errorCode) val partitionCount = topicMetadata.partitionsMetadata.length - if (partitionCount != totalPartitions) { - throw new KafkaCheckpointException("Checkpoint topic validation failed for topic %s because partition count %s did not match expected partition count %s." format (checkpointTopic, topicMetadata.partitionsMetadata.length, totalPartitions)) + if (partitionCount != 1) { + throw new KafkaCheckpointException("Checkpoint topic validation failed for topic %s because partition count %s did not match expected partition count of 1." format(checkpointTopic, topicMetadata.partitionsMetadata.length)) } info("Successfully validated checkpoint topic %s." format checkpointTopic) @@ -267,14 +398,19 @@ class KafkaCheckpointManager( exception match { case e: KafkaCheckpointException => throw e case e: Exception => - warn("While trying to validate topic %s: %s. Retrying." format (checkpointTopic, e)) + warn("While trying to validate topic %s: %s. Retrying." 
format(checkpointTopic, e)) debug("Exception detail:", e) } } ) } - override def toString = "KafkaCheckpointManager [systemName=%s, checkpointTopic=%s]" format (systemName, checkpointTopic) + override def toString = "KafkaCheckpointManager [systemName=%s, checkpointTopic=%s]" format(systemName, checkpointTopic) +} + +object KafkaCheckpointManager { + val CHECKPOINT_LOG4J_ENTRY = "checkpoint log" + val CHANGELOG_PARTITION_MAPPING_LOG4j = "changelog partition mapping" } /** http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala index cb6dbdf..087c6ad 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala @@ -19,26 +19,28 @@ package org.apache.samza.checkpoint.kafka -import org.apache.samza.config.{ KafkaConfig, Config } -import org.apache.samza.SamzaException -import java.util.Properties +import grizzled.slf4j.Logging import kafka.producer.Producer -import org.apache.samza.config.SystemConfig.Config2System -import org.apache.samza.config.StreamConfig.Config2Stream -import org.apache.samza.config.TaskConfig.Config2Task -import org.apache.samza.config.KafkaConfig.Config2Kafka +import kafka.utils.ZKStringSerializer +import org.I0Itec.zkclient.ZkClient +import org.apache.samza.SamzaException +import org.apache.samza.checkpoint.CheckpointManager +import org.apache.samza.checkpoint.CheckpointManagerFactory +import org.apache.samza.config.Config import org.apache.samza.config.JobConfig.Config2Job -import org.apache.samza.Partition -import grizzled.slf4j.Logging +import org.apache.samza.config.KafkaConfig.Config2Kafka import org.apache.samza.metrics.MetricsRegistry import org.apache.samza.util.{ KafkaUtil, ClientUtilTopicMetadataStore } -import org.apache.samza.util.Util -import org.I0Itec.zkclient.ZkClient -import kafka.utils.ZKStringSerializer -import org.apache.samza.checkpoint.CheckpointManagerFactory -import org.apache.samza.checkpoint.CheckpointManager +object KafkaCheckpointManagerFactory { + /** + * Version number to track the format of the checkpoint log + */ + val CHECKPOINT_LOG_VERSION_NUMBER = 1 +} class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Logging { + import KafkaCheckpointManagerFactory._ + def getCheckpointManager(config: Config, registry: MetricsRegistry): CheckpointManager = { val clientId = KafkaUtil.getClientId("samza-checkpoint-manager", config) val systemName = config @@ -60,7 +62,7 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin val fetchSize = consumerConfig.fetchMessageMaxBytes // must be > buffer size val connectProducer = () => { - new Producer[Partition, Array[Byte]](producerConfig) + new Producer[Array[Byte], Array[Byte]](producerConfig) } val zkConnect = Option(consumerConfig.zkConnect) .getOrElse(throw new SamzaException("no zookeeper.connect defined in config")) @@ -73,24 +75,24 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin .getOrElse(throw new SamzaException("No broker list defined in config for %s." 
format systemName)) val metadataStore = new ClientUtilTopicMetadataStore(brokersListString, clientId, socketTimeout) val checkpointTopic = getTopic(jobName, jobId) - - // This is a reasonably expensive operation and the TaskInstance already knows the answer. Should use that info. - val totalPartitions = Util.getInputStreamPartitions(config).map(_.getPartition).toSet.size + + // Find out the SSPGrouperFactory class so it can be included/verified in the key + val systemStreamPartitionGrouperFactoryString = config.getSystemStreamPartitionGrouperFactory new KafkaCheckpointManager( clientId, checkpointTopic, systemName, - totalPartitions, replicationFactor, socketTimeout, bufferSize, fetchSize, metadataStore, connectProducer, - connectZk) + connectZk, + systemStreamPartitionGrouperFactoryString) } private def getTopic(jobName: String, jobId: String) = - "__samza_checkpoint_%s_%s" format (jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-")) + "__samza_checkpoint_ver_%d_for_%s_%s" format (CHECKPOINT_LOG_VERSION_NUMBER, jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-")) } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala index 8a8834f..9553050 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala @@ -19,8 +19,6 @@ package org.apache.samza.system.kafka -import scala.annotation.implicitNotFound - import grizzled.slf4j.Logging import kafka.api.TopicMetadata import kafka.common.ErrorMapping http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala new file mode 100644 index 0000000..7a23041 --- /dev/null +++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.checkpoint.kafka + +import org.apache.samza.container.TaskName +import org.junit.Assert._ +import org.junit.{Before, Test} +import org.apache.samza.SamzaException + +class TestKafkaCheckpointLogKey { + @Before + def setSSPGrouperFactoryString() { + KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString("hello") + } + + @Test + def checkpointKeySerializationRoundTrip() { + val checkpointKey = KafkaCheckpointLogKey.getCheckpointKey(new TaskName("TN")) + val asBytes = checkpointKey.toBytes() + val backFromBytes = KafkaCheckpointLogKey.fromBytes(asBytes) + + assertEquals(checkpointKey, backFromBytes) + assertNotSame(checkpointKey, backFromBytes) + } + + @Test + def changelogPartitionMappingKeySerializationRoundTrip() { + val key = KafkaCheckpointLogKey.getChangelogPartitionMappingKey() + val asBytes = key.toBytes() + val backFromBytes = KafkaCheckpointLogKey.fromBytes(asBytes) + + assertEquals(key, backFromBytes) + assertNotSame(key, backFromBytes) + } + + @Test + def differingSSPGrouperFactoriesCauseException() { + + val checkpointKey = KafkaCheckpointLogKey.getCheckpointKey(new TaskName("TN")) + + val asBytes = checkpointKey.toBytes() + + KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString("goodbye") + + var gotException = false + try { + KafkaCheckpointLogKey.fromBytes(asBytes) + } catch { + case se:SamzaException => assertEquals(new DifferingSystemStreamPartitionGrouperFactoryValues("hello", "goodbye").getMessage(), se.getCause.getMessage) + gotException = true + } + + assertTrue("Should have had an exception since ssp grouper factories didn't match", gotException) + } +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala index 92ac61e..cddee13 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala @@ -19,11 +19,9 @@ package org.apache.samza.checkpoint.kafka -import org.I0Itec.zkclient.ZkClient -import org.junit.Assert._ -import org.junit.AfterClass -import org.junit.BeforeClass -import org.junit.Test +import kafka.common.InvalidMessageSizeException +import kafka.common.UnknownTopicOrPartitionException +import kafka.message.InvalidMessageException import kafka.producer.Producer import kafka.producer.ProducerConfig import kafka.server.KafkaConfig @@ -31,20 +29,20 @@ import kafka.server.KafkaServer import kafka.utils.TestUtils import kafka.utils.TestZKUtils import kafka.utils.Utils +import kafka.utils.ZKStringSerializer import kafka.zk.EmbeddedZookeeper -import org.apache.samza.metrics.MetricsRegistryMap -import org.apache.samza.Partition -import scala.collection._ -import scala.collection.JavaConversions._ -import org.apache.samza.util.{ ClientUtilTopicMetadataStore, TopicMetadataStore } -import org.apache.samza.config.MapConfig +import org.I0Itec.zkclient.ZkClient import org.apache.samza.checkpoint.Checkpoint +import org.apache.samza.container.TaskName import org.apache.samza.serializers.CheckpointSerde -import org.apache.samza.system.SystemStream -import kafka.utils.ZKStringSerializer -import 
kafka.message.InvalidMessageException -import kafka.common.InvalidMessageSizeException -import kafka.common.UnknownTopicOrPartitionException +import org.apache.samza.system.SystemStreamPartition +import org.apache.samza.util.{ ClientUtilTopicMetadataStore, TopicMetadataStore } +import org.apache.samza.{SamzaException, Partition} +import org.junit.Assert._ +import org.junit.{AfterClass, BeforeClass, Test} +import scala.collection.JavaConversions._ +import scala.collection._ +import org.apache.samza.container.systemstreampartition.groupers.GroupByPartitionFactory object TestKafkaCheckpointManager { val zkConnect: String = TestZKUtils.zookeeperConnect @@ -72,14 +70,16 @@ object TestKafkaCheckpointManager { config.put("request.required.acks", "-1") val producerConfig = new ProducerConfig(config) val partition = new Partition(0) - val cp1 = new Checkpoint(Map(new SystemStream("kafka", "topic") -> "123")) - val cp2 = new Checkpoint(Map(new SystemStream("kafka", "topic") -> "12345")) + val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "123")) + val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "12345")) var zookeeper: EmbeddedZookeeper = null var server1: KafkaServer = null var server2: KafkaServer = null var server3: KafkaServer = null var metadataStore: TopicMetadataStore = null + val systemStreamPartitionGrouperFactoryString = classOf[GroupByPartitionFactory].getCanonicalName + @BeforeClass def beforeSetupServers { zookeeper = new EmbeddedZookeeper(zkConnect) @@ -108,42 +108,45 @@ class TestKafkaCheckpointManager { import TestKafkaCheckpointManager._ @Test - def testCheckpointShouldBeNullIfcheckpointTopicDoesNotExistShouldBeCreatedOnWriteAndShouldBeReadableAfterWrite { + def testCheckpointShouldBeNullIfCheckpointTopicDoesNotExistShouldBeCreatedOnWriteAndShouldBeReadableAfterWrite { val kcm = getKafkaCheckpointManager - kcm.register(partition) + val taskName = new TaskName(partition.toString) + kcm.register(taskName) kcm.start - var readCp = kcm.readLastCheckpoint(partition) + var readCp = kcm.readLastCheckpoint(taskName) // read before topic exists should result in a null checkpoint - assert(readCp == null) + assertNull(readCp) // create topic the first time around - kcm.writeCheckpoint(partition, cp1) - readCp = kcm.readLastCheckpoint(partition) - assert(cp1.equals(readCp)) + kcm.writeCheckpoint(taskName, cp1) + readCp = kcm.readLastCheckpoint(taskName) + assertEquals(cp1, readCp) // should get an exception if partition doesn't exist try { - readCp = kcm.readLastCheckpoint(new Partition(1)) + readCp = kcm.readLastCheckpoint(new TaskName(new Partition(1).toString)) fail("Expected a SamzaException, since only one partition (partition 0) should exist.") } catch { - case e: Exception => None // expected + case e: SamzaException => None // expected + case _: Exception => fail("Expected a SamzaException, since only one partition (partition 0) should exist.") } // writing a second message should work, too - kcm.writeCheckpoint(partition, cp2) - readCp = kcm.readLastCheckpoint(partition) - assert(cp2.equals(readCp)) + kcm.writeCheckpoint(taskName, cp2) + readCp = kcm.readLastCheckpoint(taskName) + assertEquals(cp2, readCp) kcm.stop } @Test - def testUnrecovableKafkaErrorShouldThrowKafkaCheckpointManagerException { + def testUnrecoverableKafkaErrorShouldThrowKafkaCheckpointManagerException { val exceptions = List("InvalidMessageException", "InvalidMessageSizeException", "UnknownTopicOrPartitionException") exceptions.foreach { 
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index 6be9732..be1670c 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -22,10 +22,8 @@
 package org.apache.samza.system.kafka
 
 import org.junit.Assert._
-import org.junit.Test
+import org.junit.{Test, BeforeClass, AfterClass}
 import kafka.zk.EmbeddedZookeeper
-import org.junit.BeforeClass
-import org.junit.AfterClass
 import org.apache.samza.util.ClientUtilTopicMetadataStore
 import org.I0Itec.zkclient.ZkClient
 import kafka.admin.AdminUtils
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala b/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
index 751fe4c..6652f6b 100644
--- a/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
+++ b/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
@@ -19,13 +19,9 @@
 package org.apache.samza.storage.kv
 
-import java.nio.ByteBuffer
 import org.iq80.leveldb._
-import org.fusesource.leveldbjni.internal.NativeComparator
 import org.fusesource.leveldbjni.JniDBFactory._
 import java.io._
-import java.util.Iterator
-import java.lang.Iterable
 import org.apache.samza.config.Config
 import org.apache.samza.container.SamzaContainerContext
 import grizzled.slf4j.{ Logger, Logging }
@@ -39,8 +35,8 @@ object LevelDbKeyValueStore {
     val options = new Options
 
     // Cache size and write buffer size are specified on a per-container basis.
-    options.cacheSize(cacheSize / containerContext.partitions.size)
-    options.writeBufferSize((writeBufSize / containerContext.partitions.size).toInt)
+    options.cacheSize(cacheSize / containerContext.taskNames.size)
+    options.writeBufferSize((writeBufSize / containerContext.taskNames.size).toInt)
     options.blockSize(storeConfig.getInt("leveldb.block.size.bytes", 4096))
     options.compressionType(
       storeConfig.get("leveldb.compression", "snappy") match {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java b/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
index 222c130..f20bb7f 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
@@ -20,6 +20,7 @@
 package org.apache.samza.test.integration.join;
 
 import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskName;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.OutgoingMessageEnvelope;
@@ -45,12 +46,12 @@ public class Emitter implements StreamTask, InitableTask, WindowableTask {
 
   private KeyValueStore<String, String> state;
   private int max;
-  private String partition;
+  private TaskName taskName;
 
   @Override
   public void init(Config config, TaskContext context) {
     this.state = (KeyValueStore<String, String>) context.getStore("emitter-state");
-    this.partition = Integer.toString(context.getPartition().getPartitionId());
+    this.taskName = context.getTaskName();
     this.max = config.getInt("count");
   }
 
@@ -79,7 +80,7 @@ public class Emitter implements StreamTask, InitableTask, WindowableTask {
     }
     int counter = getInt(COUNT);
     if(counter < max) {
-      OutgoingMessageEnvelope envelope = new OutgoingMessageEnvelope(new SystemStream("kafka", "emitted"), Integer.toString(counter), epoch + "-" + partition);
+      OutgoingMessageEnvelope envelope = new OutgoingMessageEnvelope(new SystemStream("kafka", "emitted"), Integer.toString(counter), epoch + "-" + taskName);
       collector.send(envelope);
       this.state.put(COUNT, Integer.toString(getInt(COUNT) + 1));
     } else {
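
The LevelDB change above divides the container-wide cache and write-buffer budgets by the number of TaskNames rather than the number of partitions; the per-store arithmetic is otherwise unchanged. For example, with illustrative numbers (the real values come from store config):

    // Illustrative numbers only; the real budgets come from store config.
    val cacheSize = 100L * 1024 * 1024    // container-wide cache budget
    val writeBufSize = 32L * 1024 * 1024  // container-wide write-buffer budget
    val numTaskNames = 4                  // containerContext.taskNames.size

    val perStoreCacheBytes = cacheSize / numTaskNames               // 25 MB per task's store
    val perStoreWriteBufBytes = (writeBufSize / numTaskNames).toInt // 8 MB per task's store
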
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
index c0ac5dd..7d0b8db 100644
--- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
+++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
@@ -22,7 +22,7 @@ package org.apache.samza.test.performance
 import grizzled.slf4j.Logging
 import org.apache.samza.config.Config
 import org.apache.samza.config.StorageConfig._
-import org.apache.samza.container.SamzaContainerContext
+import org.apache.samza.container.{TaskName, SamzaContainerContext}
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.storage.kv.KeyValueStore
 import org.apache.samza.storage.kv.KeyValueStorageEngine
@@ -34,7 +34,6 @@ import org.apache.samza.serializers.ByteSerde
 import org.apache.samza.Partition
 import org.apache.samza.SamzaException
 import java.io.File
-import scala.collection.JavaConversions._
 import java.util.UUID
 
 /**
@@ -79,7 +78,9 @@ object TestKeyValuePerformance extends Logging {
     val numLoops = config.getInt("test.num.loops", 100)
     val messagesPerBatch = config.getInt("test.messages.per.batch", 10000)
     val messageSizeBytes = config.getInt("test.message.size.bytes", 200)
-    val partitions = (0 until partitionCount).map(new Partition(_))
+    val taskNames = new java.util.ArrayList[TaskName]()
+
+    (0 until partitionCount).map(p => taskNames.add(new TaskName(new Partition(p).toString)))
 
     info("Using partition count: %s" format partitionCount)
     info("Using num loops: %s" format numLoops)
@@ -109,7 +110,7 @@ object TestKeyValuePerformance extends Logging {
       new ReadableCollector,
       new MetricsRegistryMap,
       null,
-      new SamzaContainerContext("test", config, partitions))
+      new SamzaContainerContext("test", config, taskNames))
 
     val db = if (!engine.isInstanceOf[KeyValueStorageEngine[_, _]]) {
       throw new SamzaException("This test can only run with KeyValueStorageEngine configured as store factory.")
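
The patch above builds the java.util.ArrayList[TaskName] by calling map purely for its side effect. A behaviorally equivalent construction using JavaConverters, offered only as an alternative sketch, would be:

    import org.apache.samza.Partition
    import org.apache.samza.container.TaskName
    import scala.collection.JavaConverters._

    // Same TaskName list as above, built without the side-effecting map.
    def taskNamesFor(partitionCount: Int): java.util.List[TaskName] =
      (0 until partitionCount).map(p => new TaskName(new Partition(p).toString)).asJava
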
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
index dc44a99..3ed8b5c 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
@@ -19,64 +19,55 @@
 package org.apache.samza.test.integration
 
-import org.apache.samza.task.StreamTask
-import org.apache.samza.task.TaskContext
-import org.apache.samza.task.InitableTask
-import org.apache.samza.config.Config
-import scala.collection.JavaConversions._
-import org.apache.samza.task.TaskCoordinator
-import org.apache.samza.task.MessageCollector
-import org.apache.samza.system.IncomingMessageEnvelope
-import org.apache.samza.checkpoint.Checkpoint
-import org.junit.BeforeClass
-import org.junit.AfterClass
-import kafka.zk.EmbeddedZookeeper
-import kafka.utils.TestUtils
-import org.apache.samza.system.SystemStream
-import kafka.utils.TestZKUtils
-import kafka.server.KafkaConfig
-import org.I0Itec.zkclient.ZkClient
+import java.util.Properties
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
+import kafka.admin.AdminUtils
+import kafka.common.ErrorMapping
+import kafka.consumer.Consumer
+import kafka.consumer.ConsumerConfig
+import kafka.message.MessageAndMetadata
+import kafka.producer.KeyedMessage
+import kafka.producer.Producer
 import kafka.producer.ProducerConfig
+import kafka.server.KafkaConfig
 import kafka.server.KafkaServer
+import kafka.utils.TestUtils
+import kafka.utils.TestZKUtils
 import kafka.utils.Utils
-import org.apache.samza.storage.kv.KeyValueStore
-import org.apache.samza.util._
-import org.junit.Test
-import kafka.admin.AdminUtils
-import kafka.common.ErrorMapping
-import org.junit.Assert._
 import kafka.utils.ZKStringSerializer
-import scala.collection.mutable.ArrayBuffer
-import org.apache.samza.job.local.LocalJobFactory
-import org.apache.samza.job.ApplicationStatus
-import java.util.concurrent.CountDownLatch
-import org.apache.samza.job.local.ThreadJob
-import org.apache.samza.util.TopicMetadataStore
-import org.apache.samza.util.ClientUtilTopicMetadataStore
+import kafka.zk.EmbeddedZookeeper
+import org.I0Itec.zkclient.ZkClient
+import org.apache.samza.Partition
+import org.apache.samza.checkpoint.Checkpoint
+import org.apache.samza.config.Config
 import org.apache.samza.config.MapConfig
+import org.apache.samza.container.TaskName
+import org.apache.samza.job.ApplicationStatus
+import org.apache.samza.job.StreamJob
+import org.apache.samza.job.local.LocalJobFactory
+import org.apache.samza.storage.kv.KeyValueStore
 import org.apache.samza.system.kafka.TopicMetadataCache
-import org.apache.samza.container.SamzaContainer
+import org.apache.samza.system.{SystemStreamPartition, IncomingMessageEnvelope}
+import org.apache.samza.task.InitableTask
+import org.apache.samza.task.MessageCollector
+import org.apache.samza.task.StreamTask
+import org.apache.samza.task.TaskContext
+import org.apache.samza.task.TaskCoordinator
+import org.apache.samza.task.TaskCoordinator.RequestScope
+import org.apache.samza.util.ClientUtilTopicMetadataStore
+import org.apache.samza.util.TopicMetadataStore
+import org.junit.Assert._
+import org.junit.{BeforeClass, AfterClass, Test}
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 import scala.collection.mutable.SynchronizedMap
-import org.apache.samza.Partition
-import java.util.concurrent.TimeUnit
-import kafka.producer.Producer
-import kafka.producer.KeyedMessage
-import java.util.concurrent.Semaphore
-import java.util.concurrent.CyclicBarrier
-import kafka.consumer.Consumer
-import kafka.consumer.ConsumerConfig
-import java.util.Properties
-import java.util.concurrent.Executors
-import kafka.message.MessageAndOffset
-import kafka.message.MessageAndMetadata
-import org.apache.samza.job.StreamJob
-import org.apache.samza.task.TaskCoordinator.RequestScope
 
 object TestStatefulTask {
   val INPUT_TOPIC = "input"
   val STATE_TOPIC = "mystore"
-  val TOTAL_PARTITIONS = 1
+  val TOTAL_TASK_NAMES = 1
   val REPLICATION_FACTOR = 3
 
   val zkConnect: String = TestZKUtils.zookeeperConnect
@@ -102,8 +93,8 @@ object TestStatefulTask {
   config.put("serializer.class", "kafka.serializer.StringEncoder");
   val producerConfig = new ProducerConfig(config)
   var producer: Producer[String, String] = null
-  val cp1 = new Checkpoint(Map(new SystemStream("kafka", "topic") -> "123"))
-  val cp2 = new Checkpoint(Map(new SystemStream("kafka", "topic") -> "12345"))
+  val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", new Partition(0)) -> "123"))
+  val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", new Partition(0)) -> "12345"))
   var zookeeper: EmbeddedZookeeper = null
   var server1: KafkaServer = null
   var server2: KafkaServer = null
@@ -128,13 +119,13 @@ object TestStatefulTask {
     AdminUtils.createTopic(
       zkClient,
       INPUT_TOPIC,
-      TOTAL_PARTITIONS,
+      TOTAL_TASK_NAMES,
       REPLICATION_FACTOR)
 
     AdminUtils.createTopic(
       zkClient,
       STATE_TOPIC,
-      TOTAL_PARTITIONS,
+      TOTAL_TASK_NAMES,
       REPLICATION_FACTOR)
   }
 
@@ -221,7 +212,14 @@ class TestStatefulTask {
     "systems.kafka.consumer.auto.offset.reset" -> "smallest", // applies to an empty topic
     "systems.kafka.samza.msg.serde" -> "string",
     "systems.kafka.consumer.zookeeper.connect" -> zkConnect,
-    "systems.kafka.producer.metadata.broker.list" -> ("localhost:%s" format port1))
+    "systems.kafka.producer.metadata.broker.list" -> ("localhost:%s" format port1),
+    // Since we're using state, we need a checkpoint manager
+    "task.checkpoint.factory" -> "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory",
+    "task.checkpoint.system" -> "kafka",
+    "task.checkpoint.replication.factor" -> "1",
+    // However, don't have the inputs use the checkpoint manager,
+    // since the second part of the test expects to replay the input streams.
+    "systems.kafka.streams.input.samza.reset.offset" -> "true")
 
   @Test
   def testShouldStartAndRestore {
@@ -278,7 +276,7 @@ class TestStatefulTask {
       count += 1
     }
 
-    assertTrue(count < 100)
+    assertTrue("Timed out waiting to receive messages. Received thus far: " + task.received.size, count < 100)
 
     // Reset the count down latch after the 4 messages come in.
     task.awaitMessage
@@ -328,8 +326,7 @@ class TestStatefulTask {
     TestTask.awaitTaskRegistered
     val tasks = TestTask.tasks
 
-    // Should only have one partition.
-    assertEquals(1, tasks.size)
+    assertEquals("Should only have a single partition in this task", 1, tasks.size)
 
     val task = tasks.values.toList.head
 
@@ -392,25 +389,25 @@ class TestStatefulTask {
 }
 
 object TestTask {
-  val tasks = new HashMap[Partition, TestTask] with SynchronizedMap[Partition, TestTask]
-  @volatile var allTasksRegistered = new CountDownLatch(TestStatefulTask.TOTAL_PARTITIONS)
+  val tasks = new HashMap[TaskName, TestTask] with SynchronizedMap[TaskName, TestTask]
+  @volatile var allTasksRegistered = new CountDownLatch(TestStatefulTask.TOTAL_TASK_NAMES)
 
   /**
   * Static method that tasks can use to register themselves with. Useful so
   * we don't have to sneak into the ThreadJob/SamzaContainer to get our test
   * tasks.
   */
-  def register(partition: Partition, task: TestTask) {
-    tasks += partition -> task
+  def register(taskName: TaskName, task: TestTask) {
+    tasks += taskName -> task
     allTasksRegistered.countDown
   }
 
   def awaitTaskRegistered {
     allTasksRegistered.await(60, TimeUnit.SECONDS)
     assertEquals(0, allTasksRegistered.getCount)
-    assertEquals(TestStatefulTask.TOTAL_PARTITIONS, tasks.size)
+    assertEquals(TestStatefulTask.TOTAL_TASK_NAMES, tasks.size)
     // Reset the registered latch, so we can use it again every time we start a new job.
-    TestTask.allTasksRegistered = new CountDownLatch(TestStatefulTask.TOTAL_PARTITIONS)
+    TestTask.allTasksRegistered = new CountDownLatch(TestStatefulTask.TOTAL_TASK_NAMES)
   }
 }
 
@@ -422,7 +419,7 @@ class TestTask extends StreamTask with InitableTask {
   var gotMessage = new CountDownLatch(1)
 
   def init(config: Config, context: TaskContext) {
-    TestTask.register(context.getPartition, this)
+    TestTask.register(context.getTaskName, this)
     store = context
       .getStore(TestStatefulTask.STATE_TOPIC)
       .asInstanceOf[KeyValueStore[String, String]]

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
index ea6f03b..86fc0fd 100644
--- a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
+++ b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
@@ -119,21 +119,24 @@
             %td.key Finished
             %td= state.finishedTasks.size.toString
 
-      %h3 Partition Assignment
-      %table.table.table-striped.table-bordered.tablesorter#partitions-table
+      %h3 TaskName Assignment
+      %table.table.table-striped.table-bordered.tablesorter#taskids-table
         %thead
           %tr
             %th Task ID
-            %th Partitions
+            %th TaskName
+            %th SystemStreamPartitions
             %th Container
         %tbody
-          - for((taskId, partitions) <- state.taskPartitions)
-            %tr
-              %td= taskId
-              %td= partitions.map(_.getPartitionId).toList.sorted.mkString(", ")
-              %td
-                - val container = state.runningTasks(taskId)
-                %a(target="_blank" href="http://#{container.nodeHttpAddress}/node/containerlogs/#{container.id.toString}/#{username}")= container.id.toString
+          - for((taskId, taskNames) <- state.taskToTaskNames)
+            - for((taskName, ssps) <- taskNames)
+              %tr
+                %td= taskId
+                %td= taskName
+                %td= ssps.map(_.toString).toList.sorted.mkString(", ")
+                %td
+                  - val container = state.runningTasks(taskId)
+                  %a(target="_blank" href="http://#{container.nodeHttpAddress}/node/containerlogs/#{container.id.toString}/#{username}")= container.id.toString
 
 %div.tab-pane#config
   %h2 Config
@@ -154,7 +157,7 @@
   :javascript
     $(document).ready(function() {
       $("#containers-table").tablesorter();
-      $("#partitions-table").tablesorter();
+      $("#taskids-table").tablesorter();
       $("#config-table").tablesorter();
       $("#config-table-filter").keyup(function() {
         var regex = new RegExp($(this).val(), 'i');
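
The template above now renders one row per (container, TaskName) pair, flattening the nested assignment map that SamzaAppMasterState exposes (see the next diff). A sketch of the same traversal in plain Scala, useful for reasoning about what the table shows:

    import java.util
    import org.apache.samza.container.TaskName
    import org.apache.samza.system.SystemStreamPartition
    import scala.collection.JavaConversions._

    // One line per (container, TaskName), matching the rows in the template above.
    def describeAssignments(taskToTaskNames: Map[Int, util.Map[TaskName, util.Set[SystemStreamPartition]]]): Seq[String] =
      for {
        (taskId, taskNames) <- taskToTaskNames.toSeq
        (taskName, ssps) <- taskNames.toSeq
      } yield "%s / %s -> %s".format(taskId, taskName, ssps.map(_.toString).toList.sorted.mkString(", "))
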
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
index 01a2683..d9dfbc6 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
@@ -18,12 +18,12 @@
  */
 package org.apache.samza.job.yarn
 
-import org.apache.samza.config.Config
 import grizzled.slf4j.Logging
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
-import org.apache.samza.Partition
-import org.apache.hadoop.yarn.util.ConverterUtils
 import org.apache.hadoop.yarn.api.records.ContainerId
+import java.util
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.container.TaskName
 
 /**
  * Samza's application master has state that is usually manipulated based on
@@ -40,7 +40,7 @@ class SamzaAppMasterState(val taskId: Int, val containerId: ContainerId, val nod
   var unclaimedTasks = Set[Int]()
   var finishedTasks = Set[Int]()
   var runningTasks = Map[Int, YarnContainer]()
-  var taskPartitions = Map[Int, Set[Partition]]()
+  var taskToTaskNames = Map[Int, util.Map[TaskName, util.Set[SystemStreamPartition]]]()
   var status = FinalApplicationStatus.UNDEFINED
 
   // controlled by the service

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
index eb1ff54..0dd244d 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer
 import java.util.Collections
 
 import scala.collection.JavaConversions._
+import scala.collection.JavaConverters.mapAsJavaMapConverter
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.DataOutputBuffer
@@ -46,6 +47,7 @@ import org.apache.samza.job.ShellCommandBuilder
 import org.apache.samza.util.Util
 
 import grizzled.slf4j.Logging
+import org.apache.samza.container.TaskNamesToSystemStreamPartitions
 
 object SamzaAppMasterTaskManager {
   val DEFAULT_CONTAINER_MEM = 1024
@@ -72,7 +74,10 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
     1
   }
 
-  val allSystemStreamPartitions = Util.getInputStreamPartitions(config)
+  val tasksToSSPTaskNames: Map[Int, TaskNamesToSystemStreamPartitions] = Util.assignContainerToSSPTaskNames(config, state.taskCount)
+
+  val taskNameToChangeLogPartitionMapping = Util.getTaskNameToChangeLogPartitionMapping(config, tasksToSSPTaskNames)
+
   var taskFailures = Map[Int, TaskFailure]()
   var tooManyFailedContainers = false
   // TODO we might want to use NMClientAsync as well
@@ -106,13 +111,14 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
     state.unclaimedTasks.headOption match {
       case Some(taskId) => {
         info("Got available task id (%d) for container: %s" format (taskId, container))
-        val streamsAndPartitionsForTask = Util.getStreamsAndPartitionsForContainer(taskId, state.taskCount, allSystemStreamPartitions)
-        info("Claimed partitions %s for container ID %s" format (streamsAndPartitionsForTask, taskId))
+        val sspTaskNames: TaskNamesToSystemStreamPartitions = tasksToSSPTaskNames.getOrElse(taskId, TaskNamesToSystemStreamPartitions())
+        info("Claimed SSP taskNames %s for container ID %s" format (sspTaskNames, taskId))
         val cmdBuilderClassName = config.getCommandClass.getOrElse(classOf[ShellCommandBuilder].getName)
         val cmdBuilder = Class.forName(cmdBuilderClassName).newInstance.asInstanceOf[CommandBuilder]
           .setConfig(config)
           .setName("samza-container-%s" format taskId)
-          .setStreamPartitions(streamsAndPartitionsForTask)
+          .setTaskNameToSystemStreamPartitionsMapping(sspTaskNames.getJavaFriendlyType)
+          .setTaskNameToChangeLogPartitionMapping(taskNameToChangeLogPartitionMapping.map(kv => kv._1 -> Integer.valueOf(kv._2)).asJava)
         val command = cmdBuilder.buildCommand
         info("Task ID %s using command %s" format (taskId, command))
         val env = cmdBuilder.buildEnvironment.map { case (k, v) => (k, Util.envVarEscape(v)) }
@@ -129,7 +135,7 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
           state.neededContainers -= 1
           state.runningTasks += taskId -> new YarnContainer(container)
           state.unclaimedTasks -= taskId
-          state.taskPartitions += taskId -> streamsAndPartitionsForTask.map(_.getPartition).toSet
+          state.taskToTaskNames += taskId -> sspTaskNames.getJavaFriendlyType
 
           info("Claimed task ID %s for container %s on node %s (http://%s/node/containerlogs/%s)." format (taskId, containerIdStr, container.getNodeId.getHost, container.getNodeHttpAddress, containerIdStr))
 
@@ -151,7 +157,7 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
       taskId match {
         case Some(taskId) => {
           state.runningTasks -= taskId
-          state.taskPartitions -= taskId
+          state.taskToTaskNames -= taskId
         }
         case _ => None
       }
@@ -315,4 +321,5 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
     capability.setVirtualCores(cpuCores)
     (0 until containers).foreach(idx => amClient.addContainerRequest(new ContainerRequest(capability, null, null, priority)))
   }
+
 }
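
Util.assignContainerToSSPTaskNames is not included in this commit excerpt, so its exact distribution policy is not visible here. Purely as an assumption for illustration, a plausible round-robin sketch producing the shape the application master consumes above (container ID to that container's TaskName-to-SSP assignment):

    import org.apache.samza.container.TaskName
    import org.apache.samza.system.SystemStreamPartition

    // Assumed round-robin policy; the real Util.assignContainerToSSPTaskNames may differ.
    def roundRobinAssign(
      taskNameToSSPs: Map[TaskName, Set[SystemStreamPartition]],
      containerCount: Int): Map[Int, Map[TaskName, Set[SystemStreamPartition]]] =
      taskNameToSSPs.toSeq
        .sortBy(_._1.toString)  // deterministic ordering of TaskNames across runs
        .zipWithIndex
        .groupBy { case (_, idx) => idx % containerCount }
        .map { case (container, entries) => container -> entries.map(_._1).toMap }
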
