Repository: incubator-samza Updated Branches: refs/heads/master bfd97fce0 -> 464a7e27d
SMAZA-194: createStreamPartitionString should not use characters that overlap with uris Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/464a7e27 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/464a7e27 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/464a7e27 Branch: refs/heads/master Commit: 464a7e27d436b526ad210ec8d0e8e5223ac328f2 Parents: bfd97fc Author: Jakob Glen Homan <[email protected]> Authored: Wed Mar 19 12:17:40 2014 -0700 Committer: Jakob Glen Homan <[email protected]> Committed: Wed Mar 19 12:17:40 2014 -0700 ---------------------------------------------------------------------- .../src/main/scala/org/apache/samza/util/Util.scala | 13 ++++++++----- .../test/scala/org/apache/samza/util/TestUtil.scala | 2 +- 2 files changed, 9 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/464a7e27/samza-core/src/main/scala/org/apache/samza/util/Util.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala index 5b429df..24a954c 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala @@ -140,6 +140,9 @@ object Util extends Logging { ssp.filter(_.getPartition.getPartitionId % containerCount == containerId) } + val partitionSeparator = ";" + val topicSeparator = "," + val topicStreamGrouper = "#" /** * Serialize a collection of stream-partitions to a string suitable for passing between processes. * The streams will be grouped by partition. The partition will be separated from the topics by @@ -153,13 +156,13 @@ object Util extends Logging { */ def createStreamPartitionString(sp: Set[SystemStreamPartition]): String = { for ( - ch <- List(':', ',', '/'); + ch <- List(partitionSeparator, topicSeparator, topicStreamGrouper); s <- sp ) { if (s.getStream.contains(ch)) throw new IllegalArgumentException(s + " contains illegal character " + ch) } - sp.groupBy(_.getPartition).map(z => z._1.getPartitionId + ":" + z._2.map(y => y.getSystem + "." + y.getStream).mkString(",")).mkString("/") + sp.groupBy(_.getPartition).map(z => z._1.getPartitionId + partitionSeparator + z._2.map(y => y.getSystem + "." + y.getStream).mkString(topicSeparator)).mkString(topicStreamGrouper) } @@ -174,14 +177,14 @@ object Util extends Logging { if (sp == null || sp.isEmpty) return Set.empty def splitPartitionGroup(pg: String) = { - val split = pg.split(":") // Seems like there should be a more scalar way of doing this + val split = pg.split(partitionSeparator) // Seems like there should be a more scalar way of doing this val part = split(0).toInt - val streams = split(1).split(",").toList + val streams = split(1).split(topicSeparator).toList streams.map(s => new SystemStreamPartition(getSystemStreamFromNames(s), new Partition(part))).toSet } - sp.split("/").map(splitPartitionGroup(_)).toSet.flatten + sp.split(topicStreamGrouper).map(splitPartitionGroup(_)).toSet.flatten } /** http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/464a7e27/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala index 60c9615..1bfd63c 100644 --- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala +++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala @@ -87,7 +87,7 @@ class TestUtil { @Test def testCreateStreamPartitionStringBlocksDelimeters() { val partOne = new Partition(1) - val toTry = List(':', ',', '/') + val toTry = List(Util.topicSeparator, Util.topicStreamGrouper, Util.partitionSeparator) .map(ch => (ch, Set(new SystemStreamPartition("kafka", "good1", partOne), new SystemStreamPartition("kafka", "bad" + ch, partOne), new SystemStreamPartition("notkafka", "alsogood", partOne))))
