Updated Branches: refs/heads/master c12965ddc -> 30a606370
SAMZA-145; fix bug in bootstrap chooser that ignored bootstrap stream configuration.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/30a60637
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/30a60637
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/30a60637

Branch: refs/heads/master
Commit: 30a606370a7bdd0db0449b160f7de9d0862c301e
Parents: c12965d
Author: Chris Riccomini <criccomi@criccomi-mn.(none)>
Authored: Wed Feb 12 20:11:01 2014 -0800
Committer: Chris Riccomini <criccomi@criccomi-mn.(none)>
Committed: Wed Feb 12 20:11:01 2014 -0800

----------------------------------------------------------------------
 .../samza/config/DefaultChooserConfig.scala     |  2 +-
 .../system/chooser/TestDefaultChooser.scala     | 28 ++++++++++++++++++++
 2 files changed, 29 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/30a60637/samza-core/src/main/scala/org/apache/samza/config/DefaultChooserConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/DefaultChooserConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/DefaultChooserConfig.scala
index 5fb92cf..9351c66 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/DefaultChooserConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/DefaultChooserConfig.scala
@@ -38,7 +38,7 @@ class DefaultChooserConfig(config: Config) extends ScalaMapConfig(config) {
   def getBootstrapStreams = config
     .getInputStreams
     .map(systemStream => (systemStream, getOrElse(BOOTSTRAP format (systemStream.getSystem, systemStream.getStream), "false").equals("true")))
-    .filter(_._2.equals("true"))
+    .filter(_._2.equals(true))
     .map(_._1)
 
   def getPriorityStreams = config

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/30a60637/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
index 12e12c0..7f9e89c 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
@@ -25,6 +25,10 @@ import org.apache.samza.util.BlockingEnvelopeMap
 import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.Partition
+import org.apache.samza.config.MapConfig
+import scala.collection.JavaConversions._
+import org.apache.samza.config.DefaultChooserConfig
+import org.apache.samza.system.SystemStream
 
 class TestDefaultChooser {
   val envelope1 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), null, null, 1);
@@ -116,6 +120,30 @@ class TestDefaultChooser {
     assertEquals(envelope3, chooser.choose)
     assertEquals(null, chooser.choose)
   }
+
+  @Test
+  def testBootstrapConfig {
+    import DefaultChooserConfig.Config2DefaultChooser
+    val configMap = Map(
+      "task.inputs" -> "kafka.foo,kafka.bar-baz",
+      "systems.kafka.streams.bar-baz.samza.bootstrap" -> "true")
+    val config = new MapConfig(configMap)
+    val bootstrapStreams = config.getBootstrapStreams
+    assertEquals(1, bootstrapStreams.size)
+    assertTrue(bootstrapStreams.contains(new SystemStream("kafka", "bar-baz")))
+  }
+
+  @Test
+  def testPriorityConfig {
+    import DefaultChooserConfig.Config2DefaultChooser
+    val configMap = Map(
+      "task.inputs" -> "kafka.foo,kafka.bar-baz",
+      "systems.kafka.streams.bar-baz.samza.priority" -> "3")
+    val config = new MapConfig(configMap)
+    val priorityStreams = config.getPriorityStreams
+    assertEquals(1, priorityStreams.size)
+    assertEquals(3, priorityStreams(new SystemStream("kafka", "bar-baz")))
+  }
 }
 
 class MockBlockingEnvelopeMap extends BlockingEnvelopeMap {
