Repository: incubator-samza
Updated Branches:
  refs/heads/master c96ecc68b -> 429c1edb3


SAMZA-197: Use JSON rather than custom encoding for container SSPs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/429c1edb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/429c1edb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/429c1edb

Branch: refs/heads/master
Commit: 429c1edb38b20fb20358062d831b1b9463a7ec9b
Parents: c96ecc6
Author: Jakob Homan <[email protected]>
Authored: Mon Mar 24 14:02:38 2014 -0700
Committer: Jakob Homan <[email protected]>
Committed: Mon Mar 24 14:02:38 2014 -0700

----------------------------------------------------------------------
 .../apache/samza/container/SamzaContainer.scala |  2 +-
 .../apache/samza/job/ShellCommandBuilder.scala  |  2 +-
 .../main/scala/org/apache/samza/util/Util.scala | 59 +++++++-------------
 .../scala/org/apache/samza/util/TestUtil.scala  | 30 ++--------
 .../yarn/TestSamzaAppMasterTaskManager.scala    | 32 +++++------
 5 files changed, 43 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/429c1edb/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 77bf0e9..ef14643 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -72,7 +72,7 @@ object SamzaContainer extends Logging {
     val config = JsonConfigSerializer.fromJson(configStr)
     val encodedStreamsAndPartitions = 
System.getenv(ShellCommandConfig.ENV_SYSTEM_STREAMS)
 
-    val partitions = 
Util.createStreamPartitionsFromString(encodedStreamsAndPartitions)
+    val partitions = 
Util.deserializeSSPSetFromJSON(encodedStreamsAndPartitions)
 
     if (partitions.isEmpty) {
       throw new SamzaException("No partitions for this task. Can't run a task 
without partition assignments. It's likely that the partition manager for this 
system doesn't know about the stream you're trying to read.")

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/429c1edb/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala b/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
index b4eaf90..f8865b1 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
@@ -29,7 +29,7 @@ class ShellCommandBuilder extends CommandBuilder {
   def buildCommand() = config.getCommand
 
   def buildEnvironment(): java.util.Map[String, String] = {
-    val streamsAndPartsString = 
Util.createStreamPartitionString(systemStreamPartitions.toSet) // Java to Scala 
set conversion
+    val streamsAndPartsString = 
Util.serializeSSPSetToJSON(systemStreamPartitions.toSet) // Java to Scala set 
conversion
     
     Map(
       ShellCommandConfig.ENV_CONTAINER_NAME -> name,

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/429c1edb/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 24a954c..e44e2ba 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -27,7 +27,11 @@ import org.apache.samza.config.Config
 import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.config.TaskConfig.Config2Task
 import scala.collection.JavaConversions._
-import org.apache.samza.system.{ SystemStreamPartition, SystemAdmin, 
SystemFactory, SystemStream }
+import org.apache.samza.system.{ SystemStreamPartition, SystemFactory, 
SystemStream }
+import org.codehaus.jackson.map.ObjectMapper
+import org.codehaus.jackson.`type`.TypeReference
+import java.util
+
 
 object Util extends Logging {
   val random = new Random
@@ -140,51 +144,28 @@ object Util extends Logging {
     ssp.filter(_.getPartition.getPartitionId % containerCount == containerId)
   }
 
-  val partitionSeparator = ";"
-  val topicSeparator = ","
-  val topicStreamGrouper = "#"
   /**
-   * Serialize a collection of stream-partitions to a string suitable for 
passing between processes.
-   * The streams will be grouped by partition. The partition will be separated 
from the topics by
-   * a colon (":"), the topics separated by commas (",") and the topic-stream 
groups by a slash ("/").
-   * Ordering of the grouping is not specified.
-   *
-   * For example: (A,0),(A,4)(B,0)(B,4)(C,0) could be transformed to: 
4:a,b/0:a,b,c
-   *
-   * @param sp Stream topics to group into a string
-   * @return Serialized string of the topics and streams grouped and delimited
+   * Jackson really hates Scala's classes, so we need to wrap up the SSP in a 
form Jackson will take
    */
-  def createStreamPartitionString(sp: Set[SystemStreamPartition]): String = {
-    for (
-      ch <- List(partitionSeparator, topicSeparator, topicStreamGrouper);
-      s <- sp
-    ) {
-      if (s.getStream.contains(ch)) throw new IllegalArgumentException(s + " 
contains illegal character " + ch)
-    }
-
-    sp.groupBy(_.getPartition).map(z => z._1.getPartitionId + 
partitionSeparator + z._2.map(y => y.getSystem + "." + 
y.getStream).mkString(topicSeparator)).mkString(topicStreamGrouper)
-
+  private class SSPWrapper(@scala.beans.BeanProperty var 
partition:java.lang.Integer = null,
+                           @scala.beans.BeanProperty var 
Stream:java.lang.String = null,
+                           @scala.beans.BeanProperty var 
System:java.lang.String = null) {
+    def this() { this(null, null, null) }
+    def this(ssp:SystemStreamPartition) { 
this(ssp.getPartition.getPartitionId, ssp.getSystemStream.getStream, 
ssp.getSystemStream.getSystem)}
   }
 
-  /**
-   * Invert @{list createStreamPartitionString}, building a list of streams 
and their partitions,
-   * from the string that function produced.
-   *
-   * @param sp Strings and partitions encoded as a stream by the above function
-   * @return List of string and partition tuples extracted from string. Order 
is not necessarily preserved.
-   */
-  def createStreamPartitionsFromString(sp: String): Set[SystemStreamPartition] 
= {
-    if (sp == null || sp.isEmpty) return Set.empty
+  def serializeSSPSetToJSON(ssps: Set[SystemStreamPartition]): String = {
+    val al = new util.ArrayList[SSPWrapper](ssps.size)
+    for(ssp <- ssps) { al.add(new SSPWrapper(ssp)) }
 
-    def splitPartitionGroup(pg: String) = {
-      val split = pg.split(partitionSeparator) // Seems like there should be a 
more scalar way of doing this
-      val part = split(0).toInt
-      val streams = split(1).split(topicSeparator).toList
+    new ObjectMapper().writeValueAsString(al)
+  }
 
-      streams.map(s => new SystemStreamPartition(getSystemStreamFromNames(s), 
new Partition(part))).toSet
-    }
+  def deserializeSSPSetFromJSON(ssp: String) = {
+    val om = new ObjectMapper()
 
-    sp.split(topicStreamGrouper).map(splitPartitionGroup(_)).toSet.flatten
+    val asWrapper = om.readValue(ssp, new 
TypeReference[util.ArrayList[SSPWrapper]]() { 
}).asInstanceOf[util.ArrayList[SSPWrapper]]
+    asWrapper.map(w => new SystemStreamPartition(w.getSystem, w.getStream(), 
new Partition(w.getPartition()))).toSet
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/429c1edb/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
index 49aed36..b8c369b 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
@@ -86,26 +86,8 @@ class TestUtil {
   }
   
   @Test
-  def testCreateStreamPartitionStringBlocksDelimeters() {
-    val partOne = new Partition(1)
-    val toTry = List(Util.topicSeparator, Util.topicStreamGrouper, 
Util.partitionSeparator)
-      .map(ch => (ch, Set(new SystemStreamPartition("kafka", "good1", partOne),
-      new SystemStreamPartition("kafka", "bad" + ch, partOne),
-      new SystemStreamPartition("notkafka", "alsogood", partOne))))
-    toTry.foreach(t => try {
-      createStreamPartitionString(t._2)
-      fail("Should have thrown an exception")
-    } catch {
-      case iae:IllegalArgumentException =>
-        val expected = "SystemStreamPartition [partition=Partition 
[partition=1], system" +
-          "=kafka, stream=bad" + t._1 + "] contains illegal character " + t._1
-        assertEquals(expected, iae.getMessage)
-    } )
-  }
-
-  @Test
-  def testCreateStreamPartitionStringRoundTrip() {
-    val getPartitions = {
+  def testJsonCreateStreamPartitionStringRoundTrip() {
+    val getPartitions: Set[SystemStreamPartition] = {
       // Build a heavily skewed set of partitions.
       def partitionSet(max:Int) = (0 until max).map(new Partition(_)).toSet
       val system = "all-same-system."
@@ -117,14 +99,12 @@ class TestUtil {
            part <- streamsMap.getOrElse(s, Set.empty)) yield new 
SystemStreamPartition(getSystemStreamFromNames(s), part)).toSet
     }
 
-    val streamsAndParts = getStreamsAndPartitionsForContainer(0, 4, 
getPartitions)
+    val streamsAndParts: Set[SystemStreamPartition] = 
getStreamsAndPartitionsForContainer(0, 4, getPartitions).toSet
     println(streamsAndParts)
-    val asString = createStreamPartitionString(streamsAndParts)
-    println(asString)
-    val backToStreamsAndParts = createStreamPartitionsFromString(asString)
+    val asString = serializeSSPSetToJSON(streamsAndParts)
 
+    val backToStreamsAndParts = deserializeSSPSetFromJSON(asString)
     assertEquals(streamsAndParts, backToStreamsAndParts)
-
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/429c1edb/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
index efd330d..9d832ae 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
@@ -22,7 +22,7 @@ import org.junit.Assert._
 import org.junit.Test
 import org.apache.samza.config.Config
 import org.apache.samza.config.MapConfig
-import org.apache.samza.{Partition, SamzaException}
+import org.apache.samza.Partition
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.util.ConverterUtils
 import scala.collection.JavaConversions._
@@ -35,9 +35,8 @@ import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.yarn.api.records.NodeReport
 import TestSamzaAppMasterTaskManager._
-import org.apache.samza.system.{SystemStreamPartition, SystemAdmin, 
SystemFactory}
+import org.apache.samza.system.{SystemStreamPartition, SystemFactory}
 import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.util.Util._
 import org.apache.samza.util.Util
 import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin
 
@@ -130,7 +129,8 @@ object TestSamzaAppMasterTaskManager {
 }
 
 class TestSamzaAppMasterTaskManager {
-  
+  import org.junit.Assert._
+
   val config = new MapConfig(Map[String, String](
     "yarn.container.count" -> "1",
     "systems.test-system.samza.factory" -> 
"org.apache.samza.job.yarn.MockSystemFactory",
@@ -378,33 +378,33 @@ class TestSamzaAppMasterTaskManager {
   @Test
   def testPartitionsShouldWorkWithMoreTasksThanPartitions {
     val onePartition = Set(new SystemStreamPartition("system", "stream", new 
Partition(0)))
-    assert(Util.getStreamsAndPartitionsForContainer(0, 2, 
onePartition).equals(Set(new SystemStreamPartition("system", "stream", new 
Partition(0)))))
-    assert(Util.getStreamsAndPartitionsForContainer(1, 2, 
onePartition).equals(Set()))
+    assertEquals(Util.getStreamsAndPartitionsForContainer(0, 2, onePartition), 
Set(new SystemStreamPartition("system", "stream", new Partition(0))))
+    assertEquals(Util.getStreamsAndPartitionsForContainer(1, 2, onePartition), 
Set())
   }
 
   @Test
   def testPartitionsShouldWorkWithMorePartitionsThanTasks {
     val fivePartitions = (0 until 5).map(p => new 
SystemStreamPartition("system", "stream", new Partition(p))).toSet
-    assert(Util.getStreamsAndPartitionsForContainer(0, 2, 
fivePartitions).equals(Set(new SystemStreamPartition("system", "stream", new 
Partition(0)), new SystemStreamPartition("system", "stream", new Partition(2)), 
new SystemStreamPartition("system", "stream", new Partition(4)))))
-    assert(Util.getStreamsAndPartitionsForContainer(1, 2, 
fivePartitions).equals(Set(new SystemStreamPartition("system", "stream", new 
Partition(1)), new SystemStreamPartition("system", "stream", new 
Partition(3)))))
+    assertEquals(Util.getStreamsAndPartitionsForContainer(0, 2, 
fivePartitions), Set(new SystemStreamPartition("system", "stream", new 
Partition(0)), new SystemStreamPartition("system", "stream", new Partition(2)), 
new SystemStreamPartition("system", "stream", new Partition(4))))
+    assertEquals(Util.getStreamsAndPartitionsForContainer(1, 2, 
fivePartitions), Set(new SystemStreamPartition("system", "stream", new 
Partition(1)), new SystemStreamPartition("system", "stream", new Partition(3))))
   }
 
   @Test
   def testPartitionsShouldWorkWithTwelvePartitionsAndFiveContainers {
     val fivePartitions = (0 until 12).map(p => new 
SystemStreamPartition("system", "stream", new Partition(p))).toSet
-    assert(Util.getStreamsAndPartitionsForContainer(0, 5, 
fivePartitions).equals(Set(new SystemStreamPartition("system", "stream", new 
Partition(0)), new SystemStreamPartition("system", "stream", new Partition(5)), 
new SystemStreamPartition("system", "stream", new Partition(10)))))
-    assert(Util.getStreamsAndPartitionsForContainer(1, 5, 
fivePartitions).equals(Set(new SystemStreamPartition("system", "stream", new 
Partition(1)), new SystemStreamPartition("system", "stream", new Partition(6)), 
new SystemStreamPartition("system", "stream", new Partition(11)))))
-    assert(Util.getStreamsAndPartitionsForContainer(2, 5, 
fivePartitions).equals(Set(new SystemStreamPartition("system", "stream", new 
Partition(2)), new SystemStreamPartition("system", "stream", new 
Partition(7)))))
-    assert(Util.getStreamsAndPartitionsForContainer(3, 5, 
fivePartitions).equals(Set(new SystemStreamPartition("system", "stream", new 
Partition(3)), new SystemStreamPartition("system", "stream", new 
Partition(8)))))
-    assert(Util.getStreamsAndPartitionsForContainer(4, 5, 
fivePartitions).equals(Set(new SystemStreamPartition("system", "stream", new 
Partition(4)), new SystemStreamPartition("system", "stream", new 
Partition(9)))))
+    assertEquals(Util.getStreamsAndPartitionsForContainer(0, 5, 
fivePartitions), Set(new SystemStreamPartition("system", "stream", new 
Partition(0)), new SystemStreamPartition("system", "stream", new Partition(5)), 
new SystemStreamPartition("system", "stream", new Partition(10))))
+    assertEquals(Util.getStreamsAndPartitionsForContainer(1, 5, 
fivePartitions), Set(new SystemStreamPartition("system", "stream", new 
Partition(1)), new SystemStreamPartition("system", "stream", new Partition(6)), 
new SystemStreamPartition("system", "stream", new Partition(11))))
+    assertEquals(Util.getStreamsAndPartitionsForContainer(2, 5, 
fivePartitions), Set(new SystemStreamPartition("system", "stream", new 
Partition(2)), new SystemStreamPartition("system", "stream", new Partition(7))))
+    assertEquals(Util.getStreamsAndPartitionsForContainer(3, 5, 
fivePartitions), Set(new SystemStreamPartition("system", "stream", new 
Partition(3)), new SystemStreamPartition("system", "stream", new Partition(8))))
+    assertEquals(Util.getStreamsAndPartitionsForContainer(4, 5, 
fivePartitions), Set(new SystemStreamPartition("system", "stream", new 
Partition(4)), new SystemStreamPartition("system", "stream", new Partition(9))))
   }
 
   @Test
   def testPartitionsShouldWorkWithEqualPartitionsAndTasks {
     val twoPartitions = (0 until 2).map(p => new 
SystemStreamPartition("system", "stream", new Partition(p))).toSet
-    assert(Util.getStreamsAndPartitionsForContainer(0, 2, 
twoPartitions).equals(Set(new SystemStreamPartition("system", "stream", new 
Partition(0)))))
-    assert(Util.getStreamsAndPartitionsForContainer(1, 2, 
twoPartitions).equals(Set(new SystemStreamPartition("system", "stream", new 
Partition(1)))))
-    assert(Util.getStreamsAndPartitionsForContainer(0, 1, Set(new 
SystemStreamPartition("system", "stream", new Partition(0)))).equals(Set(new 
SystemStreamPartition("system", "stream", new Partition(0)))))
+    assertEquals(Util.getStreamsAndPartitionsForContainer(0, 2, 
twoPartitions), Set(new SystemStreamPartition("system", "stream", new 
Partition(0))))
+    assertEquals(Util.getStreamsAndPartitionsForContainer(1, 2, 
twoPartitions), Set(new SystemStreamPartition("system", "stream", new 
Partition(1))))
+    assertEquals(Util.getStreamsAndPartitionsForContainer(0, 1, Set(new 
SystemStreamPartition("system", "stream", new Partition(0)))), Set(new 
SystemStreamPartition("system", "stream", new Partition(0))))
   }
 
   val clock = () => System.currentTimeMillis

Reply via email to