Repository: incubator-samza Updated Branches: refs/heads/master f6d341508 -> 6f595beda
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/util/Util.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala index e06ca90..1a67586 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala @@ -19,25 +19,14 @@ package org.apache.samza.util -import java.io._ +import java.net.URL +import java.io.BufferedReader import java.lang.management.ManagementFactory -import java.util +import java.io.File +import org.apache.samza.system.SystemStream import java.util.Random -import java.net.URL -import org.apache.samza.SamzaException -import org.apache.samza.checkpoint.CheckpointManagerFactory -import org.apache.samza.config.Config -import org.apache.samza.config.StorageConfig.Config2Storage -import org.apache.samza.config.SystemConfig.Config2System -import org.apache.samza.config.TaskConfig.Config2Task -import org.apache.samza.metrics.MetricsRegistryMap -import org.apache.samza.system.{SystemStreamPartition, SystemFactory, StreamMetadataCache, SystemStream} -import scala.collection.JavaConversions._ -import scala.collection -import org.apache.samza.container.TaskName -import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory -import org.apache.samza.container.TaskNamesToSystemStreamPartitions -import org.apache.samza.container.grouper.task.GroupByContainerCount +import org.apache.samza.job.model.JobModel +import java.io.InputStreamReader object Util extends Logging { val random = new Random @@ -83,88 +72,14 @@ object Util extends Logging { } /** - * For each input stream specified in config, exactly determine its partitions, returning a set of SystemStreamPartitions - * containing them all - * - * @param config Source of truth for systems and inputStreams - * @return Set of SystemStreamPartitions, one for each unique system, stream and partition - */ - def getInputStreamPartitions(config: Config): Set[SystemStreamPartition] = { - val inputSystemStreams = config.getInputStreams - val systemNames = config.getSystemNames.toSet - - // Map the name of each system to the corresponding SystemAdmin - val systemAdmins = systemNames.map(systemName => { - val systemFactoryClassName = config - .getSystemFactory(systemName) - .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName)) - val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName) - systemName -> systemFactory.getAdmin(systemName, config) - }).toMap - - // Get the set of partitions for each SystemStream from the stream metadata - new StreamMetadataCache(systemAdmins) - .getStreamMetadata(inputSystemStreams) - .flatMap { case (systemStream, metadata) => - metadata - .getSystemStreamPartitionMetadata - .keys - .map(new SystemStreamPartition(systemStream, _)) - }.toSet - } - - /** - * Assign mapping of which TaskNames go to which container - * - * @param config For factories for Grouper and TaskNameGrouper - * @param containerCount How many tasks are we we working with - * @return Map of int (taskId) to SSPTaskNameMap that taskID is responsible for - */ - def assignContainerToSSPTaskNames(config:Config, containerCount:Int): Map[Int, TaskNamesToSystemStreamPartitions] = { - import org.apache.samza.config.JobConfig.Config2Job - - val allSystemStreamPartitions: Set[SystemStreamPartition] = Util.getInputStreamPartitions(config) - - val sspTaskNamesAsJava: util.Map[TaskName, util.Set[SystemStreamPartition]] = { - val factoryString = config.getSystemStreamPartitionGrouperFactory - - info("Instantiating type " + factoryString + " to build SystemStreamPartition groupings") - - val factory = Util.getObj[SystemStreamPartitionGrouperFactory](factoryString) - - val grouper = factory.getSystemStreamPartitionGrouper(config) - - val groups = grouper.group(allSystemStreamPartitions) - - info("SystemStreamPartitionGrouper " + grouper + " has grouped the SystemStreamPartitions into the following taskNames:") - groups.foreach(g => info("TaskName: " + g._1 + " => " + g._2)) - - groups - } - - val sspTaskNames = TaskNamesToSystemStreamPartitions(sspTaskNamesAsJava) - - info("Assigning " + sspTaskNames.keySet.size + " SystemStreamPartitions taskNames to " + containerCount + " containers.") - - // Here is where we should put in a pluggable option for the SSPTaskNameGrouper for locality, load-balancing, etc. - val sspTaskNameGrouper = new GroupByContainerCount(containerCount) - - val containersToTaskNames = sspTaskNameGrouper.groupTaskNames(sspTaskNames).toMap - - info("Grouped SystemStreamPartition TaskNames (size = " + containersToTaskNames.size + "): ") - containersToTaskNames.foreach(t => info("Container number: " + t._1 + " => " + t._2)) - - containersToTaskNames - } - - /** * Returns a SystemStream object based on the system stream name given. For * example, kafka.topic would return new SystemStream("kafka", "topic"). */ def getSystemStreamFromNames(systemStreamNames: String): SystemStream = { val idx = systemStreamNames.indexOf('.') - if (idx < 0) + if (idx < 0) { throw new IllegalArgumentException("No '.' in stream name '" + systemStreamNames + "'. Stream names should be in the form 'system.stream'") + } new SystemStream(systemStreamNames.substring(0, idx), systemStreamNames.substring(idx + 1, systemStreamNames.length)) } @@ -177,130 +92,13 @@ object Util extends Logging { } /** - * Using previous taskName to partition mapping and current taskNames for this job run, create a new mapping that preserves - * the previous order and deterministically assigns any new taskNames to changelog partitions. Be chatty about new or - * missing taskNames. - * - * @param currentTaskNames All the taskNames the current job is processing - * @param previousTaskNameMapping Previous mapping of taskNames to partition - * @return New mapping of taskNames to partitions for the changelog - */ - def resolveTaskNameToChangelogPartitionMapping(currentTaskNames:Set[TaskName], - previousTaskNameMapping:Map[TaskName, Int]): Map[TaskName, Int] = { - info("Previous mapping of taskNames to partition: " + previousTaskNameMapping.toList.sorted) - info("Current set of taskNames: " + currentTaskNames.toList.sorted) - - val previousTaskNames: Set[TaskName] = previousTaskNameMapping.keySet - - if(previousTaskNames.equals(currentTaskNames)) { - info("No change in TaskName sets from previous run. Returning previous mapping.") - return previousTaskNameMapping - } - - if(previousTaskNames.isEmpty) { - warn("No previous taskName mapping defined. This is OK if it's the first time the job is being run, otherwise data may have been lost.") - } - - val missingTaskNames = previousTaskNames -- currentTaskNames - - if(missingTaskNames.isEmpty) { - info("No taskNames are missing between this run and previous") - } else { - warn("The following taskNames were previously defined and are no longer present: " + missingTaskNames) - } - - val newTaskNames = currentTaskNames -- previousTaskNames - - if(newTaskNames.isEmpty) { - info("No new taskNames have been added between this run and the previous") - previousTaskNameMapping // Return the old mapping since there are no new taskNames for which to account - - } else { - warn("The following new taskNames have been added in this job run: " + newTaskNames) - - // Sort the new taskNames and assign them partitions (starting at 0 for now) - val sortedNewTaskNames = newTaskNames.toList.sortWith { (a,b) => a.getTaskName < b.getTaskName }.zipWithIndex.toMap - - // Find next largest partition to use based on previous mapping - val nextPartitionToUse = if(previousTaskNameMapping.size == 0) 0 - else previousTaskNameMapping.foldLeft(0)((a,b) => math.max(a, b._2)) + 1 - - // Bump up the partition values - val newTaskNamesWithTheirPartitions = sortedNewTaskNames.map(c => c._1 -> (c._2 + nextPartitionToUse)) - - // Merge old and new - val newMapping = previousTaskNameMapping ++ newTaskNamesWithTheirPartitions - - info("New taskName to partition mapping: " + newMapping.toList.sortWith{ (a,b) => a._2 < b._2}) - - newMapping - } - } - - /** - * Read the TaskName to changelog partition mapping from the checkpoint manager, if one exists. - * - * @param config To pull out values for instantiating checkpoint manager - * @param tasksToSSPTaskNames Current TaskNames for the current job run - * @return Current mapping of TaskNames to changelog partitions - */ - def getTaskNameToChangeLogPartitionMapping(config: Config, tasksToSSPTaskNames: Map[Int, TaskNamesToSystemStreamPartitions]) = { - val taskNameMaps: Set[TaskNamesToSystemStreamPartitions] = tasksToSSPTaskNames.map(_._2).toSet - val currentTaskNames: Set[TaskName] = taskNameMaps.map(_.keys).toSet.flatten - - // We need to oh so quickly instantiate a checkpoint manager and grab the partition mapping from the log, then toss the manager aside - val checkpointManager = config.getCheckpointManagerFactory match { - case Some(checkpointFactoryClassName) => - Util - .getObj[CheckpointManagerFactory](checkpointFactoryClassName) - .getCheckpointManager(config, new MetricsRegistryMap) - case _ => null - } - - if(checkpointManager == null) { - // Check if we have a changelog configured, which requires a checkpoint manager - - if(!config.getStoreNames.isEmpty) { - throw new SamzaException("Storage factories configured, but no checkpoint manager has been specified. " + - "Unable to start job as there would be no place to store changelog partition mapping.") - } - // No need to do any mapping, just use what has been provided - Util.resolveTaskNameToChangelogPartitionMapping(currentTaskNames, Map[TaskName, Int]()) - } else { - - info("Got checkpoint manager: %s" format checkpointManager) - - // Always put in a call to create so the log is available for the tasks on startup. - // Reasonably lame to hide it in here. TODO: Pull out to more visible location. - checkpointManager.start - - val previousMapping: Map[TaskName, Int] = { - val fromCM = checkpointManager.readChangeLogPartitionMapping() - - fromCM.map(kv => kv._1 -> kv._2.intValue()).toMap // Java to Scala interop!!! - } - - checkpointManager.stop - - val newMapping = Util.resolveTaskNameToChangelogPartitionMapping(currentTaskNames, previousMapping) - - if (newMapping != null) { - info("Writing new changelog partition mapping to checkpoint manager.") - checkpointManager.writeChangeLogPartitionMapping(newMapping.map(kv => kv._1 -> new java.lang.Integer(kv._2))) //Java to Scala interop!!! - } - - newMapping - } - } - - /** * Makes sure that an object is not null, and throws a NullPointerException * if it is. */ def notNull[T](obj: T, msg: String) = if (obj == null) { throw new NullPointerException(msg) } - + /** * Returns the name representing the JVM. It usually contains the PID of the process plus some additional information * @return String that contains the name representing this JVM @@ -313,10 +111,13 @@ object Util extends Logging { * Reads a URL and returns its body as a string. Does no error handling. * * @param url HTTP URL to read from. + * @param timeout How long to wait before timing out when connecting to or reading from the HTTP server. * @return String payload of the body of the HTTP response. */ - def read(url: URL): String = { + def read(url: URL, timeout: Int = 30000): String = { val conn = url.openConnection(); + conn.setConnectTimeout(timeout) + conn.setReadTimeout(timeout) val br = new BufferedReader(new InputStreamReader(conn.getInputStream)); var line: String = null; val body = Iterator.continually(br.readLine()).takeWhile(_ != null).mkString http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java new file mode 100644 index 0000000..76bc681 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.serializers.model; + +import static org.junit.Assert.*; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.samza.Partition; +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.apache.samza.container.TaskName; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.job.model.JobModel; +import org.apache.samza.job.model.TaskModel; +import org.apache.samza.system.SystemStreamPartition; +import org.codehaus.jackson.map.ObjectMapper; +import org.junit.Test; + +public class TestSamzaObjectMapper { + @Test + public void testJsonTaskModel() throws Exception { + ObjectMapper mapper = SamzaObjectMapper.getObjectMapper(); + Map<String, String> configMap = new HashMap<String, String>(); + configMap.put("a", "b"); + Config config = new MapConfig(configMap); + Set<SystemStreamPartition> inputSystemStreamPartitions = new HashSet<SystemStreamPartition>(); + inputSystemStreamPartitions.add(new SystemStreamPartition("foo", "bar", new Partition(1))); + TaskName taskName = new TaskName("test"); + TaskModel taskModel = new TaskModel(taskName, inputSystemStreamPartitions, new Partition(2)); + Map<TaskName, TaskModel> tasks = new HashMap<TaskName, TaskModel>(); + tasks.put(taskName, taskModel); + ContainerModel containerModel = new ContainerModel(1, tasks); + Map<Integer, ContainerModel> containerMap = new HashMap<Integer, ContainerModel>(); + containerMap.put(Integer.valueOf(1), containerModel); + JobModel jobModel = new JobModel(config, containerMap); + String str = mapper.writeValueAsString(jobModel); + JobModel obj = mapper.readValue(str, JobModel.class); + assertEquals(jobModel, obj); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala index 393b09a..a0ea8b6 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala @@ -19,53 +19,59 @@ package org.apache.samza.container -import org.apache.samza.config.Config -import org.junit.Assert._ -import org.junit.Test +import scala.collection.JavaConversions._ + import org.apache.samza.Partition +import org.apache.samza.config.Config import org.apache.samza.config.MapConfig +import org.apache.samza.coordinator.JobCoordinator +import org.apache.samza.coordinator.server.HttpServer +import org.apache.samza.coordinator.server.JobServlet +import org.apache.samza.job.model.ContainerModel +import org.apache.samza.job.model.JobModel +import org.apache.samza.job.model.TaskModel import org.apache.samza.metrics.JmxServer +import org.apache.samza.serializers.SerdeManager import org.apache.samza.system.IncomingMessageEnvelope -import org.apache.samza.system.SystemConsumers +import org.apache.samza.system.StreamMetadataCache import org.apache.samza.system.SystemConsumer -import org.apache.samza.system.SystemProducers +import org.apache.samza.system.SystemConsumers import org.apache.samza.system.SystemProducer -import org.apache.samza.system.SystemStreamPartition +import org.apache.samza.system.SystemProducers import org.apache.samza.system.SystemStream -import org.apache.samza.system.StreamMetadataCache +import org.apache.samza.system.SystemStreamPartition import org.apache.samza.system.chooser.RoundRobinChooser -import org.apache.samza.serializers.SerdeManager -import org.apache.samza.task.StreamTask -import org.apache.samza.task.MessageCollector -import org.apache.samza.task.TaskCoordinator +import org.apache.samza.task.ClosableTask import org.apache.samza.task.InitableTask +import org.apache.samza.task.MessageCollector +import org.apache.samza.task.StreamTask import org.apache.samza.task.TaskContext -import org.apache.samza.task.ClosableTask +import org.apache.samza.task.TaskCoordinator import org.apache.samza.task.TaskInstanceCollector import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin +import org.junit.Assert._ +import org.junit.Test import org.scalatest.junit.AssertionsForJUnit -import org.apache.samza.coordinator.server.HttpServer -import org.apache.samza.coordinator.server.JobServlet -import scala.collection.JavaConversions._ class TestSamzaContainer extends AssertionsForJUnit { @Test - def testCoordinatorObjects { - val server = new HttpServer("/test") + def testReadJobModel { + val config = new MapConfig(Map("a" -> "b")) + val tasks = Map( + new TaskName("t1") -> new TaskModel(new TaskName("t1"), Set[SystemStreamPartition](), new Partition(0)), + new TaskName("t2") -> new TaskModel(new TaskName("t2"), Set[SystemStreamPartition](), new Partition(0))) + val containers = Map( + Integer.valueOf(0) -> new ContainerModel(0, tasks), + Integer.valueOf(1) -> new ContainerModel(1, tasks)) + val jobModel = new JobModel(config, containers) + val server = new HttpServer + val coordinator = new JobCoordinator(jobModel, server) + coordinator.server.addServlet("/*", new JobServlet(jobModel)) try { - val taskName = new TaskName("a") - val set = Set(new SystemStreamPartition("a", "b", new Partition(0))) - val config = new MapConfig(Map("a" -> "b", "c" -> "d")) - val containerToTaskMapping = Map(0 -> new TaskNamesToSystemStreamPartitions(Map(taskName -> set))) - val taskToChangelogMapping = Map[TaskName, Int](taskName -> 0) - server.addServlet("/job", new JobServlet(config, containerToTaskMapping, taskToChangelogMapping)) - server.start - val (returnedConfig, returnedSspTaskNames, returnedTaskNameToChangeLogPartitionMapping) = SamzaContainer.getCoordinatorObjects(server.getUrl.toString + "/job") - assertEquals(config, returnedConfig) - assertEquals(containerToTaskMapping, returnedSspTaskNames) - assertEquals(taskToChangelogMapping, returnedTaskNameToChangeLogPartitionMapping) + coordinator.start + assertEquals(jobModel, SamzaContainer.readJobModel(server.getUrl.toString)) } finally { - server.stop + coordinator.stop } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/test/scala/org/apache/samza/container/TestTaskNamesToSystemStreamPartitions.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskNamesToSystemStreamPartitions.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskNamesToSystemStreamPartitions.scala deleted file mode 100644 index 9a3406e..0000000 --- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskNamesToSystemStreamPartitions.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.container - -import org.apache.samza.system.SystemStreamPartition -import org.apache.samza.{SamzaException, Partition} -import org.junit.Test -import org.junit.Assert._ - -class TestTaskNamesToSystemStreamPartitions { - var sspCounter = 0 - def makeSSP(stream:String) = new SystemStreamPartition("system", stream, new Partition(42)) - - @Test - def toSetWorksCorrectly() { - val map = Map(new TaskName("tn1") -> Set(makeSSP("tn1-1"), makeSSP("tn1-2")), - new TaskName("tn2") -> Set(makeSSP("tn2-1"), makeSSP("tn2-2"))) - val tntssp = TaskNamesToSystemStreamPartitions(map) - - val asSet = tntssp.toSet - val expected = Set(new TaskName("tn1") -> Set(makeSSP("tn1-1"), makeSSP("tn1-2")), - (new TaskName("tn2") -> Set(makeSSP("tn2-1"), makeSSP("tn2-2")))) - assertEquals(expected, asSet) - } - - @Test - def validateMethodCatchesDuplicatedSSPs() { - val duplicatedSSP1 = new SystemStreamPartition("sys", "str", new Partition(42)) - val duplicatedSSP2 = new SystemStreamPartition("sys", "str", new Partition(42)) - val notDuplicatedSSP1 = new SystemStreamPartition("sys", "str2", new Partition(42)) - val notDuplicatedSSP2 = new SystemStreamPartition("sys", "str3", new Partition(42)) - - val badMapping = Map(new TaskName("a") -> Set(notDuplicatedSSP1, duplicatedSSP1), new TaskName("b") -> Set(notDuplicatedSSP2, duplicatedSSP2)) - - var caughtException = false - try { - TaskNamesToSystemStreamPartitions(badMapping) - } catch { - case se: SamzaException => assertEquals("Assigning the same SystemStreamPartition to multiple " + - "TaskNames is not currently supported. Out of compliance SystemStreamPartitions and counts: " + - "Map(SystemStreamPartition [sys, str, 42] -> 2)", se.getMessage) - caughtException = true - case _: Throwable => - } - assertTrue("TaskNamesToSystemStreamPartitions should have rejected this mapping but didn't", caughtException) - } - - @Test - def validateMethodAllowsUniqueSSPs() { - val sspSet1 = (0 to 10).map(p => new SystemStreamPartition("sys", "str", new Partition(p))).toSet - val sspSet2 = (0 to 10).map(p => new SystemStreamPartition("sys", "str2", new Partition(p))).toSet - - TaskNamesToSystemStreamPartitions(Map(new TaskName("set1") -> sspSet1, new TaskName("set2") -> sspSet2)) - } -} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala b/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala index 20f41a8..1ee5c06 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala @@ -18,37 +18,56 @@ */ package org.apache.samza.container.grouper.task -import org.apache.samza.container.{TaskName, TaskNamesToSystemStreamPartitions} +import org.apache.samza.container.TaskName import org.apache.samza.system.SystemStreamPartition import org.junit.Assert._ import org.junit.Test +import org.apache.samza.job.model.TaskModel +import org.apache.samza.Partition +import scala.collection.JavaConversions +import org.scalatest.Assertions.intercept +import scala.collection.JavaConversions._ class TestGroupByContainerCount { - val emptySSPSet = Set[SystemStreamPartition]() - @Test - def weGetAsExactlyManyGroupsAsWeAskFor() { - // memoize the maps used in the test to avoid an O(n^3) loop - val tntsspCache = scala.collection.mutable.Map[Int, TaskNamesToSystemStreamPartitions]() - - def tntsspOfSize(size:Int) = { - def getMap(size:Int) = TaskNamesToSystemStreamPartitions((0 until size).map(z => new TaskName("tn" + z) -> emptySSPSet).toMap) - - tntsspCache.getOrElseUpdate(size, getMap(size)) - } + def testEmptyTasks { + intercept[IllegalArgumentException] { new GroupByContainerCount(1).group(Set()) } + } - val maxTNTSSPSize = 1000 - val maxNumGroups = 140 - for(numGroups <- 1 to maxNumGroups) { - val grouper = new GroupByContainerCount(numGroups) + @Test + def testFewerTasksThanContainers { + intercept[IllegalArgumentException] { new GroupByContainerCount(2).group(Set(null)) } + } - for (tntsspSize <- numGroups to maxTNTSSPSize) { - val map = tntsspOfSize(tntsspSize) - assertEquals(tntsspSize, map.size) + @Test + def testHappyPath { + val taskModels = Set( + getTaskModel("1", 1), + getTaskModel("2", 2), + getTaskModel("3", 3), + getTaskModel("4", 4), + getTaskModel("5", 5)) + val containers = new GroupByContainerCount(2) + .group(taskModels) + .map(containerModel => containerModel.getContainerId -> containerModel) + .toMap + assertEquals(2, containers.size) + val container0 = containers(0) + val container1 = containers(1) + assertNotNull(container0) + assertNotNull(container1) + assertEquals(0, container0.getContainerId) + assertEquals(1, container1.getContainerId) + assertEquals(3, container0.getTasks.size) + assertEquals(2, container1.getTasks.size) + assertTrue(container0.getTasks.containsKey(new TaskName("1"))) + assertTrue(container0.getTasks.containsKey(new TaskName("3"))) + assertTrue(container0.getTasks.containsKey(new TaskName("5"))) + assertTrue(container1.getTasks.containsKey(new TaskName("2"))) + assertTrue(container1.getTasks.containsKey(new TaskName("4"))) + } - val grouped = grouper.groupTaskNames(map) - assertEquals("Asked for " + numGroups + " but got " + grouped.size, numGroups, grouped.size) - } - } + private def getTaskModel(name: String, partitionId: Int) = { + new TaskModel(new TaskName(name), Set[SystemStreamPartition](), new Partition(partitionId)) } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala new file mode 100644 index 0000000..1eb0eda --- /dev/null +++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.coordinator + +import org.junit.Test +import org.junit.Assert._ +import scala.collection.JavaConversions._ +import org.apache.samza.config.MapConfig +import org.apache.samza.config.TaskConfig +import org.apache.samza.config.SystemConfig +import org.apache.samza.container.TaskName +import org.apache.samza.checkpoint.Checkpoint +import org.apache.samza.checkpoint.CheckpointManagerFactory +import org.apache.samza.checkpoint.CheckpointManager +import org.apache.samza.metrics.MetricsRegistry +import org.apache.samza.config.Config +import org.apache.samza.system.SystemFactory +import org.apache.samza.system.SystemAdmin +import org.apache.samza.system.SystemStreamPartition +import org.apache.samza.system.SystemStreamMetadata +import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata +import org.apache.samza.Partition +import org.apache.samza.job.model.JobModel +import org.apache.samza.job.model.ContainerModel +import org.apache.samza.job.model.TaskModel + +class TestJobCoordinator { + /** + * Builds a coordinator from config, and then compares it with what was + * expected. We simulate having a checkpoint manager that has 2 task + * changelog entries, and our model adds a third task. Expectation is that + * the JobCoordinator will assign the new task with a new changelog + * partition. + */ + @Test + def testJobCoordinator { + val containerCount = 2 + val config = new MapConfig(Map( + TaskConfig.CHECKPOINT_MANAGER_FACTORY -> classOf[MockCheckpointManagerFactory].getCanonicalName, + TaskConfig.INPUT_STREAMS -> "test.stream1", + (SystemConfig.SYSTEM_FACTORY format "test") -> classOf[MockSystemFactory].getCanonicalName)) + val coordinator = JobCoordinator(config, containerCount) + + // Construct the expected JobModel, so we can compare it to + // JobCoordinator's JobModel. + val task0Name = new TaskName("Partition 0") + val task1Name = new TaskName("Partition 1") + val task2Name = new TaskName("Partition 2") + val container0Tasks = Map( + task0Name -> new TaskModel(task0Name, Set(new SystemStreamPartition("test", "stream1", new Partition(0))), new Partition(4)), + task2Name -> new TaskModel(task2Name, Set(new SystemStreamPartition("test", "stream1", new Partition(2))), new Partition(5))) + val container1Tasks = Map( + task1Name -> new TaskModel(task1Name, Set(new SystemStreamPartition("test", "stream1", new Partition(1))), new Partition(3))) + val containers = Map( + Integer.valueOf(0) -> new ContainerModel(0, container0Tasks), + Integer.valueOf(1) -> new ContainerModel(1, container1Tasks)) + val jobModel = new JobModel(config, containers) + assertEquals(config, coordinator.jobModel.getConfig) + assertEquals(jobModel, coordinator.jobModel) + } +} + +object MockCheckpointManager { + var mapping: java.util.Map[TaskName, java.lang.Integer] = Map[TaskName, java.lang.Integer]( + new TaskName("Partition 0") -> 4, + new TaskName("Partition 1") -> 3) +} + +class MockCheckpointManagerFactory extends CheckpointManagerFactory { + def getCheckpointManager(config: Config, registry: MetricsRegistry) = new MockCheckpointManager +} + +class MockCheckpointManager extends CheckpointManager { + def start() {} + def register(taskName: TaskName) {} + def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) {} + def readLastCheckpoint(taskName: TaskName) = null + def readChangeLogPartitionMapping = MockCheckpointManager.mapping + def writeChangeLogPartitionMapping(mapping: java.util.Map[TaskName, java.lang.Integer]) { + MockCheckpointManager.mapping = mapping + } + def stop() {} +} + +class MockSystemFactory extends SystemFactory { + def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = null + def getProducer(systemName: String, config: Config, registry: MetricsRegistry) = null + def getAdmin(systemName: String, config: Config) = new MockSystemAdmin +} + +class MockSystemAdmin extends SystemAdmin { + def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = null + def getSystemStreamMetadata(streamNames: java.util.Set[String]): java.util.Map[String, SystemStreamMetadata] = { + assertEquals(1, streamNames.size) + val partitionMetadata = Map( + new Partition(0) -> new SystemStreamPartitionMetadata(null, null, null), + new Partition(1) -> new SystemStreamPartitionMetadata(null, null, null), + // Create a new Partition(2), which wasn't in the prior changelog mapping. + new Partition(2) -> new SystemStreamPartitionMetadata(null, null, null)) + Map(streamNames.toList.head -> new SystemStreamMetadata("foo", partitionMetadata)) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala index 93c71b4..b75f440 100644 --- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala +++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala @@ -23,7 +23,7 @@ import org.apache.samza.Partition import org.apache.samza.config.Config import org.apache.samza.config.Config import org.apache.samza.config.MapConfig -import org.apache.samza.container.{TaskName, TaskNamesToSystemStreamPartitions} +import org.apache.samza.container.TaskName import org.apache.samza.metrics.MetricsRegistry import org.apache.samza.metrics.MetricsRegistry import org.apache.samza.system.SystemFactory @@ -37,72 +37,4 @@ import scala.collection.JavaConversions._ import scala.util.Random class TestUtil { - val random = new Random(System.currentTimeMillis()) - - @Test - def testGetInputStreamPartitions { - val expectedPartitionsPerStream = 1 - val inputSystemStreamNames = List("test.foo", "test.bar") - val config = new MapConfig(Map( - "task.inputs" -> inputSystemStreamNames.mkString(","), - "systems.test.samza.factory" -> classOf[MockSystemFactory].getName, - "systems.test.partitions.per.stream" -> expectedPartitionsPerStream.toString)) - val systemStreamPartitions = Util.getInputStreamPartitions(config) - assertNotNull(systemStreamPartitions) - assertEquals(expectedPartitionsPerStream * inputSystemStreamNames.size, systemStreamPartitions.size) - inputSystemStreamNames.foreach(systemStreamName => { - (0 until expectedPartitionsPerStream).foreach(partitionNumber => { - val partition = new Partition(partitionNumber) - val systemStreamNameSplit = systemStreamName.split("\\.") - systemStreamPartitions.contains(new SystemStreamPartition(systemStreamNameSplit(0), systemStreamNameSplit(1), partition)) - }) - }) - } - - @Test - def testResolveTaskNameToChangelogPartitionMapping { - def testRunner(description:String, currentTaskNames:Set[TaskName], previousTaskNameMapping:Map[TaskName, Int], - result:Map[TaskName, Int]) { - assertEquals("Failed: " + description, result, - Util.resolveTaskNameToChangelogPartitionMapping(currentTaskNames, previousTaskNameMapping)) - } - - testRunner("No change between runs", - Set(new TaskName("Partition 0")), - Map(new TaskName("Partition 0") -> 0), - Map(new TaskName("Partition 0") -> 0)) - - testRunner("New TaskName added, none missing this run", - Set(new TaskName("Partition 0"), new TaskName("Partition 1")), - Map(new TaskName("Partition 0") -> 0), - Map(new TaskName("Partition 0") -> 0, new TaskName("Partition 1") -> 1)) - - testRunner("New TaskName added, one missing this run", - Set(new TaskName("Partition 0"), new TaskName("Partition 2")), - Map(new TaskName("Partition 0") -> 0, new TaskName("Partition 1") -> 1), - Map(new TaskName("Partition 0") -> 0, new TaskName("Partition 1") -> 1, new TaskName("Partition 2") -> 2)) - - testRunner("New TaskName added, all previous missing this run", - Set(new TaskName("Partition 3"), new TaskName("Partition 4")), - Map(new TaskName("Partition 0") -> 0, new TaskName("Partition 1") -> 1, new TaskName("Partition 2") -> 2), - Map(new TaskName("Partition 0") -> 0, new TaskName("Partition 1") -> 1, new TaskName("Partition 2") -> 2, new TaskName("Partition 3") -> 3, new TaskName("Partition 4") -> 4)) - } - - /** - * Generate a random alphanumeric string of the specified length - * @param length Specifies length of the string to generate - * @return An alphanumeric string - */ - def generateString (length : Int) : String = { - Random.alphanumeric.take(length).mkString - } -} - -/** - * Little mock for testing the input stream partition method. - */ -class MockSystemFactory extends SystemFactory { - def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = null - def getProducer(systemName: String, config: Config, registry: MetricsRegistry) = null - def getAdmin(systemName: String, config: Config) = new SinglePartitionWithoutOffsetsSystemAdmin } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml index 50a7426..2b1aa3e 100644 --- a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml +++ b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml @@ -128,14 +128,14 @@ %th SystemStreamPartitions %th Container %tbody - - for((taskId, taskNames) <- state.runningTaskToTaskNames) - - for((taskName, ssps) <- taskNames) + - for((containerId, container) <- state.runningTasks) + - val containerModel = state.jobCoordinator.jobModel.getContainers.get(containerId) + - for((taskName, taskModel) <- containerModel.getTasks) %tr - %td= taskId + %td= containerId %td= taskName - %td= ssps.map(_.toString).toList.sorted.mkString(", ") + %td= taskModel.getSystemStreamPartitions.map(_.toString).toList.sorted.mkString(", ") %td - - val container = state.runningTasks(taskId) %a(target="_blank" href="http://#{container.nodeHttpAddress}/node/containerlogs/#{container.id.toString}/#{username}")= container.id.toString %div.tab-pane#config http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala index 723b673..91aff3c 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala @@ -20,7 +20,6 @@ package org.apache.samza.job.yarn import scala.collection.JavaConversions.asScalaBuffer - import org.apache.hadoop.yarn.api.ApplicationConstants import org.apache.hadoop.yarn.api.records.{ Container, ContainerStatus, NodeReport } import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest @@ -28,17 +27,17 @@ import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.util.ConverterUtils import org.apache.samza.config.MapConfig +import org.apache.samza.config.Config import org.apache.samza.config.ShellCommandConfig import org.apache.samza.config.YarnConfig import org.apache.samza.config.YarnConfig.Config2Yarn -import org.apache.samza.config.serializers.JsonConfigSerializer import org.apache.samza.job.yarn.SamzaAppMasterTaskManager.DEFAULT_CONTAINER_MEM import org.apache.samza.job.yarn.SamzaAppMasterTaskManager.DEFAULT_CPU_CORES import org.apache.samza.metrics.JmxServer import org.apache.samza.metrics.MetricsRegistryMap import org.apache.samza.util.hadoop.HttpFileSystem - import org.apache.samza.util.Logging +import org.apache.samza.serializers.model.SamzaObjectMapper /** * When YARN executes an application master, it needs a bash command to @@ -68,7 +67,7 @@ object SamzaAppMaster extends Logging with AMRMClientAsync.CallbackHandler { info("got node manager port: %s" format nodePortString) val nodeHttpPortString = System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.toString) info("got node manager http port: %s" format nodeHttpPortString) - val config = new MapConfig(JsonConfigSerializer.fromJson(System.getenv(ShellCommandConfig.ENV_CONFIG))) + val config = new MapConfig(SamzaObjectMapper.getObjectMapper.readValue(System.getenv(ShellCommandConfig.ENV_CONFIG), classOf[Config])) info("got config: %s" format config) val hConfig = new YarnConfiguration hConfig.set("fs.http.impl", classOf[HttpFileSystem].getName) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala index b076968..ce88698 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala @@ -36,7 +36,6 @@ import org.apache.samza.webapp.ApplicationMasterWebServlet class SamzaAppMasterService(config: Config, state: SamzaAppMasterState, registry: ReadableMetricsRegistry, clientHelper: ClientHelper) extends YarnAppMasterListener with Logging { var rpcApp: HttpServer = null var webApp: HttpServer = null - var coordinatorApp: HttpServer = null override def onInit() { // try starting the samza AM dashboard at a random rpc and tracking port @@ -50,13 +49,10 @@ class SamzaAppMasterService(config: Config, state: SamzaAppMasterState, registry webApp.addServlet("/*", new ApplicationMasterWebServlet(config, state)) webApp.start - coordinatorApp = new HttpServer - coordinatorApp.addServlet("/*", new JobServlet(config, state.tasksToSSPTaskNames, state.taskNameToChangeLogPartitionMapping)) - coordinatorApp.start - + state.jobCoordinator.start state.rpcUrl = rpcApp.getUrl state.trackingUrl = webApp.getUrl - state.coordinatorUrl = coordinatorApp.getUrl + state.coordinatorUrl = state.jobCoordinator.server.getUrl info("Webapp is started at (rpc %s, tracking %s, coordinator %s)" format (state.rpcUrl, state.trackingUrl, state.coordinatorUrl)) } @@ -70,8 +66,6 @@ class SamzaAppMasterService(config: Config, state: SamzaAppMasterState, registry webApp.stop } - if (coordinatorApp != null) { - coordinatorApp.stop - } + state.jobCoordinator.stop } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala index 423e903..8ba435e 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala @@ -24,8 +24,9 @@ import org.apache.hadoop.yarn.api.records.ContainerId import java.util import org.apache.samza.system.SystemStreamPartition import org.apache.samza.container.TaskName -import org.apache.samza.container.TaskNamesToSystemStreamPartitions import java.net.URL +import org.apache.samza.job.model.JobModel +import org.apache.samza.coordinator.JobCoordinator /** * Samza's application master has state that is usually manipulated based on @@ -42,9 +43,7 @@ class SamzaAppMasterState(val taskId: Int, val containerId: ContainerId, val nod var unclaimedTasks = Set[Int]() var finishedTasks = Set[Int]() var runningTasks = Map[Int, YarnContainer]() - var runningTaskToTaskNames = Map[Int, util.Map[TaskName, util.Set[SystemStreamPartition]]]() - var tasksToSSPTaskNames = Map[Int, TaskNamesToSystemStreamPartitions]() - var taskNameToChangeLogPartitionMapping = Map[TaskName, Int]() + var jobCoordinator: JobCoordinator = null var status = FinalApplicationStatus.UNDEFINED var jobHealthy = true http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala index eb52529..b0b6543 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala @@ -21,10 +21,7 @@ package org.apache.samza.job.yarn import java.nio.ByteBuffer import java.util.Collections - import scala.collection.JavaConversions._ -import scala.collection.JavaConverters.mapAsJavaMapConverter - import org.apache.hadoop.fs.Path import org.apache.hadoop.io.DataOutputBuffer import org.apache.hadoop.security.UserGroupInformation @@ -45,9 +42,8 @@ import org.apache.samza.config.YarnConfig.Config2Yarn import org.apache.samza.job.CommandBuilder import org.apache.samza.job.ShellCommandBuilder import org.apache.samza.util.Util - import org.apache.samza.util.Logging -import org.apache.samza.container.TaskNamesToSystemStreamPartitions +import org.apache.samza.coordinator.JobCoordinator object SamzaAppMasterTaskManager { val DEFAULT_CONTAINER_MEM = 1024 @@ -73,8 +69,7 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA info("No %s specified. Defaulting to one container." format YarnConfig.TASK_COUNT) 1 }) - state.tasksToSSPTaskNames = Util.assignContainerToSSPTaskNames(config, state.taskCount) - state.taskNameToChangeLogPartitionMapping = Util.getTaskNameToChangeLogPartitionMapping(config, state.tasksToSSPTaskNames) + state.jobCoordinator = JobCoordinator(config, state.taskCount) var taskFailures = Map[Int, TaskFailure]() var tooManyFailedContainers = false @@ -85,7 +80,7 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA override def onInit() { state.neededContainers = state.taskCount - state.unclaimedTasks = (0 until state.taskCount).toSet + state.unclaimedTasks = state.jobCoordinator.jobModel.getContainers.keySet.map(_.toInt).toSet containerManager = NMClient.createNMClient() containerManager.init(conf) containerManager.start @@ -109,7 +104,7 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA state.unclaimedTasks.headOption match { case Some(taskId) => { info("Got available task id (%d) for container: %s" format (taskId, container)) - val sspTaskNames: TaskNamesToSystemStreamPartitions = state.tasksToSSPTaskNames.getOrElse(taskId, TaskNamesToSystemStreamPartitions()) + val sspTaskNames = state.jobCoordinator.jobModel.getContainers.get(taskId) info("Claimed SSP taskNames %s for container ID %s" format (sspTaskNames, taskId)) val cmdBuilderClassName = config.getCommandClass.getOrElse(classOf[ShellCommandBuilder].getName) val cmdBuilder = Class.forName(cmdBuilderClassName).newInstance.asInstanceOf[CommandBuilder] @@ -135,7 +130,6 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA } state.runningTasks += taskId -> new YarnContainer(container) state.unclaimedTasks -= taskId - state.runningTaskToTaskNames += taskId -> sspTaskNames.getJavaFriendlyType info("Claimed task ID %s for container %s on node %s (http://%s/node/containerlogs/%s)." format (taskId, containerIdStr, container.getNodeId.getHost, container.getNodeHttpAddress, containerIdStr)) @@ -157,7 +151,6 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA taskId match { case Some(taskId) => { state.runningTasks -= taskId - state.runningTaskToTaskNames -= taskId } case _ => None } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala index d7aa240..24b11da 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala @@ -30,12 +30,12 @@ import org.apache.samza.job.ApplicationStatus.Running import org.apache.samza.job.StreamJob import org.apache.samza.job.ApplicationStatus.SuccessfulFinish import org.apache.samza.job.ApplicationStatus.UnsuccessfulFinish -import org.apache.samza.config.serializers.JsonConfigSerializer import org.apache.samza.config.YarnConfig.Config2Yarn import org.apache.samza.config.JobConfig.Config2Job import org.apache.samza.config.YarnConfig import org.apache.samza.config.ShellCommandConfig import org.apache.samza.SamzaException +import org.apache.samza.serializers.model.SamzaObjectMapper object YarnJob { val DEFAULT_AM_CONTAINER_MEM = 1024 @@ -60,7 +60,7 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob { format (ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.STDOUT, ApplicationConstants.STDERR)), Some({ val envMap = Map( - ShellCommandConfig.ENV_CONFIG -> Util.envVarEscape(JsonConfigSerializer.toJson(config)), + ShellCommandConfig.ENV_CONFIG -> Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(config)), ShellCommandConfig.ENV_JAVA_OPTS -> Util.envVarEscape(config.getAmOpts.getOrElse(""))) val envMapWithJavaHome = config.getAMJavaHome match { case Some(javaHome) => envMap + (ShellCommandConfig.ENV_JAVA_HOME -> javaHome) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala index d6cf22d..0afee64 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala @@ -29,11 +29,12 @@ import scala.collection.JavaConversions._ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.codehaus.jackson.map.ObjectMapper import java.util.HashMap +import org.apache.samza.serializers.model.SamzaObjectMapper class ApplicationMasterRestServlet(config: Config, state: SamzaAppMasterState, registry: ReadableMetricsRegistry) extends ScalatraServlet with ScalateSupport { val yarnConfig = new YarnConfiguration val client = new ClientHelper(yarnConfig) - val jsonMapper = new ObjectMapper() + val jsonMapper = SamzaObjectMapper.getObjectMapper before() { contentType = "application/json" @@ -79,20 +80,18 @@ class ApplicationMasterRestServlet(config: Config, state: SamzaAppMasterState, r get("/am") { val containers = new HashMap[String, HashMap[String, Object]] - state.runningTasks.values.foreach(c => { - val containerIdStr = c.id.toString - val containerMap = new HashMap[String, Object] - - val taskId = state.runningTasks.filter { case (_, container) => container.id.toString.equals(containerIdStr) }.keys.head - val taskNames = new java.util.ArrayList(state.runningTaskToTaskNames.get(taskId).get.toList) - - containerMap.put("yarn-address", c.nodeHttpAddress) - containerMap.put("start-time", c.startTime.toString) - containerMap.put("up-time", c.upTime.toString) - containerMap.put("task-names", taskNames) - containerMap.put("task-id", taskId.toString) - containers.put(containerIdStr, containerMap) - }) + state.runningTasks.foreach { + case (containerId, container) => + val yarnContainerId = container.id.toString + val containerMap = new HashMap[String, Object] + val taskModels = state.jobCoordinator.jobModel.getContainers.get(containerId).getTasks + containerMap.put("yarn-address", container.nodeHttpAddress) + containerMap.put("start-time", container.startTime.toString) + containerMap.put("up-time", container.upTime.toString) + containerMap.put("task-models", taskModels) + containerMap.put("container-id", containerId.toString) + containers.put(yarnContainerId, containerMap) + } val status = Map[String, Object]( "app-attempt-id" -> state.appAttemptId.toString, http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala index a1c090d..81dea9d 100644 --- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala +++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala @@ -29,26 +29,25 @@ import org.junit.Test import scala.collection.JavaConversions._ import org.apache.samza.config.Config import org.apache.samza.container.TaskName -import org.apache.samza.container.TaskNamesToSystemStreamPartitions import org.apache.samza.system.SystemStreamPartition import org.apache.samza.Partition +import org.apache.samza.coordinator.JobCoordinator class TestSamzaAppMasterService { @Test def testAppMasterDashboardShouldStart { + val config = getDummyConfig val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "", 1, 2) - val service = new SamzaAppMasterService(getDummyConfig, state, null, null) + val service = new SamzaAppMasterService(config, state, null, null) val taskName = new TaskName("test") - state.tasksToSSPTaskNames = Map[Int, TaskNamesToSystemStreamPartitions]() - state.taskNameToChangeLogPartitionMapping = Map[TaskName, Int]() - state.tasksToSSPTaskNames += 0 -> new TaskNamesToSystemStreamPartitions(Map(taskName -> Set(new SystemStreamPartition("a", "b", new Partition(0)), new SystemStreamPartition("a", "b", new Partition(0))))) - state.taskNameToChangeLogPartitionMapping += taskName -> 0 + state.jobCoordinator = JobCoordinator(config, 1) // start the dashboard service.onInit assertTrue(state.rpcUrl.getPort > 0) assertTrue(state.trackingUrl.getPort > 0) + assertTrue(state.coordinatorUrl.getPort > 0) // check to see if it's running val url = new URL(state.rpcUrl.toString + "am") @@ -70,8 +69,11 @@ class TestSamzaAppMasterService { @Test def testAppMasterDashboardWebServiceShouldStart { // Create some dummy config + val config = getDummyConfig val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "", 1, 2) - val service = new SamzaAppMasterService(getDummyConfig, state, null, null) + val service = new SamzaAppMasterService(config, state, null, null) + + state.jobCoordinator = JobCoordinator(config, 1) // start the dashboard service.onInit http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala index 5f65dbd..cab5101 100644 --- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala +++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala @@ -31,16 +31,19 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.util.ConverterUtils import org.apache.samza.Partition import org.apache.samza.config.Config +import org.apache.samza.config.YarnConfig.Config2Yarn import org.apache.samza.config.MapConfig import org.apache.samza.metrics.MetricsRegistry import org.apache.samza.system.SystemFactory import org.apache.samza.system.SystemStreamPartition -import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin import org.apache.samza.util.Util import org.junit.Test import scala.collection.JavaConversions._ import TestSamzaAppMasterTaskManager._ import java.net.URL +import org.apache.samza.system.SystemAdmin +import org.apache.samza.system.SystemStreamMetadata +import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata object TestSamzaAppMasterTaskManager { def getContainer(containerId: ContainerId) = new Container { @@ -235,7 +238,6 @@ class TestSamzaAppMasterTaskManager { taskManager.onContainerAllocated(getContainer(container2)) assertEquals(0, state.neededContainers) assertEquals(1, state.runningTasks.size) - assertEquals(1, state.runningTaskToTaskNames.size) assertEquals(0, state.unclaimedTasks.size) assertEquals(1, containersRequested) assertEquals(1, containersStarted) @@ -244,7 +246,6 @@ class TestSamzaAppMasterTaskManager { taskManager.onContainerAllocated(getContainer(container3)) assertEquals(0, state.neededContainers) assertEquals(1, state.runningTasks.size) - assertEquals(1, state.runningTaskToTaskNames.size) assertEquals(0, state.unclaimedTasks.size) assertEquals(1, amClient.getClient.requests.size) assertEquals(1, amClient.getClient.getRelease.size) @@ -260,7 +261,6 @@ class TestSamzaAppMasterTaskManager { assertFalse(taskManager.shouldShutdown) assertEquals(0, state.neededContainers) assertEquals(1, state.runningTasks.size) - assertEquals(1, state.runningTaskToTaskNames.size) assertEquals(0, state.unclaimedTasks.size) assertEquals(0, amClient.getClient.requests.size) assertEquals(0, amClient.getClient.getRelease.size) @@ -277,7 +277,6 @@ class TestSamzaAppMasterTaskManager { assertEquals(0, state.neededContainers) assertTrue(state.jobHealthy) assertEquals(1, state.runningTasks.size) - assertEquals(1, state.runningTaskToTaskNames.size) assertEquals(0, state.unclaimedTasks.size) } @@ -307,13 +306,11 @@ class TestSamzaAppMasterTaskManager { taskManager.onContainerAllocated(getContainer(container2)) assertEquals(1, state.neededContainers) assertEquals(1, state.runningTasks.size) - assertEquals(1, state.runningTaskToTaskNames.size) assertEquals(1, state.unclaimedTasks.size) assertEquals(1, containersStarted) taskManager.onContainerAllocated(getContainer(container3)) assertEquals(0, state.neededContainers) assertEquals(2, state.runningTasks.size) - assertEquals(2, state.runningTaskToTaskNames.size) assertEquals(0, state.unclaimedTasks.size) assertEquals(2, containersStarted) @@ -321,7 +318,6 @@ class TestSamzaAppMasterTaskManager { taskManager.onContainerCompleted(getContainerStatus(container2, 0, "")) assertEquals(0, state.neededContainers) assertEquals(1, state.runningTasks.size) - assertEquals(1, state.runningTaskToTaskNames.size) assertEquals(0, state.unclaimedTasks.size) assertEquals(1, state.completedTasks) @@ -329,7 +325,6 @@ class TestSamzaAppMasterTaskManager { taskManager.onContainerCompleted(getContainerStatus(container3, 1, "expected failure here")) assertEquals(1, state.neededContainers) assertEquals(0, state.runningTasks.size) - assertEquals(0, state.runningTaskToTaskNames.size) assertEquals(1, state.unclaimedTasks.size) assertEquals(1, state.completedTasks) assertFalse(taskManager.shouldShutdown) @@ -338,7 +333,6 @@ class TestSamzaAppMasterTaskManager { taskManager.onContainerAllocated(getContainer(container3)) assertEquals(0, state.neededContainers) assertEquals(1, state.runningTasks.size) - assertEquals(1, state.runningTaskToTaskNames.size) assertEquals(0, state.unclaimedTasks.size) assertEquals(3, containersStarted) @@ -346,7 +340,6 @@ class TestSamzaAppMasterTaskManager { taskManager.onContainerCompleted(getContainerStatus(container3, 0, "")) assertEquals(0, state.neededContainers) assertEquals(0, state.runningTasks.size) - assertEquals(0, state.runningTaskToTaskNames.size) assertEquals(0, state.unclaimedTasks.size) assertEquals(2, state.completedTasks) assertTrue(taskManager.shouldShutdown) @@ -379,19 +372,16 @@ class TestSamzaAppMasterTaskManager { assertEquals(0, amClient.getClient.getRelease.size) assertEquals(1, state.neededContainers) assertEquals(0, state.runningTasks.size) - assertEquals(0, state.runningTaskToTaskNames.size) assertEquals(1, state.unclaimedTasks.size) taskManager.onContainerAllocated(getContainer(container2)) assertEquals(0, state.neededContainers) assertEquals(1, state.runningTasks.size) - assertEquals(1, state.runningTaskToTaskNames.size) assertEquals(0, state.unclaimedTasks.size) assertEquals(1, containersRequested) assertEquals(1, containersStarted) taskManager.onContainerAllocated(getContainer(container3)) assertEquals(0, state.neededContainers) assertEquals(1, state.runningTasks.size) - assertEquals(1, state.runningTaskToTaskNames.size) assertEquals(0, state.unclaimedTasks.size) assertEquals(1, containersRequested) assertEquals(1, containersStarted) @@ -413,6 +403,22 @@ class MockSystemFactory extends SystemFactory { } def getAdmin(systemName: String, config: Config) = { - new SinglePartitionWithoutOffsetsSystemAdmin + val containerCount = config.getTaskCount.getOrElse(1) + new MockSystemAdmin(containerCount) + } +} + +/** + * Helper class that returns metadata for each stream that contains numTasks partitions in it. + */ +class MockSystemAdmin(numTasks: Int) extends SystemAdmin { + def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = null + def getSystemStreamMetadata(streamNames: java.util.Set[String]) = { + streamNames.map(streamName => { + var partitionMetadata = (0 until numTasks).map(partitionId => { + new Partition(partitionId) -> new SystemStreamPartitionMetadata(null, null, null) + }).toMap + streamName -> new SystemStreamMetadata(streamName, partitionMetadata) + }).toMap[String, SystemStreamMetadata] } }
