Repository: incubator-samza Updated Branches: refs/heads/master c96ecc68b -> 429c1edb3
SAMZA-197: Use JSON rather than custom encoding for container SSPs. Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/429c1edb Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/429c1edb Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/429c1edb Branch: refs/heads/master Commit: 429c1edb38b20fb20358062d831b1b9463a7ec9b Parents: c96ecc6 Author: Jakob Homan <[email protected]> Authored: Mon Mar 24 14:02:38 2014 -0700 Committer: Jakob Homan <[email protected]> Committed: Mon Mar 24 14:02:38 2014 -0700 ---------------------------------------------------------------------- .../apache/samza/container/SamzaContainer.scala | 2 +- .../apache/samza/job/ShellCommandBuilder.scala | 2 +- .../main/scala/org/apache/samza/util/Util.scala | 59 +++++++------------- .../scala/org/apache/samza/util/TestUtil.scala | 30 ++-------- .../yarn/TestSamzaAppMasterTaskManager.scala | 32 +++++------ 5 files changed, 43 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/429c1edb/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 77bf0e9..ef14643 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -72,7 +72,7 @@ object SamzaContainer extends Logging { val config = JsonConfigSerializer.fromJson(configStr) val encodedStreamsAndPartitions = System.getenv(ShellCommandConfig.ENV_SYSTEM_STREAMS) - val partitions = Util.createStreamPartitionsFromString(encodedStreamsAndPartitions) + val partitions = Util.deserializeSSPSetFromJSON(encodedStreamsAndPartitions) if (partitions.isEmpty) { throw new SamzaException("No partitions for this task. Can't run a task without partition assignments. It's likely that the partition manager for this system doesn't know about the stream you're trying to read.") http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/429c1edb/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala b/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala index b4eaf90..f8865b1 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala @@ -29,7 +29,7 @@ class ShellCommandBuilder extends CommandBuilder { def buildCommand() = config.getCommand def buildEnvironment(): java.util.Map[String, String] = { - val streamsAndPartsString = Util.createStreamPartitionString(systemStreamPartitions.toSet) // Java to Scala set conversion + val streamsAndPartsString = Util.serializeSSPSetToJSON(systemStreamPartitions.toSet) // Java to Scala set conversion Map( ShellCommandConfig.ENV_CONTAINER_NAME -> name, http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/429c1edb/samza-core/src/main/scala/org/apache/samza/util/Util.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala index 24a954c..e44e2ba 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala @@ -27,7 +27,11 @@ import org.apache.samza.config.Config import org.apache.samza.config.SystemConfig.Config2System import org.apache.samza.config.TaskConfig.Config2Task import scala.collection.JavaConversions._ -import org.apache.samza.system.{ SystemStreamPartition, SystemAdmin, SystemFactory, SystemStream } +import org.apache.samza.system.{ SystemStreamPartition, SystemFactory, SystemStream } +import org.codehaus.jackson.map.ObjectMapper +import org.codehaus.jackson.`type`.TypeReference +import java.util + object Util extends Logging { val random = new Random @@ -140,51 +144,28 @@ object Util extends Logging { ssp.filter(_.getPartition.getPartitionId % containerCount == containerId) } - val partitionSeparator = ";" - val topicSeparator = "," - val topicStreamGrouper = "#" /** - * Serialize a collection of stream-partitions to a string suitable for passing between processes. - * The streams will be grouped by partition. The partition will be separated from the topics by - * a colon (":"), the topics separated by commas (",") and the topic-stream groups by a slash ("/"). - * Ordering of the grouping is not specified. - * - * For example: (A,0),(A,4)(B,0)(B,4)(C,0) could be transformed to: 4:a,b/0:a,b,c - * - * @param sp Stream topics to group into a string - * @return Serialized string of the topics and streams grouped and delimited + * Jackson really hates Scala's classes, so we need to wrap up the SSP in a form Jackson will take */ - def createStreamPartitionString(sp: Set[SystemStreamPartition]): String = { - for ( - ch <- List(partitionSeparator, topicSeparator, topicStreamGrouper); - s <- sp - ) { - if (s.getStream.contains(ch)) throw new IllegalArgumentException(s + " contains illegal character " + ch) - } - - sp.groupBy(_.getPartition).map(z => z._1.getPartitionId + partitionSeparator + z._2.map(y => y.getSystem + "." + y.getStream).mkString(topicSeparator)).mkString(topicStreamGrouper) - + private class SSPWrapper(@scala.beans.BeanProperty var partition:java.lang.Integer = null, + @scala.beans.BeanProperty var Stream:java.lang.String = null, + @scala.beans.BeanProperty var System:java.lang.String = null) { + def this() { this(null, null, null) } + def this(ssp:SystemStreamPartition) { this(ssp.getPartition.getPartitionId, ssp.getSystemStream.getStream, ssp.getSystemStream.getSystem)} } - /** - * Invert @{list createStreamPartitionString}, building a list of streams and their partitions, - * from the string that function produced. - * - * @param sp Strings and partitions encoded as a stream by the above function - * @return List of string and partition tuples extracted from string. Order is not necessarily preserved. - */ - def createStreamPartitionsFromString(sp: String): Set[SystemStreamPartition] = { - if (sp == null || sp.isEmpty) return Set.empty + def serializeSSPSetToJSON(ssps: Set[SystemStreamPartition]): String = { + val al = new util.ArrayList[SSPWrapper](ssps.size) + for(ssp <- ssps) { al.add(new SSPWrapper(ssp)) } - def splitPartitionGroup(pg: String) = { - val split = pg.split(partitionSeparator) // Seems like there should be a more scalar way of doing this - val part = split(0).toInt - val streams = split(1).split(topicSeparator).toList + new ObjectMapper().writeValueAsString(al) + } - streams.map(s => new SystemStreamPartition(getSystemStreamFromNames(s), new Partition(part))).toSet - } + def deserializeSSPSetFromJSON(ssp: String) = { + val om = new ObjectMapper() - sp.split(topicStreamGrouper).map(splitPartitionGroup(_)).toSet.flatten + val asWrapper = om.readValue(ssp, new TypeReference[util.ArrayList[SSPWrapper]]() { }).asInstanceOf[util.ArrayList[SSPWrapper]] + asWrapper.map(w => new SystemStreamPartition(w.getSystem, w.getStream(), new Partition(w.getPartition()))).toSet } /** http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/429c1edb/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala index 49aed36..b8c369b 100644 --- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala +++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala @@ -86,26 +86,8 @@ class TestUtil { } @Test - def testCreateStreamPartitionStringBlocksDelimeters() { - val partOne = new Partition(1) - val toTry = List(Util.topicSeparator, Util.topicStreamGrouper, Util.partitionSeparator) - .map(ch => (ch, Set(new SystemStreamPartition("kafka", "good1", partOne), - new SystemStreamPartition("kafka", "bad" + ch, partOne), - new SystemStreamPartition("notkafka", "alsogood", partOne)))) - toTry.foreach(t => try { - createStreamPartitionString(t._2) - fail("Should have thrown an exception") - } catch { - case iae:IllegalArgumentException => - val expected = "SystemStreamPartition [partition=Partition [partition=1], system" + - "=kafka, stream=bad" + t._1 + "] contains illegal character " + t._1 - assertEquals(expected, iae.getMessage) - } ) - } - - @Test - def testCreateStreamPartitionStringRoundTrip() { - val getPartitions = { + def testJsonCreateStreamPartitionStringRoundTrip() { + val getPartitions: Set[SystemStreamPartition] = { // Build a heavily skewed set of partitions. def partitionSet(max:Int) = (0 until max).map(new Partition(_)).toSet val system = "all-same-system." @@ -117,14 +99,12 @@ class TestUtil { part <- streamsMap.getOrElse(s, Set.empty)) yield new SystemStreamPartition(getSystemStreamFromNames(s), part)).toSet } - val streamsAndParts = getStreamsAndPartitionsForContainer(0, 4, getPartitions) + val streamsAndParts: Set[SystemStreamPartition] = getStreamsAndPartitionsForContainer(0, 4, getPartitions).toSet println(streamsAndParts) - val asString = createStreamPartitionString(streamsAndParts) - println(asString) - val backToStreamsAndParts = createStreamPartitionsFromString(asString) + val asString = serializeSSPSetToJSON(streamsAndParts) + val backToStreamsAndParts = deserializeSSPSetFromJSON(asString) assertEquals(streamsAndParts, backToStreamsAndParts) - } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/429c1edb/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala index efd330d..9d832ae 100644 --- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala +++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala @@ -22,7 +22,7 @@ import org.junit.Assert._ import org.junit.Test import org.apache.samza.config.Config import org.apache.samza.config.MapConfig -import org.apache.samza.{Partition, SamzaException} +import org.apache.samza.Partition import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.util.ConverterUtils import scala.collection.JavaConversions._ @@ -35,9 +35,8 @@ import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl import org.apache.hadoop.fs.Path import org.apache.hadoop.yarn.api.records.NodeReport import TestSamzaAppMasterTaskManager._ -import org.apache.samza.system.{SystemStreamPartition, SystemAdmin, SystemFactory} +import org.apache.samza.system.{SystemStreamPartition, SystemFactory} import org.apache.samza.metrics.MetricsRegistry -import org.apache.samza.util.Util._ import org.apache.samza.util.Util import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin @@ -130,7 +129,8 @@ object TestSamzaAppMasterTaskManager { } class TestSamzaAppMasterTaskManager { - + import org.junit.Assert._ + val config = new MapConfig(Map[String, String]( "yarn.container.count" -> "1", "systems.test-system.samza.factory" -> "org.apache.samza.job.yarn.MockSystemFactory", @@ -378,33 +378,33 @@ class TestSamzaAppMasterTaskManager { @Test def testPartitionsShouldWorkWithMoreTasksThanPartitions { val onePartition = Set(new SystemStreamPartition("system", "stream", new Partition(0))) - assert(Util.getStreamsAndPartitionsForContainer(0, 2, onePartition).equals(Set(new SystemStreamPartition("system", "stream", new Partition(0))))) - assert(Util.getStreamsAndPartitionsForContainer(1, 2, onePartition).equals(Set())) + assertEquals(Util.getStreamsAndPartitionsForContainer(0, 2, onePartition), Set(new SystemStreamPartition("system", "stream", new Partition(0)))) + assertEquals(Util.getStreamsAndPartitionsForContainer(1, 2, onePartition), Set()) } @Test def testPartitionsShouldWorkWithMorePartitionsThanTasks { val fivePartitions = (0 until 5).map(p => new SystemStreamPartition("system", "stream", new Partition(p))).toSet - assert(Util.getStreamsAndPartitionsForContainer(0, 2, fivePartitions).equals(Set(new SystemStreamPartition("system", "stream", new Partition(0)), new SystemStreamPartition("system", "stream", new Partition(2)), new SystemStreamPartition("system", "stream", new Partition(4))))) - assert(Util.getStreamsAndPartitionsForContainer(1, 2, fivePartitions).equals(Set(new SystemStreamPartition("system", "stream", new Partition(1)), new SystemStreamPartition("system", "stream", new Partition(3))))) + assertEquals(Util.getStreamsAndPartitionsForContainer(0, 2, fivePartitions), Set(new SystemStreamPartition("system", "stream", new Partition(0)), new SystemStreamPartition("system", "stream", new Partition(2)), new SystemStreamPartition("system", "stream", new Partition(4)))) + assertEquals(Util.getStreamsAndPartitionsForContainer(1, 2, fivePartitions), Set(new SystemStreamPartition("system", "stream", new Partition(1)), new SystemStreamPartition("system", "stream", new Partition(3)))) } @Test def testPartitionsShouldWorkWithTwelvePartitionsAndFiveContainers { val fivePartitions = (0 until 12).map(p => new SystemStreamPartition("system", "stream", new Partition(p))).toSet - assert(Util.getStreamsAndPartitionsForContainer(0, 5, fivePartitions).equals(Set(new SystemStreamPartition("system", "stream", new Partition(0)), new SystemStreamPartition("system", "stream", new Partition(5)), new SystemStreamPartition("system", "stream", new Partition(10))))) - assert(Util.getStreamsAndPartitionsForContainer(1, 5, fivePartitions).equals(Set(new SystemStreamPartition("system", "stream", new Partition(1)), new SystemStreamPartition("system", "stream", new Partition(6)), new SystemStreamPartition("system", "stream", new Partition(11))))) - assert(Util.getStreamsAndPartitionsForContainer(2, 5, fivePartitions).equals(Set(new SystemStreamPartition("system", "stream", new Partition(2)), new SystemStreamPartition("system", "stream", new Partition(7))))) - assert(Util.getStreamsAndPartitionsForContainer(3, 5, fivePartitions).equals(Set(new SystemStreamPartition("system", "stream", new Partition(3)), new SystemStreamPartition("system", "stream", new Partition(8))))) - assert(Util.getStreamsAndPartitionsForContainer(4, 5, fivePartitions).equals(Set(new SystemStreamPartition("system", "stream", new Partition(4)), new SystemStreamPartition("system", "stream", new Partition(9))))) + assertEquals(Util.getStreamsAndPartitionsForContainer(0, 5, fivePartitions), Set(new SystemStreamPartition("system", "stream", new Partition(0)), new SystemStreamPartition("system", "stream", new Partition(5)), new SystemStreamPartition("system", "stream", new Partition(10)))) + assertEquals(Util.getStreamsAndPartitionsForContainer(1, 5, fivePartitions), Set(new SystemStreamPartition("system", "stream", new Partition(1)), new SystemStreamPartition("system", "stream", new Partition(6)), new SystemStreamPartition("system", "stream", new Partition(11)))) + assertEquals(Util.getStreamsAndPartitionsForContainer(2, 5, fivePartitions), Set(new SystemStreamPartition("system", "stream", new Partition(2)), new SystemStreamPartition("system", "stream", new Partition(7)))) + assertEquals(Util.getStreamsAndPartitionsForContainer(3, 5, fivePartitions), Set(new SystemStreamPartition("system", "stream", new Partition(3)), new SystemStreamPartition("system", "stream", new Partition(8)))) + assertEquals(Util.getStreamsAndPartitionsForContainer(4, 5, fivePartitions), Set(new SystemStreamPartition("system", "stream", new Partition(4)), new SystemStreamPartition("system", "stream", new Partition(9)))) } @Test def testPartitionsShouldWorkWithEqualPartitionsAndTasks { val twoPartitions = (0 until 2).map(p => new SystemStreamPartition("system", "stream", new Partition(p))).toSet - assert(Util.getStreamsAndPartitionsForContainer(0, 2, twoPartitions).equals(Set(new SystemStreamPartition("system", "stream", new Partition(0))))) - assert(Util.getStreamsAndPartitionsForContainer(1, 2, twoPartitions).equals(Set(new SystemStreamPartition("system", "stream", new Partition(1))))) - assert(Util.getStreamsAndPartitionsForContainer(0, 1, Set(new SystemStreamPartition("system", "stream", new Partition(0)))).equals(Set(new SystemStreamPartition("system", "stream", new Partition(0))))) + assertEquals(Util.getStreamsAndPartitionsForContainer(0, 2, twoPartitions), Set(new SystemStreamPartition("system", "stream", new Partition(0)))) + assertEquals(Util.getStreamsAndPartitionsForContainer(1, 2, twoPartitions), Set(new SystemStreamPartition("system", "stream", new Partition(1)))) + assertEquals(Util.getStreamsAndPartitionsForContainer(0, 1, Set(new SystemStreamPartition("system", "stream", new Partition(0)))), Set(new SystemStreamPartition("system", "stream", new Partition(0)))) } val clock = () => System.currentTimeMillis
