Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 5c4d60c5b -> 66017ab7b


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
index fb2d898..535497c 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
@@ -23,7 +23,7 @@ import java.time.Instant
 import org.apache.gearpump._
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.Constants._
-import 
org.apache.gearpump.streaming.dsl.plan.OpTranslator.{DummyInputFunction, 
SingleInputFunction}
+import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 /**
@@ -57,15 +57,10 @@ class DataSourceTask[IN, OUT] private[source](
   private val processMessage: Message => Unit =
     operator match {
       case Some(op) =>
-        op match {
-          case bad: DummyInputFunction[IN] =>
-            (message: Message) => context.output(message)
-          case _ =>
-            (message: Message) => {
-              op.process(message.msg.asInstanceOf[IN]).foreach { m: OUT =>
-                context.output(Message(m, message.timestamp))
-              }
-            }
+        (message: Message) => {
+          op.process(message.msg.asInstanceOf[IN]).foreach { m: OUT =>
+            context.output(Message(m, message.timestamp))
+          }
         }
       case None =>
         (message: Message) => context.output(message)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
index c0b6a29..9a52cc6 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
@@ -23,7 +23,7 @@ import java.util
 import java.util.concurrent.TimeUnit
 
 import akka.actor._
-import org.apache.gearpump.streaming.source.{Watermark, DataSourceTask}
+import org.apache.gearpump.streaming.source.Watermark
 import org.slf4j.Logger
 import org.apache.gearpump.cluster.UserConfig
 import 
org.apache.gearpump.gs.collections.impl.map.mutable.primitive.IntShortHashMap
@@ -308,9 +308,9 @@ class TaskActor(
 
   private def updateUpstreamMinClock(upstreamClock: TimeStamp): Unit = {
     if (upstreamClock > this.upstreamMinClock) {
+      this.upstreamMinClock = upstreamClock
       task.onWatermarkProgress(Instant.ofEpochMilli(this.upstreamMinClock))
     }
-    this.upstreamMinClock = upstreamClock
 
     val subMinClock = subscriptions.foldLeft(Long.MaxValue) { (min, sub) =>
       val subMin = sub._2.minClock

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
index e919a34..e0407ec 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
@@ -21,7 +21,10 @@ package org.apache.gearpump.streaming.dsl
 import akka.actor.ActorSystem
 import org.apache.gearpump.cluster.TestUtil
 import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.partitioner.PartitionerDescription
+import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication}
 import org.apache.gearpump.streaming.source.DataSourceTask
+import org.apache.gearpump.util.Graph
 import org.mockito.Mockito.when
 import org.scalatest._
 import org.scalatest.mock.MockitoSugar
@@ -30,7 +33,7 @@ import scala.concurrent.Await
 import scala.concurrent.duration.Duration
 class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with 
MockitoSugar {
 
-  implicit var system: ActorSystem = null
+  implicit var system: ActorSystem = _
 
   override def beforeAll(): Unit = {
     system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
@@ -45,49 +48,25 @@ class StreamAppSpec extends FlatSpec with Matchers with 
BeforeAndAfterAll with M
     val context: ClientContext = mock[ClientContext]
     when(context.system).thenReturn(system)
 
-    val app = StreamApp("dsl", context)
-    app.source(List("A"), 1, "")
-    app.source(List("B"), 1, "")
+    val dsl = StreamApp("dsl", context)
+    dsl.source(List("A"), 2, "A") shouldBe a [Stream[_]]
+    dsl.source(List("B"), 3, "B") shouldBe a [Stream[_]]
 
-    assert(app.graph.vertices.size == 2)
-  }
-
-  it should "plan the dsl to Processsor(TaskDescription) DAG" in {
-    val context: ClientContext = mock[ClientContext]
-    when(context.system).thenReturn(system)
-
-    val app = StreamApp("dsl", context)
-    val parallism = 3
-    app.source(List("A", "B", "C"), parallism, "").flatMap(Array(_)).reduce(_ 
+ _)
-    val task = app.plan.dag.vertices.iterator.next()
-    assert(task.taskClass == classOf[DataSourceTask[_, _]].getName)
-    assert(task.parallelism == parallism)
-  }
-
-  it should "produce 3 messages" in {
-    val context: ClientContext = mock[ClientContext]
-    when(context.system).thenReturn(system)
-    val app = StreamApp("dsl", context)
-    val list = List[String](
-      "0",
-      "1",
-      "2"
-    )
-    val producer = app.source(list, 1, "producer").flatMap(Array(_)).reduce(_ 
+ _)
-    val task = app.plan.dag.vertices.iterator.next()
-      /*
-      val task = app.plan.dag.vertices.iterator.map(desc => {
-        LOG.info(s"${desc.taskClass}")
-      })
-      val sum = producer.flatMap(msg => {
-        LOG.info("in flatMap")
-        assert(msg.msg.isInstanceOf[String])
-        val num = msg.msg.asInstanceOf[String].toInt
-        Array(num)
-      }).reduce(_+_)
-      val task = app.plan.dag.vertices.iterator.map(desc => {
-        LOG.info(s"${desc.taskClass}")
-      })
-     */
+    val application = dsl.plan()
+    application shouldBe a [StreamApplication]
+    application.name shouldBe "dsl"
+    val dag = application.userConfig
+      .getValue[Graph[ProcessorDescription, 
PartitionerDescription]](StreamApplication.DAG).get
+    dag.vertices.size shouldBe 2
+    dag.vertices.foreach { processor =>
+      processor.taskClass shouldBe classOf[DataSourceTask[_, _]].getName
+      if (processor.description == "A") {
+        processor.parallelism shouldBe 2
+      } else if (processor.description == "B") {
+        processor.parallelism shouldBe 3
+      } else {
+        fail(s"undefined source ${processor.description}")
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
index 816feef..fdc721b 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
@@ -22,10 +22,11 @@ import akka.actor._
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.{TestUtil, UserConfig}
-import org.apache.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner}
+import org.apache.gearpump.partitioner.{CoLocationPartitioner, 
HashPartitioner, PartitionerDescription}
+import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication}
 import org.apache.gearpump.streaming.dsl.StreamSpec.Join
 import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
-import org.apache.gearpump.streaming.dsl.plan.OpTranslator._
+import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask}
 import org.apache.gearpump.streaming.source.DataSourceTask
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
 import org.apache.gearpump.util.Graph
@@ -40,7 +41,6 @@ import scala.util.{Either, Left, Right}
 
 class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with 
MockitoSugar {
 
-
   implicit var system: ActorSystem = _
 
   override def beforeAll(): Unit = {
@@ -56,7 +56,7 @@ class StreamSpec extends FlatSpec with Matchers with 
BeforeAndAfterAll with Mock
     val context: ClientContext = mock[ClientContext]
     when(context.system).thenReturn(system)
 
-    val app = StreamApp("dsl", context)
+    val dsl = StreamApp("dsl", context)
 
     val data =
       """
@@ -66,30 +66,32 @@ class StreamSpec extends FlatSpec with Matchers with 
BeforeAndAfterAll with Mock
         five  four
         five
       """
-    val stream = app.source(data.lines.toList, 1, "").
+    val stream = dsl.source(data.lines.toList, 1, "").
       flatMap(line => line.split("[\\s]+")).filter(_.nonEmpty).
       map(word => (word, 1)).
       groupBy(_._1, parallelism = 2).
       reduce((left, right) => (left._1, left._2 + right._2)).
       map[Either[(String, Int), String]](Left(_))
 
-    val query = app.source(List("two"), 1, "").map[Either[(String, Int), 
String]](Right(_))
+    val query = dsl.source(List("two"), 1, "").map[Either[(String, Int), 
String]](Right(_))
     stream.merge(query).process[(String, Int)](classOf[Join], 1)
 
-    val appDescription = app.plan()
+    val app: StreamApplication = dsl.plan()
+    val dag = app.userConfig
+      .getValue[Graph[ProcessorDescription, 
PartitionerDescription]](StreamApplication.DAG).get
 
-    val dagTopology = appDescription.dag.mapVertex(_.taskClass).mapEdge { 
(node1, edge, node2) =>
+    val dagTopology = dag.mapVertex(_.taskClass).mapEdge { (node1, edge, 
node2) =>
       edge.partitionerFactory.partitioner.getClass.getName
     }
     val expectedDagTopology = getExpectedDagTopology
 
-    
assert(dagTopology.vertices.toSet.equals(expectedDagTopology.vertices.toSet))
-    assert(dagTopology.edges.toSet.equals(expectedDagTopology.edges.toSet))
+    dagTopology.vertices.toSet should contain theSameElementsAs 
expectedDagTopology.vertices.toSet
+    dagTopology.edges.toSet should contain theSameElementsAs 
expectedDagTopology.edges.toSet
   }
 
   private def getExpectedDagTopology: Graph[String, String] = {
     val source = classOf[DataSourceTask[_, _]].getName
-    val group = classOf[GroupByTask[_, _, _]].getName
+    val group = classOf[CountTriggerTask[_, _]].getName
     val merge = classOf[TransformTask[_, _]].getName
     val join = classOf[Join].getName
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
index fcc646d..f49eb04 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
@@ -18,24 +18,33 @@
 
 package org.apache.gearpump.streaming.dsl.partitioner
 
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+import java.time.Duration
 
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
 import org.apache.gearpump.Message
 import 
org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitionerSpec.People
+import org.apache.gearpump.streaming.dsl.window.api.{FixedWindow, GroupByFn}
+import org.apache.gearpump.streaming.dsl.window.impl.{Bucket, 
GroupAlsoByWindow}
 
 class GroupByPartitionerSpec extends FlatSpec with Matchers with 
BeforeAndAfterAll {
-  it should "use the outpout of groupBy function to do partition" in {
+
+  it should "group by message payload and window" in {
     val mark = People("Mark", "male")
     val tom = People("Tom", "male")
     val michelle = People("Michelle", "female")
 
     val partitionNum = 10
-    val groupBy = new GroupByPartitioner[People, String](_.gender)
-    assert(groupBy.getPartition(Message(mark), partitionNum)
-      == groupBy.getPartition(Message(tom), partitionNum))
+    val groupByFn: GroupByFn[People, (String, List[Bucket])] =
+      GroupAlsoByWindow[People, String](_.gender, 
FixedWindow.apply(Duration.ofMillis(5)))
+    val groupBy = new GroupByPartitioner[People, (String, 
List[Bucket])](groupByFn)
+    groupBy.getPartition(Message(mark, 1L), partitionNum) shouldBe
+      groupBy.getPartition(Message(tom, 2L), partitionNum)
+
+    groupBy.getPartition(Message(mark, 1L), partitionNum) should not be
+      groupBy.getPartition(Message(tom, 6L), partitionNum)
 
-    assert(groupBy.getPartition(Message(mark), partitionNum)
-      != groupBy.getPartition(Message(michelle), partitionNum))
+    groupBy.getPartition(Message(mark, 2L), partitionNum) should not be
+      groupBy.getPartition(Message(michelle, 3L), partitionNum)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
new file mode 100644
index 0000000..bf52abc
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.plan
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.{TestUtil, UserConfig}
+import org.apache.gearpump.streaming.Processor
+import org.apache.gearpump.streaming.Processor.DefaultProcessor
+import org.apache.gearpump.streaming.dsl.plan.OpSpec.{AnySink, AnySource, 
AnyTask}
+import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
+import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec}
+import org.scalatest.mock.MockitoSugar
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+/**
+ * Verifies, for each kind of plan `Op`, which ops it accepts in `chain` and what its
+ * `getProcessor` produces.
+ */
+class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoSugar {
+
+  // Mocked ops that each `chain` test below expects to be rejected with OpChainException.
+  private val unchainableOps: List[Op] = List(
+    mock[DataSourceOp],
+    mock[DataSinkOp],
+    mock[GroupByOp[Any, Any]],
+    mock[MergeOp],
+    mock[ProcessorOp[AnyTask]])
+
+  // Assigned in beforeAll and terminated in afterAll; implicit so it is in scope for the
+  // ops exercised by the tests.
+  implicit var system: ActorSystem = _
+
+  /** Starts the ActorSystem shared by every test in this spec. */
+  override def beforeAll(): Unit = {
+    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
+  }
+
+  /** Terminates the shared ActorSystem and blocks until termination has completed. */
+  override def afterAll(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+
+  "DataSourceOp" should {
+
+    "chain ChainableOp" in {
+      val dataSource = new AnySource
+      val dataSourceOp = DataSourceOp(dataSource)
+      // `fn` is deliberately left unstubbed on the mock: this test only checks that
+      // chaining reads it.
+      val chainableOp = mock[ChainableOp[Any, Any]]
+
+      val chainedOp = dataSourceOp.chain(chainableOp)
+
+      chainedOp shouldBe a[DataSourceOp]
+      verify(chainableOp).fn
+
+      unchainableOps.foreach { op =>
+        intercept[OpChainException] {
+          dataSourceOp.chain(op)
+        }
+      }
+    }
+
+    "get Processor of DataSource" in {
+      val dataSource = new AnySource
+      val dataSourceOp = DataSourceOp(dataSource)
+      val processor = dataSourceOp.getProcessor
+      processor shouldBe a[Processor[_]]
+      processor.parallelism shouldBe dataSourceOp.parallelism
+      processor.description shouldBe dataSourceOp.description
+    }
+  }
+
+  "DataSinkOp" should {
+
+    "not chain any Op" in {
+      val dataSink = new AnySink
+      val dataSinkOp = DataSinkOp(dataSink)
+      val chainableOp = mock[ChainableOp[Any, Any]]
+      // A sink must reject a ChainableOp as well as every unchainable op.
+      val ops = chainableOp +: unchainableOps
+      ops.foreach { op =>
+        intercept[OpChainException] {
+          dataSinkOp.chain(op)
+        }
+      }
+    }
+
+    "get Processor of DataSink" in {
+      val dataSink = new AnySink
+      val dataSinkOp = DataSinkOp(dataSink)
+      val processor = dataSinkOp.getProcessor
+      processor shouldBe a[Processor[_]]
+      processor.parallelism shouldBe dataSinkOp.parallelism
+      processor.description shouldBe dataSinkOp.description
+    }
+  }
+
+  "ProcessorOp" should {
+
+    "not chain any Op" in {
+      val processorOp = new ProcessorOp[AnyTask]
+      val chainableOp = mock[ChainableOp[Any, Any]]
+      // A ProcessorOp must reject a ChainableOp as well as every unchainable op.
+      val ops = chainableOp +: unchainableOps
+      ops.foreach { op =>
+        intercept[OpChainException] {
+          processorOp.chain(op)
+        }
+      }
+    }
+
+    "get Processor" in {
+      val processorOp = new ProcessorOp[AnyTask]
+      val processor = processorOp.getProcessor
+      processor shouldBe a [DefaultProcessor[_]]
+      processor.parallelism shouldBe processorOp.parallelism
+      processor.description shouldBe processorOp.description
+    }
+  }
+
+  "ChainableOp" should {
+
+    "chain ChainableOp" in {
+      val fn1 = mock[SingleInputFunction[Any, Any]]
+      val chainableOp1 = ChainableOp[Any, Any](fn1)
+
+      val fn2 = mock[SingleInputFunction[Any, Any]]
+      val chainableOp2 = ChainableOp[Any, Any](fn2)
+
+      val chainedOp = chainableOp1.chain(chainableOp2)
+
+      // Chaining two ChainableOps must compose their functions through `andThen`.
+      verify(fn1).andThen(fn2)
+      chainedOp shouldBe a[ChainableOp[_, _]]
+
+      unchainableOps.foreach { op =>
+        intercept[OpChainException] {
+          chainableOp1.chain(op)
+        }
+      }
+    }
+
+    "throw exception on getProcessor" in {
+      val fn1 = mock[SingleInputFunction[Any, Any]]
+      val chainableOp1 = ChainableOp[Any, Any](fn1)
+      intercept[UnsupportedOperationException] {
+        chainableOp1.getProcessor
+      }
+    }
+  }
+
+  "GroupByOp" should {
+
+    "chain ChainableOp" in {
+      val groupByFn = mock[GroupByFn[Any, Any]]
+      val groupByOp = GroupByOp[Any, Any](groupByFn)
+      val fn = mock[SingleInputFunction[Any, Any]]
+      val chainableOp = mock[ChainableOp[Any, Any]]
+      when(chainableOp.fn).thenReturn(fn)
+
+      val chainedOp = groupByOp.chain(chainableOp)
+      chainedOp shouldBe a[GroupByOp[_, _]]
+
+      unchainableOps.foreach { op =>
+        intercept[OpChainException] {
+          groupByOp.chain(op)
+        }
+      }
+    }
+
+    "delegate to groupByFn on getProcessor" in {
+      val groupByFn = mock[GroupByFn[Any, Any]]
+      val groupByOp = GroupByOp[Any, Any](groupByFn)
+
+      groupByOp.getProcessor
+      verify(groupByFn).getProcessor(anyInt, anyString, any[UserConfig])(any[ActorSystem])
+    }
+  }
+
+  "MergeOp" should {
+
+    // Shared by both MergeOp tests below.
+    val mergeOp = MergeOp("merge")
+
+    "chain ChainableOp" in {
+      val fn = mock[SingleInputFunction[Any, Any]]
+      val chainableOp = mock[ChainableOp[Any, Any]]
+      when(chainableOp.fn).thenReturn(fn)
+
+      val chainedOp = mergeOp.chain(chainableOp)
+      chainedOp shouldBe a [MergeOp]
+
+      unchainableOps.foreach { op =>
+        intercept[OpChainException] {
+          mergeOp.chain(op)
+        }
+      }
+    }
+
+    "get Processor" in {
+      val processor = mergeOp.getProcessor
+      processor shouldBe a[Processor[_]]
+      processor.parallelism shouldBe 1
+    }
+  }
+}
+
+/** Minimal no-op fixtures used by the `OpSpec` tests. */
+object OpSpec {
+
+  /** Task with no behaviour of its own; serves as the type argument of `ProcessorOp`. */
+  class AnyTask(context: TaskContext, config: UserConfig) extends Task(context, config)
+
+  /** Source whose `open` and `close` do nothing and whose `read` always yields "any". */
+  class AnySource extends DataSource {
+
+    override def open(context: TaskContext, startTime: Instant): Unit = {}
+
+    override def read(): Message = Message("any")
+
+    override def close(): Unit = {}
+
+    // Returns the wall-clock time at the moment of the call.
+    override def getWatermark: Instant = Instant.now()
+  }
+
+  /** Sink whose `open`, `write` and `close` all do nothing. */
+  class AnySink extends DataSink {
+
+    override def open(context: TaskContext): Unit = {}
+
+    override def write(message: Message): Unit = {}
+
+    override def close(): Unit = {}
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
deleted file mode 100644
index 2112fd0..0000000
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.dsl.plan
-
-import java.time.Instant
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-import akka.actor.ActorSystem
-import org.mockito.ArgumentCaptor
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalatest._
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.{TestUtil, UserConfig}
-import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.dsl.CollectionDataSource
-import org.apache.gearpump.streaming.dsl.plan.OpTranslator._
-import org.apache.gearpump.streaming.source.DataSourceTask
-
-class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
-
-
-  "andThen" should "chain multiple single input function" in {
-    val dummy = new DummyInputFunction[String]
-    val split = new FlatMapFunction[String, String](line => line.split("\\s"), 
"split")
-
-    val filter = new FlatMapFunction[String, String](word =>
-      if (word.isEmpty) None else Some(word), "filter")
-
-    val map = new FlatMapFunction[String, Int](word => Some(1), "map")
-
-    val sum = new ReduceFunction[Int]({ (left, right) => left + right }, "sum")
-
-    val all = dummy.andThen(split).andThen(filter).andThen(map).andThen(sum)
-
-    assert(all.description == "split.filter.map.sum")
-
-    val data =
-      """
-      five  four three  two    one
-      five  four three  two
-      five  four three
-      five  four
-      five
-      """
-    val count = all.process(data).toList.last
-    assert(count == 15)
-  }
-
-  "Source" should "iterate over input source and apply attached operator" in {
-
-    val taskContext = MockUtil.mockTaskContext
-    implicit val actorSystem = MockUtil.system
-
-    val data = "one two three".split("\\s")
-    val dataSource = new CollectionDataSource[String](data)
-    val conf = UserConfig.empty.withValue(GEARPUMP_STREAMING_SOURCE, 
dataSource)
-
-    // Source with no transformer
-    val source = new DataSourceTask[String, String](
-      taskContext, conf)
-    source.onStart(Instant.EPOCH)
-    source.onNext(Message("next"))
-    data.foreach { s =>
-      verify(taskContext, times(1)).output(Message(s))
-    }
-
-    // Source with transformer
-    val anotherTaskContext = MockUtil.mockTaskContext
-    val double = new FlatMapFunction[String, String](word => List(word, word), 
"double")
-    val another = new DataSourceTask(anotherTaskContext,
-      conf.withValue(GEARPUMP_STREAMING_OPERATOR, double))
-    another.onStart(Instant.EPOCH)
-    another.onNext(Message("next"))
-    data.foreach { s =>
-      verify(anotherTaskContext, times(2)).output(Message(s))
-    }
-  }
-
-  "GroupByTask" should "group input by groupBy Function and " +
-    "apply attached operator for each group" in {
-
-    val data = "1 2  2  3 3  3"
-
-    val concat = new ReduceFunction[String]({ (left, right) =>
-      left + right
-    }, "concat")
-
-    implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
-    val config = UserConfig.empty.withValue[SingleInputFunction[String, 
String]](
-    GEARPUMP_STREAMING_OPERATOR, concat)
-
-    val taskContext = MockUtil.mockTaskContext
-
-    val task = new GroupByTask[String, String, String](input => input, 
taskContext, config)
-    task.onStart(Instant.EPOCH)
-
-    val peopleCaptor = ArgumentCaptor.forClass(classOf[Message])
-
-    data.split("\\s+").foreach { word =>
-      task.onNext(Message(word))
-    }
-    verify(taskContext, times(6)).output(peopleCaptor.capture())
-
-    import scala.collection.JavaConverters._
-
-    val values = peopleCaptor.getAllValues.asScala.map(input => 
input.msg.asInstanceOf[String])
-    assert(values.mkString(",") == "1,2,22,3,33,333")
-    system.terminate()
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  "MergeTask" should "accept two stream and apply the attached operator" in {
-
-    // Source with transformer
-    val taskContext = MockUtil.mockTaskContext
-    val conf = UserConfig.empty
-    val double = new FlatMapFunction[String, String](word => List(word, word), 
"double")
-    val task = new TransformTask[String, String](Some(double), taskContext, 
conf)
-    task.onStart(Instant.EPOCH)
-
-    val data = "1 2  2  3 3  3".split("\\s+")
-
-    data.foreach { input =>
-      task.onNext(Message(input))
-    }
-
-    verify(taskContext, times(data.length * 2)).output(anyObject())
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
new file mode 100644
index 0000000..f8666ba
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.plan
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.{TestUtil, UserConfig}
+import org.apache.gearpump.partitioner.CoLocationPartitioner
+import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
+import org.apache.gearpump.streaming.dsl.plan.PlannerSpec._
+import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapFunction, 
ReduceFunction}
+import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.{MockUtil, Processor}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+import org.apache.gearpump.util.Graph
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+/**
+ * Checks that `Planner` turns a graph of ops into a processor graph with the expected
+ * vertices and partitioner types.
+ */
+class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar {
+
+  // Assigned in beforeAll and terminated in afterAll; this is the implicit ActorSystem
+  // in scope for the test below.
+  implicit var system: ActorSystem = _
+
+  /** Starts the ActorSystem used by this spec. */
+  override def beforeAll(): Unit = {
+    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
+  }
+
+  /** Terminates the ActorSystem and blocks until termination has completed. */
+  override def afterAll(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+
+  "Planner" should "chain operations" in {
+    val graph = Graph.empty[Op, OpEdge]
+    val sourceOp = DataSourceOp(new AnySource)
+    val groupByOp = GroupByOp(new AnyGroupByFn)
+    val flatMapOp = ChainableOp[Any, Any](anyFlatMapFunction)
+    val reduceOp = ChainableOp[Any, Any](anyReduceFunction)
+    val processorOp = new ProcessorOp[AnyTask]
+    val sinkOp = DataSinkOp(new AnySink)
+    val directEdge = Direct
+    val shuffleEdge = Shuffle
+
+    // source -(shuffle)-> groupBy -> flatMap -> reduce -> processor -> sink
+    graph.addVertex(sourceOp)
+    graph.addVertex(groupByOp)
+    graph.addEdge(sourceOp, shuffleEdge, groupByOp)
+    graph.addVertex(flatMapOp)
+    graph.addEdge(groupByOp, directEdge, flatMapOp)
+    graph.addVertex(reduceOp)
+    graph.addEdge(flatMapOp, directEdge, reduceOp)
+    graph.addVertex(processorOp)
+    graph.addEdge(reduceOp, directEdge, processorOp)
+    graph.addVertex(sinkOp)
+    graph.addEdge(processorOp, directEdge, sinkOp)
+
+    // Planning relies on the spec-owned implicit `system` rather than a second, locally
+    // declared one, so the ActorSystem in use is the one afterAll shuts down.
+    val planner = new Planner
+    val plan = planner.plan(graph)
+      .mapVertex(_.description)
+
+    // The flatMap and reduce ops must not survive as vertices of their own.
+    plan.vertices.toSet should contain theSameElementsAs
+      Set("source", "groupBy", "processor", "sink")
+    plan.outgoingEdgesOf("source").iterator.next()._2 shouldBe a[GroupByPartitioner[_, _]]
+    plan.outgoingEdgesOf("groupBy").iterator.next()._2 shouldBe a[CoLocationPartitioner]
+    plan.outgoingEdgesOf("processor").iterator.next()._2 shouldBe a[CoLocationPartitioner]
+  }
+}
+
+/** Minimal fixtures (functions, task, source, sink, group-by) for the `PlannerSpec` test. */
+object PlannerSpec {
+
+  // Parallelism AnyGroupByFn hands to every Processor it builds.
+  private val anyParallelism = 1
+
+  // Wraps each input in an Option; described as "flatMap".
+  private val anyFlatMapFunction =
+    new FlatMapFunction[Any, Any](value => Option(value), "flatMap")
+
+  // Pairs the two inputs into a tuple; described as "reduce".
+  private val anyReduceFunction =
+    new ReduceFunction[Any]((acc: Any, next: Any) => (acc, next), "reduce")
+
+  /** Task that adds nothing on top of `Task`. */
+  class AnyTask(context: TaskContext, config: UserConfig) extends Task(context, config)
+
+  /** Source with no-op `open`/`close` that always reads the message "any". */
+  class AnySource extends DataSource {
+
+    override def open(context: TaskContext, startTime: Instant): Unit = ()
+
+    override def read(): Message = Message("any")
+
+    override def close(): Unit = ()
+
+    // Reports the wall-clock time at the moment of the call.
+    override def getWatermark: Instant = Instant.now()
+  }
+
+  /** Sink that ignores everything written to it. */
+  class AnySink extends DataSink {
+
+    override def open(context: TaskContext): Unit = ()
+
+    override def write(message: Message): Unit = ()
+
+    override def close(): Unit = ()
+  }
+
+  /** Groups each message by its own payload. */
+  class AnyGroupByFn extends GroupByFn[Any, Any] {
+
+    override def groupBy(message: Message): Any = message.msg
+
+    // The requested parallelism and userConfig are ignored; the processor always gets
+    // `anyParallelism` and the given description.
+    override def getProcessor(
+        parallelism: Int,
+        description: String,
+        userConfig: UserConfig)(implicit system: ActorSystem): Processor[_ <: Task] =
+      Processor[AnyTask](anyParallelism, description)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
new file mode 100644
index 0000000..94feae4
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.plan.functions
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.{TestUtil, UserConfig}
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.dsl.CollectionDataSource
+import org.apache.gearpump.streaming.source.DataSourceTask
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask}
+import org.apache.gearpump.streaming.dsl.window.api.CountWindow
+import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
+import org.mockito.ArgumentCaptor
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.{Matchers, WordSpec}
+import org.scalatest.mock.MockitoSugar
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+/**
+ * Tests for the single-input DSL functions (AndThen, FlatMapFunction,
+ * ReduceFunction, EmitFunction) and for the tasks that apply them
+ * (DataSourceTask, CountTriggerTask, TransformTask).
+ */
+class SingleInputFunctionSpec extends WordSpec with Matchers
+  with MockitoSugar {
+  import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunctionSpec._
+
+  "AndThen" should {
+
+    // These mocks are shared by every case in this clause.
+    val first = mock[SingleInputFunction[R, S]]
+    val second = mock[SingleInputFunction[S, T]]
+    val andThen = new AndThen(first, second)
+
+    "chain first and second functions when processing input value" in {
+      val input = mock[R]
+      val firstOutput = mock[S]
+      val secondOutput = mock[T]
+      when(first.process(input)).thenReturn(Some(firstOutput))
+      when(second.process(firstOutput)).thenReturn(Some(secondOutput))
+
+      andThen.process(input).toList shouldBe List(secondOutput)
+    }
+
+    "return chained description" in {
+      when(first.description).thenReturn("first")
+      when(second.description).thenReturn("second")
+      andThen.description shouldBe "first.second"
+    }
+
+    "return either first result or second on finish" in {
+      val firstResult = mock[S]
+      val processedFirst = mock[T]
+      val secondResult = mock[T]
+
+      // When the first function flushes a value, it is fed through the second.
+      when(first.finish()).thenReturn(Some(firstResult))
+      when(second.process(firstResult)).thenReturn(Some(processedFirst))
+      andThen.finish().toList shouldBe List(processedFirst)
+
+      // When the first function flushes nothing, the second's own result is used.
+      when(first.finish()).thenReturn(None)
+      when(second.finish()).thenReturn(Some(secondResult))
+      andThen.finish().toList shouldBe List(secondResult)
+    }
+
+    "clear both states on clearState" in {
+      andThen.clearState()
+
+      verify(first).clearState()
+      verify(second).clearState()
+    }
+
+    "return AndThen on andThen" in {
+      val third = mock[SingleInputFunction[T, Any]]
+      andThen.andThen[Any](third) shouldBe an [AndThen[_, _, _]]
+    }
+  }
+
+  "FlatMapFunction" should {
+
+    val flatMap = mock[R => TraversableOnce[S]]
+    val flatMapFunction = new FlatMapFunction[R, S](flatMap, "flatMap")
+
+    "call flatMap function when processing input value" in {
+      val input = mock[R]
+      flatMapFunction.process(input)
+      verify(flatMap).apply(input)
+    }
+
+    "return passed in description" in {
+      flatMapFunction.description shouldBe "flatMap"
+    }
+
+    "return empty result on finish" in {
+      flatMapFunction.finish() shouldBe List.empty[S]
+    }
+
+    "do nothing on clearState" in {
+      flatMapFunction.clearState()
+      verifyZeroInteractions(flatMap)
+    }
+
+    "return AndThen on andThen" in {
+      val other = mock[SingleInputFunction[S, T]]
+      flatMapFunction.andThen[T](other) shouldBe an [AndThen[_, _, _]]
+    }
+  }
+
+  "ReduceFunction" should {
+
+    // Each case builds its own ReduceFunction rather than sharing one.
+    "call reduce function when processing input value" in {
+      val reduce = mock[(T, T) => T]
+      val reduceFunction = new ReduceFunction[T](reduce, "reduce")
+      val input1 = mock[T]
+      val input2 = mock[T]
+      val output = mock[T]
+
+      when(reduce.apply(input1, input2)).thenReturn(output, output)
+
+      // Nothing is emitted while reducing; the result appears on finish.
+      reduceFunction.process(input1) shouldBe List.empty[T]
+      reduceFunction.process(input2) shouldBe List.empty[T]
+      reduceFunction.finish() shouldBe List(output)
+
+      // After clearState only inputs processed since the reset are reflected.
+      reduceFunction.clearState()
+      reduceFunction.process(input1) shouldBe List.empty[T]
+      reduceFunction.clearState()
+      reduceFunction.process(input2) shouldBe List.empty[T]
+      reduceFunction.finish() shouldBe List(input2)
+    }
+
+    "return passed in description" in {
+      val reduce = mock[(T, T) => T]
+      val reduceFunction = new ReduceFunction[T](reduce, "reduce")
+      reduceFunction.description shouldBe "reduce"
+    }
+
+    "return empty result on finish" in {
+      val reduce = mock[(T, T) => T]
+      val reduceFunction = new ReduceFunction[T](reduce, "reduce")
+      reduceFunction.finish() shouldBe List.empty[T]
+    }
+
+    "do nothing on clearState" in {
+      val reduce = mock[(T, T) => T]
+      val reduceFunction = new ReduceFunction[T](reduce, "reduce")
+      reduceFunction.clearState()
+      verifyZeroInteractions(reduce)
+    }
+
+    "return AndThen on andThen" in {
+      val reduce = mock[(T, T) => T]
+      val reduceFunction = new ReduceFunction[T](reduce, "reduce")
+      val other = mock[SingleInputFunction[T, Any]]
+      reduceFunction.andThen[Any](other) shouldBe an[AndThen[_, _, _]]
+    }
+  }
+
+  "EmitFunction" should {
+
+    val emit = mock[T => Unit]
+    val emitFunction = new EmitFunction[T](emit)
+
+    "emit input value when processing input value" in {
+      val input = mock[T]
+
+      emitFunction.process(input) shouldBe List.empty[Unit]
+
+      verify(emit).apply(input)
+    }
+
+    "return empty description" in {
+      emitFunction.description shouldBe ""
+    }
+
+    "return empty result on finish" in {
+      emitFunction.finish() shouldBe List.empty[Unit]
+    }
+
+    "do nothing on clearState" in {
+      emitFunction.clearState()
+      verifyZeroInteractions(emit)
+    }
+
+    "throw exception on andThen" in {
+      val other = mock[SingleInputFunction[Unit, Any]]
+      intercept[UnsupportedOperationException] {
+        emitFunction.andThen(other)
+      }
+    }
+  }
+
+  "andThen" should {
+    "chain multiple single input function" in {
+      val split = new FlatMapFunction[String, String](
+        line => line.split("\\s"), "split")
+
+      val filter = new FlatMapFunction[String, String](word =>
+        if (word.isEmpty) None else Some(word), "filter")
+
+      val map = new FlatMapFunction[String, Int](word => Some(1), "map")
+
+      val sum = new ReduceFunction[Int]({ (left, right) => left + right }, "sum")
+
+      val all = split.andThen(filter).andThen(map).andThen(sum)
+
+      assert(all.description == "split.filter.map.sum")
+
+      // 5 + 4 + 3 + 2 + 1 = 15 non-empty words in total.
+      val data =
+        """
+      five  four three  two    one
+      five  four three  two
+      five  four three
+      five  four
+      five
+        """
+      // force eager evaluation
+      all.process(data).toList
+      val result = all.finish().toList
+      assert(result.nonEmpty)
+      assert(result.last == 15)
+    }
+  }
+
+  "Source" should {
+    "iterate over input source and apply attached operator" in {
+
+      val taskContext = MockUtil.mockTaskContext
+      implicit val actorSystem: ActorSystem = MockUtil.system
+
+      val data = "one two three".split("\\s")
+      val dataSource = new CollectionDataSource[String](data)
+      val conf = UserConfig.empty.withValue(GEARPUMP_STREAMING_SOURCE, dataSource)
+
+      // Source with no transformer
+      val source = new DataSourceTask[String, String](
+        taskContext, conf)
+      source.onStart(Instant.EPOCH)
+      source.onNext(Message("next"))
+      data.foreach { s =>
+        verify(taskContext, times(1)).output(MockUtil.argMatch[Message](
+          message => message.msg == s))
+      }
+
+      // Source with transformer
+      val anotherTaskContext = MockUtil.mockTaskContext
+      val double = new FlatMapFunction[String, String](
+        word => List(word, word), "double")
+      val another = new DataSourceTask(anotherTaskContext,
+        conf.withValue(GEARPUMP_STREAMING_OPERATOR, double))
+      another.onStart(Instant.EPOCH)
+      another.onNext(Message("next"))
+      data.foreach { s =>
+        verify(anotherTaskContext, times(2)).output(MockUtil.argMatch[Message](
+          message => message.msg == s))
+      }
+    }
+  }
+
+  "CountTriggerTask" should {
+    "group input by groupBy Function and " +
+      "apply attached operator for each group" in {
+
+      val data = "1 2  2  3 3  3"
+
+      val concat = new ReduceFunction[String]({ (left, right) =>
+        left + right
+      }, "concat")
+
+      implicit val system: ActorSystem = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
+      // The actor system is created here, so shut it down even when a
+      // verification or assertion below fails.
+      try {
+        val config = UserConfig.empty.withValue[SingleInputFunction[String, String]](
+          GEARPUMP_STREAMING_OPERATOR, concat)
+
+        val taskContext = MockUtil.mockTaskContext
+
+        val groupBy = GroupAlsoByWindow((input: String) => input,
+          CountWindow.apply(1).accumulating)
+        val task = new CountTriggerTask[String, String](groupBy, taskContext, config)
+        task.onStart(Instant.EPOCH)
+
+        val messageCaptor = ArgumentCaptor.forClass(classOf[Message])
+
+        data.split("\\s+").foreach { word =>
+          task.onNext(Message(word))
+        }
+        verify(taskContext, times(6)).output(messageCaptor.capture())
+
+        import scala.collection.JavaConverters._
+
+        val values = messageCaptor.getAllValues.asScala.map(input =>
+          input.msg.asInstanceOf[String])
+        assert(values.mkString(",") == "1,2,22,3,33,333")
+      } finally {
+        system.terminate()
+        Await.result(system.whenTerminated, Duration.Inf)
+      }
+    }
+  }
+
+  "TransformTask" should {
+    "accept two stream and apply the attached operator" in {
+
+      // Task with an attached transformer
+      val taskContext = MockUtil.mockTaskContext
+      val conf = UserConfig.empty
+      val double = new FlatMapFunction[String, String](
+        word => List(word, word), "double")
+      val task = new TransformTask[String, String](Some(double), taskContext, conf)
+      task.onStart(Instant.EPOCH)
+
+      val data = "1 2  2  3 3  3".split("\\s+")
+
+      data.foreach { input =>
+        task.onNext(Message(input))
+      }
+
+      // Every input is doubled, so two outputs are expected per message.
+      verify(taskContext, times(data.length * 2)).output(anyObject())
+    }
+  }
+}
+
+/**
+ * Placeholder element types for the spec. Each alias is plain `AnyRef`;
+ * the distinct names only label the input, intermediate and output
+ * positions in the function signatures under test.
+ */
+object SingleInputFunctionSpec {
+  type R = AnyRef
+  type S = AnyRef
+  type T = AnyRef
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala
new file mode 100644
index 0000000..871d751
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.task
+
+import java.time.Instant
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.dsl.window.api.CountWindow
+import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, 
WindowRunner}
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+/**
+ * Property test for `CountTriggerTask`, run against a mocked `WindowRunner`.
+ */
+class CountTriggerTaskSpec extends PropSpec with PropertyChecks
+  with Matchers with MockitoSugar {
+
+  property("CountTriggerTask should trigger output by number of messages in a window") {
+
+    implicit val system = MockUtil.system
+
+    // Window size and message count are both drawn from [1, 1000].
+    val sizeOrCountGen = Gen.chooseNum[Int](1, 1000)
+
+    forAll(sizeOrCountGen, sizeOrCountGen) { (windowSize: Int, msgNum: Int) =>
+
+      val grouping = mock[GroupAlsoByWindow[Any, Any]]
+      val countWindow = CountWindow.apply(windowSize)
+      when(grouping.window).thenReturn(countWindow)
+      val runner = mock[WindowRunner]
+
+      val task = new CountTriggerTask[Any, Any](grouping, runner,
+        MockUtil.mockTaskContext, UserConfig.empty)
+      val message = mock[Message]
+
+      (1 to msgNum).foreach(_ => task.onNext(message))
+
+      // Every message is processed; a trigger is expected once per full window.
+      val expectedTriggers = msgNum / windowSize
+      verify(runner, times(msgNum)).process(message)
+      verify(runner, times(expectedTriggers))
+        .trigger(Instant.ofEpochMilli(windowSize))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
new file mode 100644
index 0000000..a69abe6
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.task
+
+import java.time.{Duration, Instant}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, 
SlidingWindow}
+import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, 
WindowRunner}
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.{Matchers, PropSpec}
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+
+/**
+ * Property test for `EventTimeTriggerTask`, run against a mocked
+ * `WindowRunner`.
+ */
+class EventTimeTriggerTaskSpec extends PropSpec with PropertyChecks
+  with Matchers with MockitoSugar {
+
+  property("EventTimeTriggerTask should trigger on watermark") {
+    // Window size, window step and watermark millis all come from [1, 1000].
+    val millisGen = Gen.chooseNum[Long](1L, 1000L)
+    val watermarkGen = millisGen.map(Instant.ofEpochMilli)
+
+    forAll(millisGen, millisGen, watermarkGen) {
+      (windowSize: Long, windowStep: Long, watermark: Instant) =>
+
+        val slidingWindow = SlidingWindow
+          .apply(Duration.ofMillis(windowSize), Duration.ofMillis(windowStep))
+          .triggering(EventTimeTrigger)
+        val grouping = mock[GroupAlsoByWindow[Any, Any]]
+        val runner = mock[WindowRunner]
+
+        when(grouping.window).thenReturn(slidingWindow)
+
+        val task = new EventTimeTriggerTask[Any, Any](grouping, runner,
+          MockUtil.mockTaskContext, UserConfig.empty)
+
+        // A message is handed to the runner as soon as it arrives.
+        val message = mock[Message]
+        task.onNext(message)
+        verify(runner).process(message)
+
+        // Watermark progress must cause exactly one trigger call.
+        task.onWatermarkProgress(watermark)
+        verify(runner).trigger(any[Instant])
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala
new file mode 100644
index 0000000..39e1b4c
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.task
+
+import java.time.{Duration, Instant}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import 
org.apache.gearpump.streaming.dsl.task.ProcessingTimeTriggerTask.Triggering
+import org.apache.gearpump.streaming.dsl.window.api.{ProcessingTimeTrigger, 
SlidingWindow}
+import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, 
WindowRunner}
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.{Matchers, PropSpec}
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+
+/**
+ * Property test for `ProcessingTimeTriggerTask`, run against a mocked
+ * `WindowRunner`.
+ */
+class ProcessingTimeTriggerTaskSpec extends PropSpec with PropertyChecks
+  with Matchers with MockitoSugar {
+
+  property("ProcessingTimeTriggerTask should trigger on system time interval") {
+    // Window size, window step and start-time millis all come from [1, 1000].
+    val millisGen = Gen.chooseNum[Long](1L, 1000L)
+    val startTimeGen = millisGen.map(Instant.ofEpochMilli)
+
+    forAll(millisGen, millisGen, startTimeGen) {
+      (windowSize: Long, windowStep: Long, startTime: Instant) =>
+
+        val slidingWindow = SlidingWindow
+          .apply(Duration.ofMillis(windowSize), Duration.ofMillis(windowStep))
+          .triggering(ProcessingTimeTrigger)
+        val grouping = mock[GroupAlsoByWindow[Any, Any]]
+        val runner = mock[WindowRunner]
+
+        when(grouping.window).thenReturn(slidingWindow)
+
+        val task = new ProcessingTimeTriggerTask[Any, Any](grouping, runner,
+          MockUtil.mockTaskContext, UserConfig.empty)
+
+        task.onStart(startTime)
+
+        // A message is handed to the runner as soon as it arrives.
+        val message = mock[Message]
+        task.onNext(message)
+        verify(runner).process(message)
+
+        // Delivering the Triggering message must cause exactly one trigger call.
+        task.receiveUnManagedMessage(Triggering)
+        verify(runner).trigger(any[Instant])
+    }
+  }
+
+}

Reply via email to