Updated Branches:
  refs/heads/master c12965ddc -> 30a606370

SAMZA-145; fix bug in bootstrap chooser that ignored bootstrap stream 
configuration.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/30a60637
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/30a60637
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/30a60637

Branch: refs/heads/master
Commit: 30a606370a7bdd0db0449b160f7de9d0862c301e
Parents: c12965d
Author: Chris Riccomini <criccomi@criccomi-mn.(none)>
Authored: Wed Feb 12 20:11:01 2014 -0800
Committer: Chris Riccomini <criccomi@criccomi-mn.(none)>
Committed: Wed Feb 12 20:11:01 2014 -0800

----------------------------------------------------------------------
 .../samza/config/DefaultChooserConfig.scala     |  2 +-
 .../system/chooser/TestDefaultChooser.scala     | 28 ++++++++++++++++++++
 2 files changed, 29 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/30a60637/samza-core/src/main/scala/org/apache/samza/config/DefaultChooserConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/DefaultChooserConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/DefaultChooserConfig.scala
index 5fb92cf..9351c66 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/DefaultChooserConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/DefaultChooserConfig.scala
@@ -38,7 +38,7 @@ class DefaultChooserConfig(config: Config) extends ScalaMapConfig(config) {
   def getBootstrapStreams = config
     .getInputStreams
     .map(systemStream => (systemStream, getOrElse(BOOTSTRAP format (systemStream.getSystem, systemStream.getStream), "false").equals("true")))
-    .filter(_._2.equals("true"))
+    .filter(_._2.equals(true))
     .map(_._1)
 
   def getPriorityStreams = config

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/30a60637/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
index 12e12c0..7f9e89c 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
@@ -25,6 +25,10 @@ import org.apache.samza.util.BlockingEnvelopeMap
 import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.Partition
+import org.apache.samza.config.MapConfig
+import scala.collection.JavaConversions._
+import org.apache.samza.config.DefaultChooserConfig
+import org.apache.samza.system.SystemStream
 
 class TestDefaultChooser {
   val envelope1 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), null, null, 1);
@@ -116,6 +120,30 @@ class TestDefaultChooser {
     assertEquals(envelope3, chooser.choose)
     assertEquals(null, chooser.choose)
   }
+
+  @Test
+  def testBootstrapConfig {
+    import DefaultChooserConfig.Config2DefaultChooser
+    val configMap = Map(
+      "task.inputs" -> "kafka.foo,kafka.bar-baz",
+      "systems.kafka.streams.bar-baz.samza.bootstrap" -> "true")
+    val config = new MapConfig(configMap)
+    val bootstrapStreams = config.getBootstrapStreams
+    assertEquals(1, bootstrapStreams.size)
+    assertTrue(bootstrapStreams.contains(new SystemStream("kafka", "bar-baz")))
+  }
+
+  @Test
+  def testPriorityConfig {
+    import DefaultChooserConfig.Config2DefaultChooser
+    val configMap = Map(
+      "task.inputs" -> "kafka.foo,kafka.bar-baz",
+      "systems.kafka.streams.bar-baz.samza.priority" -> "3")
+    val config = new MapConfig(configMap)
+    val priorityStreams = config.getPriorityStreams
+    assertEquals(1, priorityStreams.size)
+    assertEquals(3, priorityStreams(new SystemStream("kafka", "bar-baz")))
+  }
 }
 
 class MockBlockingEnvelopeMap extends BlockingEnvelopeMap {

Reply via email to