Repository: incubator-gearpump
Updated Branches:
  refs/heads/akka-streams f1bec6709 -> cc0578e5d


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
index f8666ba..3f23fa9 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
@@ -23,10 +23,12 @@ import java.time.Instant
 import akka.actor.ActorSystem
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.{TestUtil, UserConfig}
-import org.apache.gearpump.partitioner.CoLocationPartitioner
+import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
+import org.apache.gearpump.streaming.partitioner.CoLocationPartitioner
 import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
 import org.apache.gearpump.streaming.dsl.plan.PlannerSpec._
-import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapFunction, 
ReduceFunction}
+import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, Reducer}
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
 import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
 import org.apache.gearpump.streaming.sink.DataSink
 import org.apache.gearpump.streaming.source.DataSource
@@ -56,8 +58,8 @@ class PlannerSpec extends FlatSpec with Matchers with 
BeforeAndAfterAll with Moc
     val graph = Graph.empty[Op, OpEdge]
     val sourceOp = DataSourceOp(new AnySource)
     val groupByOp = GroupByOp(new AnyGroupByFn)
-    val flatMapOp = ChainableOp[Any, Any](anyFlatMapFunction)
-    val reduceOp = ChainableOp[Any, Any](anyReduceFunction)
+    val flatMapOp = ChainableOp[Any, Any](anyFlatMapper)
+    val reduceOp = ChainableOp[Any, Any](anyReducer)
     val processorOp = new ProcessorOp[AnyTask]
     val sinkOp = DataSinkOp(new AnySink)
     val directEdge = Direct
@@ -92,9 +94,10 @@ class PlannerSpec extends FlatSpec with Matchers with 
BeforeAndAfterAll with Moc
 object PlannerSpec {
 
   private val anyParallelism = 1
-  private val anyFlatMapFunction = new FlatMapFunction[Any, Any](Option(_), 
"flatMap")
-  private val anyReduceFunction = new ReduceFunction[Any](
-    (left: Any, right: Any) => (left, right), "reduce")
+  private val anyFlatMapper = new FlatMapper[Any, Any](
+    FlatMapFunction(Option(_)), "flatMap")
+  private val anyReducer = new Reducer[Any](
+    ReduceFunction((left: Any, right: Any) => (left, right)), "reduce")
 
   class AnyTask(context: TaskContext, config: UserConfig) extends 
Task(context, config)
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
index 94feae4..2c03e1c 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
@@ -23,9 +23,11 @@ import akka.actor.ActorSystem
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.{TestUtil, UserConfig}
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.dsl.CollectionDataSource
 import org.apache.gearpump.streaming.source.DataSourceTask
 import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
+import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
 import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask}
 import org.apache.gearpump.streaming.dsl.window.api.CountWindow
 import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
@@ -45,7 +47,7 @@ class SingleInputFunctionSpec extends WordSpec with Matchers 
with MockitoSugar {
 
     val first = mock[SingleInputFunction[R, S]]
     val second = mock[SingleInputFunction[S, T]]
-    val andThen = new AndThen(first, second)
+    val andThen = AndThen(first, second)
 
     "chain first and second functions when processing input value" in {
       val input = mock[R]
@@ -77,161 +79,164 @@ class SingleInputFunctionSpec extends WordSpec with 
Matchers with MockitoSugar {
       andThen.finish().toList shouldBe List(secondResult)
     }
 
-    "clear both states on clearState" in {
-      andThen.clearState()
+    "set up both functions on setup" in {
+      andThen.setup()
 
-      verify(first).clearState()
-      verify(second).clearState()
+      verify(first).setup()
+      verify(second).setup()
     }
 
-    "return AndThen on andThen" in {
-      val third = mock[SingleInputFunction[T, Any]]
-      andThen.andThen[Any](third) shouldBe an [AndThen[_, _, _]]
+    "tear down both functions on teardown" in {
+      andThen.teardown()
+
+      verify(first).teardown()
+      verify(second).teardown()
+    }
+
+    "chain multiple single input functions" in {
+      val split = new FlatMapper[String, 
String](FlatMapFunction(_.split("\\s")), "split")
+
+      val filter = new FlatMapper[String, String](
+        FlatMapFunction(word => if (word.isEmpty) None else Some(word)), 
"filter")
+
+      val map = new FlatMapper[String, Int](FlatMapFunction(word => Some(1)), 
"map")
+
+      val sum = new Reducer[Int](ReduceFunction({(left, right) => left + 
right}), "sum")
+
+      val all = AndThen(split, AndThen(filter, AndThen(map, sum)))
+
+      assert(all.description == "split.filter.map.sum")
+
+      val data =
+        """
+      five  four three  two    one
+      five  four three  two
+      five  four three
+      five  four
+      five
+        """
+      // force eager evaluation
+      all.process(data).toList
+      val result = all.finish().toList
+      assert(result.nonEmpty)
+      assert(result.last == 15)
     }
   }
 
-  "FlatMapFunction" should {
+  "FlatMapper" should {
 
-    val flatMap = mock[R => TraversableOnce[S]]
-    val flatMapFunction = new FlatMapFunction[R, S](flatMap, "flatMap")
+    val flatMapFunction = mock[FlatMapFunction[R, S]]
+    val flatMapper = new FlatMapper[R, S](flatMapFunction, "flatMap")
 
     "call flatMap function when processing input value" in {
       val input = mock[R]
-      flatMapFunction.process(input)
-      verify(flatMap).apply(input)
+      flatMapper.process(input)
+      verify(flatMapFunction).apply(input)
     }
 
     "return passed in description" in {
-      flatMapFunction.description shouldBe "flatMap"
+      flatMapper.description shouldBe "flatMap"
     }
 
     "return None on finish" in {
-      flatMapFunction.finish() shouldBe List.empty[S]
+      flatMapper.finish() shouldBe List.empty[S]
     }
 
-    "do nothing on clearState" in {
-      flatMapFunction.clearState()
-      verifyZeroInteractions(flatMap)
+    "set up FlatMapFunction on setup" in {
+      flatMapper.setup()
+
+      verify(flatMapFunction).setup()
     }
 
-    "return AndThen on andThen" in {
-      val other = mock[SingleInputFunction[S, T]]
-      flatMapFunction.andThen[T](other) shouldBe an [AndThen[_, _, _]]
+    "tear down FlatMapFunction on teardown" in {
+      flatMapper.teardown()
+
+      verify(flatMapFunction).teardown()
     }
   }
 
   "ReduceFunction" should {
 
-
     "call reduce function when processing input value" in {
-      val reduce = mock[(T, T) => T]
-      val reduceFunction = new ReduceFunction[T](reduce, "reduce")
+      val reduceFunction = mock[ReduceFunction[T]]
+      val reducer = new Reducer[T](reduceFunction, "reduce")
       val input1 = mock[T]
       val input2 = mock[T]
       val output = mock[T]
 
-      when(reduce.apply(input1, input2)).thenReturn(output, output)
+      when(reduceFunction.apply(input1, input2)).thenReturn(output, output)
 
-      reduceFunction.process(input1) shouldBe List.empty[T]
-      reduceFunction.process(input2) shouldBe List.empty[T]
-      reduceFunction.finish() shouldBe List(output)
+      reducer.process(input1) shouldBe List.empty[T]
+      reducer.process(input2) shouldBe List.empty[T]
+      reducer.finish() shouldBe List(output)
 
-      reduceFunction.clearState()
-      reduceFunction.process(input1) shouldBe List.empty[T]
-      reduceFunction.clearState()
-      reduceFunction.process(input2) shouldBe List.empty[T]
-      reduceFunction.finish() shouldBe List(input2)
+      reducer.teardown()
+      reducer.process(input1) shouldBe List.empty[T]
+      reducer.teardown()
+      reducer.process(input2) shouldBe List.empty[T]
+      reducer.finish() shouldBe List(input2)
     }
 
     "return passed in description" in {
-      val reduce = mock[(T, T) => T]
-      val reduceFunction = new ReduceFunction[T](reduce, "reduce")
-      reduceFunction.description shouldBe "reduce"
+      val reduceFunction = mock[ReduceFunction[T]]
+      val reducer = new Reducer[T](reduceFunction, "reduce")
+      reducer.description shouldBe "reduce"
     }
 
     "return None on finish" in {
-      val reduce = mock[(T, T) => T]
-      val reduceFunction = new ReduceFunction[T](reduce, "reduce")
-      reduceFunction.finish() shouldBe List.empty[T]
+      val reduceFunction = mock[ReduceFunction[T]]
+      val reducer = new Reducer[T](reduceFunction, "reduce")
+      reducer.finish() shouldBe List.empty[T]
     }
 
-    "do nothing on clearState" in {
-      val reduce = mock[(T, T) => T]
-      val reduceFunction = new ReduceFunction[T](reduce, "reduce")
-      reduceFunction.clearState()
-      verifyZeroInteractions(reduce)
+    "set up reduce function on setup" in {
+      val reduceFunction = mock[ReduceFunction[T]]
+      val reducer = new Reducer[T](reduceFunction, "reduce")
+      reducer.setup()
+
+      verify(reduceFunction).setup()
     }
 
-    "return AndThen on andThen" in {
-      val reduce = mock[(T, T) => T]
-      val reduceFunction = new ReduceFunction[T](reduce, "reduce")
-      val other = mock[SingleInputFunction[T, Any]]
-      reduceFunction.andThen[Any](other) shouldBe an[AndThen[_, _, _]]
+    "tear down reduce function on teardown" in {
+      val reduceFunction = mock[ReduceFunction[T]]
+      val reducer = new Reducer[T](reduceFunction, "reduce")
+      reducer.teardown()
+
+      verify(reduceFunction).teardown()
     }
   }
 
-  "EmitFunction" should {
+  "Emit" should {
 
-    val emit = mock[T => Unit]
-    val emitFunction = new EmitFunction[T](emit)
+    val emitFunction = mock[T => Unit]
+    val emit = new Emit[T](emitFunction)
 
     "emit input value when processing input value" in {
       val input = mock[T]
 
-      emitFunction.process(input) shouldBe List.empty[Unit]
+      emit.process(input) shouldBe List.empty[Unit]
 
-      verify(emit).apply(input)
+      verify(emitFunction).apply(input)
     }
 
     "return empty description" in {
-      emitFunction.description shouldBe ""
+      emit.description shouldBe ""
     }
 
     "return None on finish" in {
-      emitFunction.finish() shouldBe List.empty[Unit]
+      emit.finish() shouldBe List.empty[Unit]
     }
 
-    "do nothing on clearState" in {
-      emitFunction.clearState()
-      verifyZeroInteractions(emit)
-    }
+    "do nothing on setup" in {
+      emit.setup()
 
-    "throw exception on andThen" in {
-      val other = mock[SingleInputFunction[Unit, Any]]
-      intercept[UnsupportedOperationException] {
-        emitFunction.andThen(other)
-      }
+      verifyZeroInteractions(emitFunction)
     }
-  }
 
-  "andThen" should {
-    "chain multiple single input function" in {
-      val split = new FlatMapFunction[String, String](line => 
line.split("\\s"), "split")
-
-      val filter = new FlatMapFunction[String, String](word =>
-        if (word.isEmpty) None else Some(word), "filter")
-
-      val map = new FlatMapFunction[String, Int](word => Some(1), "map")
-
-      val sum = new ReduceFunction[Int]({ (left, right) => left + right }, 
"sum")
-
-      val all = split.andThen(filter).andThen(map).andThen(sum)
-
-      assert(all.description == "split.filter.map.sum")
+    "do nothing on teardown" in {
+      emit.teardown()
 
-      val data =
-        """
-      five  four three  two    one
-      five  four three  two
-      five  four three
-      five  four
-      five
-        """
-      // force eager evaluation
-      all.process(data).toList
-      val result = all.finish().toList
-      assert(result.nonEmpty)
-      assert(result.last == 15)
+      verifyZeroInteractions(emitFunction)
     }
   }
 
@@ -241,7 +246,7 @@ class SingleInputFunctionSpec extends WordSpec with 
Matchers with MockitoSugar {
       val taskContext = MockUtil.mockTaskContext
       implicit val actorSystem = MockUtil.system
 
-      val data = "one two three".split("\\s")
+      val data = "one two three".split("\\s+")
       val dataSource = new CollectionDataSource[String](data)
       val conf = UserConfig.empty.withValue(GEARPUMP_STREAMING_SOURCE, 
dataSource)
 
@@ -257,7 +262,8 @@ class SingleInputFunctionSpec extends WordSpec with 
Matchers with MockitoSugar {
 
       // Source with transformer
       val anotherTaskContext = MockUtil.mockTaskContext
-      val double = new FlatMapFunction[String, String](word => List(word, 
word), "double")
+      val double = new FlatMapper[String, String](FlatMapFunction(
+        word => List(word, word)), "double")
       val another = new DataSourceTask(anotherTaskContext,
         conf.withValue(GEARPUMP_STREAMING_OPERATOR, double))
       another.onStart(Instant.EPOCH)
@@ -275,9 +281,8 @@ class SingleInputFunctionSpec extends WordSpec with 
Matchers with MockitoSugar {
 
       val data = "1 2  2  3 3  3"
 
-      val concat = new ReduceFunction[String]({ (left, right) =>
-        left + right
-      }, "concat")
+      val concat = new Reducer[String](ReduceFunction({ (left, right) =>
+        left + right}), "concat")
 
       implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
       val config = UserConfig.empty.withValue[SingleInputFunction[String, 
String]](
@@ -311,7 +316,8 @@ class SingleInputFunctionSpec extends WordSpec with 
Matchers with MockitoSugar {
       // Source with transformer
       val taskContext = MockUtil.mockTaskContext
       val conf = UserConfig.empty
-      val double = new FlatMapFunction[String, String](word => List(word, 
word), "double")
+      val double = new FlatMapper[String, String](FlatMapFunction(
+        word => List(word, word)), "double")
       val task = new TransformTask[String, String](Some(double), taskContext, 
conf)
       task.onStart(Instant.EPOCH)
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
new file mode 100644
index 0000000..5b90a3e
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.scalaapi
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.streaming.dsl.scalaapi
+import org.apache.gearpump.streaming.partitioner.PartitionerDescription
+import org.apache.gearpump.streaming.source.DataSourceTask
+import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication}
+import org.apache.gearpump.util.Graph
+import org.mockito.Mockito.when
+import org.scalatest._
+import org.scalatest.mock.MockitoSugar
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with 
MockitoSugar {
+
+  implicit var system: ActorSystem = _
+
+  override def beforeAll(): Unit = {
+    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
+  }
+
+  override def afterAll(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+
+  it should "be able to generate multiple new streams" in {
+    val context: ClientContext = mock[ClientContext]
+    when(context.system).thenReturn(system)
+
+    val dsl = StreamApp("dsl", context)
+    dsl.source(List("A"), 2, "A") shouldBe a [scalaapi.Stream[_]]
+    dsl.source(List("B"), 3, "B") shouldBe a [scalaapi.Stream[_]]
+
+    val application = dsl.plan()
+    application shouldBe a [StreamApplication]
+    application.name shouldBe "dsl"
+    val dag = application.userConfig
+      .getValue[Graph[ProcessorDescription, 
PartitionerDescription]](StreamApplication.DAG).get
+    dag.vertices.size shouldBe 2
+    dag.vertices.foreach { processor =>
+      processor.taskClass shouldBe classOf[DataSourceTask[_, _]].getName
+      if (processor.description == "A") {
+        processor.parallelism shouldBe 2
+      } else if (processor.description == "B") {
+        processor.parallelism shouldBe 3
+      } else {
+        fail(s"undefined source ${processor.description}")
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
new file mode 100644
index 0000000..62a3bcb
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.scalaapi
+
+import akka.actor._
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.{TestUtil, UserConfig}
+import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
+import org.apache.gearpump.streaming.dsl.scalaapi.StreamSpec.Join
+import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask}
+import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, 
HashPartitioner, PartitionerDescription}
+import org.apache.gearpump.streaming.source.DataSourceTask
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication}
+import org.apache.gearpump.util.Graph
+import org.apache.gearpump.util.Graph._
+import org.mockito.Mockito.when
+import org.scalatest._
+import org.scalatest.mock.MockitoSugar
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+import scala.util.{Either, Left, Right}
+
+class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with 
MockitoSugar {
+
+  implicit var system: ActorSystem = _
+
+  override def beforeAll(): Unit = {
+    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
+  }
+
+  override def afterAll(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+
+  it should "translate the DSL to a DAG" in {
+    val context: ClientContext = mock[ClientContext]
+    when(context.system).thenReturn(system)
+
+    val dsl = StreamApp("dsl", context)
+
+    val data =
+      """
+        five  four three  two    one
+        five  four three  two
+        five  four three
+        five  four
+        five
+      """
+    val stream = dsl.source(data.lines.toList, 1, "").
+      flatMap(line => line.split("[\\s]+")).filter(_.nonEmpty).
+      map(word => (word, 1)).
+      groupBy(_._1, parallelism = 2).
+      reduce((left, right) => (left._1, left._2 + right._2)).
+      map[Either[(String, Int), String]]({t: (String, Int) => Left(t)})
+
+    val query = dsl.source(List("two"), 1, "").map[Either[(String, Int), 
String]](
+      {s: String => Right(s)})
+    stream.merge(query).process[(String, Int)](classOf[Join], 1)
+
+    val app: StreamApplication = dsl.plan()
+    val dag = app.userConfig
+      .getValue[Graph[ProcessorDescription, 
PartitionerDescription]](StreamApplication.DAG).get
+
+    val dagTopology = dag.mapVertex(_.taskClass).mapEdge { (node1, edge, 
node2) =>
+      edge.partitionerFactory.partitioner.getClass.getName
+    }
+    val expectedDagTopology = getExpectedDagTopology
+
+    dagTopology.vertices.toSet should contain theSameElementsAs 
expectedDagTopology.vertices.toSet
+    dagTopology.edges.toSet should contain theSameElementsAs 
expectedDagTopology.edges.toSet
+  }
+
+  private def getExpectedDagTopology: Graph[String, String] = {
+    val source = classOf[DataSourceTask[_, _]].getName
+    val group = classOf[CountTriggerTask[_, _]].getName
+    val merge = classOf[TransformTask[_, _]].getName
+    val join = classOf[Join].getName
+
+    val hash = classOf[HashPartitioner].getName
+    val groupBy = classOf[GroupByPartitioner[_, _]].getName
+    val colocation = classOf[CoLocationPartitioner].getName
+
+    val expectedDagTopology = Graph(
+      source ~ groupBy ~> group ~ colocation ~> merge ~ hash ~> join,
+      source ~ hash ~> merge
+    )
+    expectedDagTopology
+  }
+}
+
+object StreamSpec {
+
+  class Join(taskContext: TaskContext, userConf: UserConfig) extends 
Task(taskContext, userConf) {
+
+    var query: String = _
+
+    override def onNext(msg: Message): Unit = {
+      msg.msg match {
+        case Left(wordCount: (String @unchecked, Int @unchecked)) =>
+          if (query != null && wordCount._1 == query) {
+            taskContext.output(new Message(wordCount))
+          }
+
+        case Right(query: String) =>
+          this.query = query
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
new file mode 100644
index 0000000..b6e7342
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.task
+
+import java.time.Instant
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
+import org.mockito.Mockito.{verify, when}
+import org.scalacheck.Gen
+import org.scalatest.{Matchers, PropSpec}
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+
+class TransformTaskSpec extends PropSpec with PropertyChecks with Matchers 
with MockitoSugar {
+
+  property("TransformTask.onStart should call SingleInputFunction.setup") {
+    forAll(Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) { 
(startTime: Instant) =>
+      val taskContext = MockUtil.mockTaskContext
+      implicit val system = MockUtil.system
+      val config = UserConfig.empty
+      val operator = mock[SingleInputFunction[Any, Any]]
+      val sourceTask = new TransformTask[Any, Any](Some(operator), 
taskContext, config)
+
+      sourceTask.onStart(startTime)
+
+      verify(operator).setup()
+    }
+  }
+
+  property("TransformTask.onNext should call SingleInputFunction.process") {
+    forAll(Gen.alphaStr) { (str: String) =>
+      val taskContext = MockUtil.mockTaskContext
+      implicit val system = MockUtil.system
+      val config = UserConfig.empty
+      val operator = mock[SingleInputFunction[Any, Any]]
+      val task = new TransformTask[Any, Any](Some(operator), taskContext, 
config)
+      val msg = Message(str)
+      when(operator.process(str)).thenReturn(Some(str))
+
+      task.onNext(msg)
+
+      verify(taskContext).output(msg)
+    }
+  }
+
+  property("TransformTask.onStop should call SingleInputFunction.teardown") {
+    val taskContext = MockUtil.mockTaskContext
+    implicit val system = MockUtil.system
+    val config = UserConfig.empty
+    val operator = mock[SingleInputFunction[Any, Any]]
+    val task = new TransformTask[Any, Any](Some(operator), taskContext, config)
+
+    task.onStop()
+
+    verify(operator).teardown()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/partitioner/PartitionerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/partitioner/PartitionerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/partitioner/PartitionerSpec.scala
new file mode 100644
index 0000000..277a31c
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/partitioner/PartitionerSpec.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.partitioner
+
+import org.apache.gearpump.Message
+import org.scalatest.{FlatSpec, Matchers}
+
+class PartitionerSpec extends FlatSpec with Matchers {
+  val NUM = 10
+
+  "HashPartitioner" should "hash same key to same slots" in {
+    val partitioner = new HashPartitioner
+
+    val data = new Array[Byte](1000)
+    (new java.util.Random()).nextBytes(data)
+    val msg = Message(data)
+
+    val partition = partitioner.getPartition(msg, NUM)
+    assert(partition >= 0 && partition < NUM, "Partition Id should be in range [0, NUM)")
+
+    assert(partition == partitioner.getPartition(msg, NUM), "multiple runs should return " +
+      "consistent results")
+  }
+
+  "ShufflePartitioner" should "hash same key randomly" in {
+    val partitioner = new ShufflePartitioner
+
+    val data = new Array[Byte](1000)
+    (new java.util.Random()).nextBytes(data)
+    val msg = Message(data)
+
+    val partition = partitioner.getPartition(msg, NUM)
+    assert(partition >= 0 && partition < NUM, "Partition Id should be in range [0, NUM)")
+
+    assert(partition != partitioner.getPartition(msg, NUM), "multiple runs should return " +
+      "different results")
+  }
+
+  "BroadcastPartitioner" should "return all partitions" in {
+    val partitioner = new BroadcastPartitioner
+
+    val data = new Array[Byte](1000)
+    (new java.util.Random()).nextBytes(data)
+    val msg = Message(data)
+    val partitions = partitioner.getPartitions(msg, NUM)
+
+    partitions should contain theSameElementsAs 0.until(NUM)
+  }
+
+
+  "ShuffleGroupingPartitioner" should "hash same key randomly" in {
+    val partitioner = new ShuffleGroupingPartitioner
+
+    val data = new Array[Byte](1000)
+    (new java.util.Random()).nextBytes(data)
+    val msg = Message(data)
+
+    val partition = partitioner.getPartition(msg, NUM)
+    assert(partition >= 0 && partition < NUM, "Partition Id should be in range [0, NUM)")
+
+    assert(partition != partitioner.getPartition(msg, NUM), "multiple runs should return " +
+      "different results")
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
index c786047..4e95bdd 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
@@ -23,6 +23,7 @@ import java.time.Instant
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
 import org.mockito.Mockito._
 import org.scalacheck.Gen
 import org.scalatest.mock.MockitoSugar
@@ -38,11 +39,13 @@ class DataSourceTaskSpec extends PropSpec with 
PropertyChecks with Matchers with
       val dataSource = mock[DataSource]
       val config = UserConfig.empty
         .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
-
-      val sourceTask = new DataSourceTask[Any, Any](taskContext, config, 
dataSource, None)
+      val operator = mock[SingleInputFunction[Any, Any]]
+      val sourceTask = new DataSourceTask[Any, Any](taskContext, config, 
dataSource, Some(operator))
 
       sourceTask.onStart(startTime)
+
       verify(dataSource).open(taskContext, startTime)
+      verify(operator).setup()
     }
   }
 
@@ -69,9 +72,12 @@ class DataSourceTaskSpec extends PropSpec with 
PropertyChecks with Matchers with
     val dataSource = mock[DataSource]
     val config = UserConfig.empty
       .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
-    val sourceTask = new DataSourceTask[Any, Any](taskContext, config, 
dataSource, None)
+    val operator = mock[SingleInputFunction[Any, Any]]
+    val sourceTask = new DataSourceTask[Any, Any](taskContext, config, 
dataSource, Some(operator))
 
     sourceTask.onStop()
+
     verify(dataSource).close()
+    verify(operator).teardown()
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriberSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriberSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriberSpec.scala
index cfe47eb..bd3ddb8 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriberSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriberSpec.scala
@@ -19,7 +19,7 @@ package org.apache.gearpump.streaming.task
 
 import org.scalatest.{FlatSpec, Matchers}
 
-import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner}
+import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner}
 import org.apache.gearpump.streaming.task.SubscriberSpec.TestTask
 import org.apache.gearpump.streaming.{DAG, ProcessorDescription}
 import org.apache.gearpump.util.Graph

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
index 258a5ff..d128ace 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
@@ -26,7 +26,7 @@ import org.scalatest.{FlatSpec, Matchers}
 
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner}
+import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner}
 import org.apache.gearpump.streaming.task.SubscriptionSpec.NextTask
 import org.apache.gearpump.streaming.{LifeTime, ProcessorDescription}
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala
index 48901d2..8deee78 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala
@@ -25,7 +25,7 @@ import org.scalatest.{BeforeAndAfterEach, Matchers, WordSpec}
 
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.{MasterHarness, TestUtil, UserConfig}
-import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner}
+import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner}
 import org.apache.gearpump.serializer.{FastKryoSerializer, 
SerializationFramework}
 import org.apache.gearpump.streaming.AppMasterToExecutor.{ChangeTask, 
MsgLostException, StartTask, TaskChanged, TaskRegistered}
 import org.apache.gearpump.streaming.task.TaskActorSpec.TestTask

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/version.sbt
----------------------------------------------------------------------
diff --git a/version.sbt b/version.sbt
index 6f96fe0..f817483 100644
--- a/version.sbt
+++ b/version.sbt
@@ -16,4 +16,4 @@
  * limitations under the License.
  */
 
-version in ThisBuild := "0.8.2-SNAPSHOT"
+version in ThisBuild := "0.8.3-SNAPSHOT"

Reply via email to