Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 96312a2ac -> 5d524918d


[GEARPUMP-23] Refactor Window DSL

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [x] Make sure the commit message is formatted like:
   `[GEARPUMP-<Jira issue #>] Meaningful description of pull request`
 - [x] Make sure tests pass via `sbt clean test`.
 - [x] Make sure old documentation affected by the pull request has been 
updated and new documentation added for new functionality.

Author: manuzhang <[email protected]>

Closes #138 from manuzhang/window_dsl.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/5d524918
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/5d524918
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/5d524918

Branch: refs/heads/master
Commit: 5d524918d1e42d3bdd67ecf1e76b1c8c64f26572
Parents: 96312a2
Author: manuzhang <[email protected]>
Authored: Mon Feb 6 21:00:41 2017 +0800
Committer: manuzhang <[email protected]>
Committed: Mon Feb 6 21:00:41 2017 +0800

----------------------------------------------------------------------
 .../wordcount/dsl/WindowedWordCount.scala       |   4 +-
 .../materializer/RemoteMaterializerImpl.scala   |  10 +-
 .../streaming/dsl/javaapi/JavaStream.scala      |   4 +-
 .../apache/gearpump/streaming/dsl/plan/OP.scala |   4 +-
 .../dsl/plan/functions/FunctionRunner.scala     | 128 +++++++
 .../plan/functions/SingleInputFunction.scala    | 128 -------
 .../streaming/dsl/scalaapi/Stream.scala         |  14 +-
 .../streaming/dsl/task/CountTriggerTask.scala   |   4 +-
 .../dsl/task/ProcessingTimeTriggerTask.scala    |   4 +-
 .../streaming/dsl/task/TransformTask.scala      |   6 +-
 .../streaming/dsl/window/api/Window.scala       |  77 -----
 .../streaming/dsl/window/api/WindowFn.scala     |  63 ----
 .../dsl/window/api/WindowFunction.scala         |  72 ++++
 .../streaming/dsl/window/api/Windows.scala      |  77 +++++
 .../streaming/dsl/window/impl/Window.scala      |  42 ++-
 .../dsl/window/impl/WindowRunner.scala          |  80 ++---
 .../streaming/source/DataSourceTask.scala       |   6 +-
 .../partitioner/GroupByPartitionerSpec.scala    |  11 +-
 .../gearpump/streaming/dsl/plan/OpSpec.scala    |  12 +-
 .../dsl/plan/functions/FunctionRunnerSpec.scala | 340 +++++++++++++++++++
 .../functions/SingleInputFunctionSpec.scala     | 339 ------------------
 .../dsl/task/CountTriggerTaskSpec.scala         |   4 +-
 .../dsl/task/EventTimeTriggerTaskSpec.scala     |   2 +-
 .../task/ProcessingTimeTriggerTaskSpec.scala    |   2 +-
 .../streaming/dsl/task/TransformTaskSpec.scala  |   8 +-
 .../streaming/source/DataSourceTaskSpec.scala   |   6 +-
 26 files changed, 736 insertions(+), 711 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
 
b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
index 401eac0..e1aac4c 100644
--- 
a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
+++ 
b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
@@ -23,7 +23,7 @@ import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
 import org.apache.gearpump.streaming.dsl.scalaapi.{LoggerSink, StreamApp}
-import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, 
FixedWindow}
+import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, 
FixedWindows}
 import org.apache.gearpump.streaming.source.DataSource
 import org.apache.gearpump.streaming.task.TaskContext
 import org.apache.gearpump.util.AkkaApp
@@ -39,7 +39,7 @@ object WindowedWordCount extends AkkaApp with ArgumentsParser 
{
       // word => (word, count)
       flatMap(line => line.split("[\\s]+")).map((_, 1)).
       // fix window
-      window(FixedWindow.apply(Duration.ofMillis(5L))
+      window(FixedWindows.apply(Duration.ofMillis(5L))
         .triggering(EventTimeTrigger)).
       // (word, count1), (word, count2) => (word, count1 + count2)
       groupBy(_._1).

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
index e065c90..74fe077 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
@@ -38,7 +38,7 @@ import 
org.apache.gearpump.streaming.dsl.plan.functions.FlatMapper
 import org.apache.gearpump.streaming.dsl.plan.{ChainableOp, DataSinkOp, 
DataSourceOp, Direct, GroupByOp, MergeOp, Op, OpEdge, ProcessorOp, Shuffle}
 import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp
 import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
-import org.apache.gearpump.streaming.dsl.window.api.CountWindow
+import org.apache.gearpump.streaming.dsl.window.api.CountWindows
 import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
 import org.apache.gearpump.streaming.{ProcessorId, StreamApplication}
 import org.apache.gearpump.util.Graph
@@ -163,8 +163,8 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], 
system: ActorSystem) {
           ProcessorOp(processor.processor, parallelism, updatedConf, "source")
         case sinkBridge: SinkBridgeModule[_, _] =>
           ProcessorOp(classOf[SinkBridgeTask], parallelism, conf, "sink")
-        case groupBy: GroupByModule[_, _] =>
-          GroupByOp(GroupAlsoByWindow(groupBy.groupBy, 
CountWindow.apply(1).accumulating),
+        case groupBy: GroupByModule[Any, Any] =>
+          GroupByOp(GroupAlsoByWindow(groupBy.groupBy, 
CountWindows.apply[Any](1).accumulating),
             parallelism, "groupBy", conf)
         case reduce: ReduceModule[_] =>
           reduceOp(reduce.f, conf)
@@ -241,8 +241,8 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], 
system: ActorSystem) {
         val foldConf = conf.withValue(FoldTask.ZERO, 
fold.zero.asInstanceOf[AnyRef]).
           withValue(FoldTask.AGGREGATOR, fold.f)
         ProcessorOp(classOf[FoldTask[_, _]], parallelism, foldConf, "fold")
-      case groupBy: GroupBy[_, _] =>
-        GroupByOp(GroupAlsoByWindow(groupBy.keyFor, 
CountWindow.apply(1).accumulating),
+      case groupBy: GroupBy[Any, Any] =>
+        GroupByOp(GroupAlsoByWindow(groupBy.keyFor, 
CountWindows.apply[Any](1).accumulating),
           groupBy.maxSubstreams, "groupBy", conf)
       case groupedWithin: GroupedWithin[_] =>
         val diConf = 
conf.withValue[FiniteDuration](GroupedWithinTask.TIME_WINDOW, groupedWithin.d).

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
index 7f3c250..592c4dc 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
@@ -22,7 +22,7 @@ import 
org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunct
 import org.apache.gearpump.streaming.dsl.javaapi.functions.{FlatMapFunction => 
JFlatMapFunction, GroupByFunction}
 import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
 import org.apache.gearpump.streaming.dsl.scalaapi.{Stream, WindowStream}
-import org.apache.gearpump.streaming.dsl.window.api.Window
+import org.apache.gearpump.streaming.dsl.window.api.Windows
 import org.apache.gearpump.streaming.task.Task
 
 /**
@@ -68,7 +68,7 @@ class JavaStream[T](val stream: Stream[T]) {
     new JavaStream[T](stream.groupBy(fn.apply, parallelism, description))
   }
 
-  def window(win: Window, description: String): JavaWindowStream[T] = {
+  def window(win: Windows[T], description: String): JavaWindowStream[T] = {
     new JavaWindowStream[T](stream.window(win, description))
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
index 5aaf2fa..56f16e1 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
@@ -22,7 +22,7 @@ import akka.actor.ActorSystem
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.Constants._
 import org.apache.gearpump.streaming.Processor.DefaultProcessor
-import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, 
SingleInputFunction}
+import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, 
FunctionRunner}
 import org.apache.gearpump.streaming.{Constants, Processor}
 import org.apache.gearpump.streaming.dsl.task.TransformTask
 import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
@@ -124,7 +124,7 @@ case class DataSinkOp(
  * to another Op to be used
  */
 case class ChainableOp[IN, OUT](
-    fn: SingleInputFunction[IN, OUT],
+    fn: FunctionRunner[IN, OUT],
     userConfig: UserConfig = UserConfig.empty) extends Op {
 
   override def description: String = fn.description

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
new file mode 100644
index 0000000..36821e4
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.plan.functions
+
+import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
+
+/**
+ * Interface to invoke SerializableFunction methods
+ *
+ * @param IN input value type
+ * @param OUT output value type
+ */
+sealed trait FunctionRunner[IN, OUT] extends java.io.Serializable {
+
+  def setup(): Unit = {}
+
+  def process(value: IN): TraversableOnce[OUT]
+
+  def finish(): TraversableOnce[OUT] = None
+
+  def teardown(): Unit = {}
+
+  def description: String
+}
+
+case class AndThen[IN, MIDDLE, OUT](first: FunctionRunner[IN, MIDDLE],
+    second: FunctionRunner[MIDDLE, OUT])
+  extends FunctionRunner[IN, OUT] {
+
+  override def setup(): Unit = {
+    first.setup()
+    second.setup()
+  }
+
+  override def process(value: IN): TraversableOnce[OUT] = {
+    first.process(value).flatMap(second.process)
+  }
+
+  override def finish(): TraversableOnce[OUT] = {
+    val firstResult = first.finish().flatMap(second.process)
+    if (firstResult.isEmpty) {
+      second.finish()
+    } else {
+      firstResult
+    }
+  }
+
+  override def teardown(): Unit = {
+    first.teardown()
+    second.teardown()
+  }
+
+  override def description: String = {
+    Option(first.description).flatMap { description =>
+      Option(second.description).map(description + "." + _)
+    }.orNull
+  }
+}
+
+class FlatMapper[IN, OUT](fn: FlatMapFunction[IN, OUT], val description: 
String)
+  extends FunctionRunner[IN, OUT] {
+
+  override def setup(): Unit = {
+    fn.setup()
+  }
+
+  override def process(value: IN): TraversableOnce[OUT] = {
+    fn(value)
+  }
+
+  override def teardown(): Unit = {
+    fn.teardown()
+  }
+}
+
+class Reducer[T](fn: ReduceFunction[T], val description: String)
+  extends FunctionRunner[T, T] {
+
+  private var state: Option[T] = None
+
+  override def setup(): Unit = {
+    fn.setup()
+  }
+
+  override def process(value: T): TraversableOnce[T] = {
+    if (state.isEmpty) {
+      state = Option(value)
+    } else {
+      state = state.map(fn(_, value))
+    }
+    None
+  }
+
+  override def finish(): TraversableOnce[T] = {
+    state
+  }
+
+  override def teardown(): Unit = {
+    state = None
+    fn.teardown()
+  }
+}
+
+class Emit[T](emit: T => Unit) extends FunctionRunner[T, Unit] {
+
+  override def process(value: T): TraversableOnce[Unit] = {
+    emit(value)
+    None
+  }
+
+  override def description: String = ""
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
deleted file mode 100644
index 687fd2e..0000000
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.dsl.plan.functions
-
-import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
-import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
-
-/**
- * Internal function to process single input
- *
- * @param IN input value type
- * @param OUT output value type
- */
-sealed trait SingleInputFunction[IN, OUT] extends java.io.Serializable {
-
-  def setup(): Unit = {}
-
-  def process(value: IN): TraversableOnce[OUT]
-
-  def finish(): TraversableOnce[OUT] = None
-
-  def teardown(): Unit = {}
-
-  def description: String
-}
-
-case class AndThen[IN, MIDDLE, OUT](first: SingleInputFunction[IN, MIDDLE],
-    second: SingleInputFunction[MIDDLE, OUT])
-  extends SingleInputFunction[IN, OUT] {
-
-  override def setup(): Unit = {
-    first.setup()
-    second.setup()
-  }
-
-  override def process(value: IN): TraversableOnce[OUT] = {
-    first.process(value).flatMap(second.process)
-  }
-
-  override def finish(): TraversableOnce[OUT] = {
-    val firstResult = first.finish().flatMap(second.process)
-    if (firstResult.isEmpty) {
-      second.finish()
-    } else {
-      firstResult
-    }
-  }
-
-  override def teardown(): Unit = {
-    first.teardown()
-    second.teardown()
-  }
-
-  override def description: String = {
-    Option(first.description).flatMap { description =>
-      Option(second.description).map(description + "." + _)
-    }.orNull
-  }
-}
-
-class FlatMapper[IN, OUT](fn: FlatMapFunction[IN, OUT], val description: 
String)
-  extends SingleInputFunction[IN, OUT] {
-
-  override def setup(): Unit = {
-    fn.setup()
-  }
-
-  override def process(value: IN): TraversableOnce[OUT] = {
-    fn(value)
-  }
-
-  override def teardown(): Unit = {
-    fn.teardown()
-  }
-}
-
-class Reducer[T](fn: ReduceFunction[T], val description: String)
-  extends SingleInputFunction[T, T] {
-
-  private var state: Option[T] = None
-
-  override def setup(): Unit = {
-    fn.setup()
-  }
-
-  override def process(value: T): TraversableOnce[T] = {
-    if (state.isEmpty) {
-      state = Option(value)
-    } else {
-      state = state.map(fn(_, value))
-    }
-    None
-  }
-
-  override def finish(): TraversableOnce[T] = {
-    state
-  }
-
-  override def teardown(): Unit = {
-    state = None
-    fn.teardown()
-  }
-}
-
-class Emit[T](emit: T => Unit) extends SingleInputFunction[T, Unit] {
-
-  override def process(value: T): TraversableOnce[Unit] = {
-    emit(value)
-    None
-  }
-
-  override def description: String = ""
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
index 430d795..bdb245c 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
@@ -25,7 +25,7 @@ import 
org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
 import org.apache.gearpump.streaming.dsl.plan._
 import org.apache.gearpump.streaming.dsl.plan.functions._
 import org.apache.gearpump.streaming.dsl.window.api._
-import org.apache.gearpump.streaming.dsl.window.impl.{Bucket, 
GroupAlsoByWindow}
+import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, 
WindowAndGroup}
 import org.apache.gearpump.streaming.sink.DataSink
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
 import org.apache.gearpump.util.Graph
@@ -122,7 +122,7 @@ class Stream[T](
     transform(new Reducer[T](fn, description))
   }
 
-  private def transform[R](fn: SingleInputFunction[T, R]): Stream[R] = {
+  private def transform[R](fn: FunctionRunner[T, R]): Stream[R] = {
     val op = ChainableOp(fn)
     graph.addVertex(op)
     graph.addEdge(thisNode, edge.getOrElse(Direct), op)
@@ -173,7 +173,7 @@ class Stream[T](
    */
   def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1,
       description: String = "groupBy"): Stream[T] = {
-    window(CountWindow.apply(1).accumulating)
+    window(CountWindows.apply(1).accumulating)
       .groupBy[GROUP](fn, parallelism, description)
   }
 
@@ -184,7 +184,7 @@ class Stream[T](
    * @param description window description
    * @return [[WindowStream]] where groupBy could be applied
    */
-  def window(win: Window, description: String = "window"): WindowStream[T] = {
+  def window(win: Windows[T], description: String = "window"): WindowStream[T] 
= {
     new WindowStream[T](graph, edge, thisNode, win, description)
   }
 
@@ -206,12 +206,12 @@ class Stream[T](
 }
 
 class WindowStream[T](graph: Graph[Op, OpEdge], edge: Option[OpEdge], 
thisNode: Op,
-    window: Window, winDesc: String) {
+    window: Windows[T], winDesc: String) {
 
   def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1,
       description: String = "groupBy"): Stream[T] = {
-    val groupBy: GroupByFn[T, (GROUP, List[Bucket])] = GroupAlsoByWindow(fn, 
window)
-    val groupOp = GroupByOp[T, (GROUP, List[Bucket])](groupBy, parallelism,
+    val groupBy: GroupByFn[T, List[WindowAndGroup[GROUP]]] = 
GroupAlsoByWindow(fn, window)
+    val groupOp = GroupByOp[T, List[WindowAndGroup[GROUP]]](groupBy, 
parallelism,
       s"$winDesc.$description")
     graph.addVertex(groupOp)
     graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
index 06f2964..0dc28eb 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
@@ -22,7 +22,7 @@ import java.time.Instant
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.dsl.window.api.CountWindowFn
+import org.apache.gearpump.streaming.dsl.window.api.CountWindowFunction
 import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, 
GroupAlsoByWindow, WindowRunner}
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
@@ -48,7 +48,7 @@ class CountTriggerTask[IN, GROUP](
       taskContext, userConfig)
   }
 
-  private val windowSize = 
groupBy.window.windowFn.asInstanceOf[CountWindowFn].size
+  private val windowSize = 
groupBy.window.windowFn.asInstanceOf[CountWindowFunction[IN]].size
   private var num = 0
 
   override def onNext(msg: Message): Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
index 78ba762..a04e3ca 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
@@ -25,7 +25,7 @@ import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.Constants._
 import 
org.apache.gearpump.streaming.dsl.task.ProcessingTimeTriggerTask.Triggering
-import org.apache.gearpump.streaming.dsl.window.api.SlidingWindowFn
+import org.apache.gearpump.streaming.dsl.window.api.SlidingWindowFunction
 import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, 
GroupAlsoByWindow, WindowRunner}
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
@@ -57,7 +57,7 @@ class ProcessingTimeTriggerTask[IN, GROUP](
       taskContext, userConfig)
   }
 
-  private val windowFn = groupBy.window.windowFn.asInstanceOf[SlidingWindowFn]
+  private val windowFn = 
groupBy.window.windowFn.asInstanceOf[SlidingWindowFunction[IN]]
   private val windowSizeMs = windowFn.size.toMillis
   private val windowStepMs = windowFn.step.toMillis
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
index f8fbefa..5febeb6 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
@@ -22,14 +22,14 @@ import java.time.Instant
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
+import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
-class TransformTask[IN, OUT](operator: Option[SingleInputFunction[IN, OUT]],
+class TransformTask[IN, OUT](operator: Option[FunctionRunner[IN, OUT]],
     taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, 
userConf) {
 
   def this(taskContext: TaskContext, userConf: UserConfig) = {
-    this(userConf.getValue[SingleInputFunction[IN, OUT]](
+    this(userConf.getValue[FunctionRunner[IN, OUT]](
       GEARPUMP_STREAMING_OPERATOR)(taskContext.system), taskContext, userConf)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Window.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Window.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Window.scala
deleted file mode 100644
index 4b94879..0000000
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Window.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.dsl.window.api
-
-import java.time.Duration
-
-/**
- *
- * @param windowFn
- * @param trigger
- * @param accumulationMode
- */
-case class Window(
-    windowFn: WindowFn,
-    trigger: Trigger = EventTimeTrigger,
-    accumulationMode: AccumulationMode = Discarding) {
-
-  def triggering(trigger: Trigger): Window = {
-    Window(windowFn, trigger)
-  }
-
-  def accumulating: Window = {
-    Window(windowFn, trigger, Accumulating)
-  }
-
-  def discarding: Window = {
-    Window(windowFn, trigger, Discarding)
-  }
-}
-
-object CountWindow {
-
-  def apply(size: Int): Window = {
-    Window(CountWindowFn(size), CountTrigger)
-  }
-}
-
-object FixedWindow {
-
-  /**
-   * Defines a FixedWindow.
-   * @param size window size
-   * @return a Window definition
-   */
-  def apply(size: Duration): Window = {
-    Window(SlidingWindowFn(size, size))
-  }
-}
-
-object SlidingWindow {
-
-  /**
-   * Defines a SlidingWindow
-   * @param size window size
-   * @param step window step to slide forward
-   * @return a Window definition
-   */
-  def apply(size: Duration, step: Duration): Window = {
-    Window(SlidingWindowFn(size, step))
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFn.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFn.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFn.scala
deleted file mode 100644
index 0768730..0000000
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFn.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.dsl.window.api
-
-import java.time.{Duration, Instant}
-
-import org.apache.gearpump.TimeStamp
-import org.apache.gearpump.streaming.dsl.window.impl.Bucket
-
-import scala.collection.mutable.ArrayBuffer
-
-sealed trait WindowFn {
-  def apply(timestamp: Instant): List[Bucket]
-}
-
-case class SlidingWindowFn(size: Duration, step: Duration)
-  extends WindowFn {
-
-  def this(size: Duration) = {
-    this(size, size)
-  }
-
-  override def apply(timestamp: Instant): List[Bucket] = {
-    val sizeMillis = size.toMillis
-    val stepMillis = step.toMillis
-    val timeMillis = timestamp.toEpochMilli
-    val windows = ArrayBuffer.empty[Bucket]
-    var start = lastStartFor(timeMillis, stepMillis)
-    windows += Bucket.ofEpochMilli(start, start + sizeMillis)
-    start -= stepMillis
-    while (start >= timeMillis) {
-      windows += Bucket.ofEpochMilli(start, start + sizeMillis)
-      start -= stepMillis
-    }
-    windows.toList
-  }
-
-  private def lastStartFor(timestamp: TimeStamp, windowStep: Long): TimeStamp 
= {
-    timestamp - (timestamp + windowStep) % windowStep
-  }
-}
-
-case class CountWindowFn(size: Int) extends WindowFn {
-
-  override def apply(timestamp: Instant): List[Bucket] = {
-    List(Bucket.ofEpochMilli(0, size))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
new file mode 100644
index 0000000..9ef171d
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.window.api
+
+import java.time.{Duration, Instant}
+
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.streaming.dsl.window.impl.Window
+
+import scala.collection.mutable.ArrayBuffer
+
+object WindowFunction {
+
+  trait Context[T] {
+    def element: T
+    def timestamp: Instant
+  }
+}
+
+trait WindowFunction[T] {
+  def apply(context: WindowFunction.Context[T]): Array[Window]
+}
+
+case class SlidingWindowFunction[T](size: Duration, step: Duration)
+  extends WindowFunction[T] {
+
+  def this(size: Duration) = {
+    this(size, size)
+  }
+
+  override def apply(context: WindowFunction.Context[T]): Array[Window] = {
+    val timestamp = context.timestamp
+    val sizeMillis = size.toMillis
+    val stepMillis = step.toMillis
+    val timeMillis = timestamp.toEpochMilli
+    val windows = ArrayBuffer.empty[Window]
+    var start = lastStartFor(timeMillis, stepMillis)
+    windows += Window.ofEpochMilli(start, start + sizeMillis)
+    start -= stepMillis
+    while (start >= timeMillis) {
+      windows += Window.ofEpochMilli(start, start + sizeMillis)
+      start -= stepMillis
+    }
+    windows.toArray
+  }
+
+  private def lastStartFor(timestamp: TimeStamp, windowStep: Long): TimeStamp 
= {
+    timestamp - (timestamp + windowStep) % windowStep
+  }
+}
+
+case class CountWindowFunction[T](size: Int) extends WindowFunction[T] {
+
+  override def apply(context: WindowFunction.Context[T]): Array[Window] = {
+    Array(Window.ofEpochMilli(0, size))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
new file mode 100644
index 0000000..c636a55
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.window.api
+
+import java.time.Duration
+
+/**
+ *
+ * @param windowFn
+ * @param trigger
+ * @param accumulationMode
+ */
+case class Windows[T](
+    windowFn: WindowFunction[T],
+    trigger: Trigger = EventTimeTrigger,
+    accumulationMode: AccumulationMode = Discarding) {
+
+  def triggering(trigger: Trigger): Windows[T] = {
+    Windows(windowFn, trigger)
+  }
+
+  def accumulating: Windows[T] = {
+    Windows(windowFn, trigger, Accumulating)
+  }
+
+  def discarding: Windows[T] = {
+    Windows(windowFn, trigger, Discarding)
+  }
+}
+
+object CountWindows {
+
+  def apply[T](size: Int): Windows[T] = {
+    Windows(CountWindowFunction(size), CountTrigger)
+  }
+}
+
+object FixedWindows {
+
+  /**
+   * Defines a FixedWindow.
+   * @param size window size
+   * @return a Window definition
+   */
+  def apply[T](size: Duration): Windows[T] = {
+    Windows(SlidingWindowFunction(size, size))
+  }
+}
+
+object SlidingWindow {
+
+  /**
+   * Defines a SlidingWindow
+   * @param size window size
+   * @param step window step to slide forward
+   * @return a Window definition
+   */
+  def apply[T](size: Duration, step: Duration): Windows[T] = {
+    Windows(SlidingWindowFunction(size, step))
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
index 53cf5d0..eb5d551 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
@@ -28,17 +28,18 @@ import org.apache.gearpump.streaming.dsl.window.api._
 import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, 
EventTimeTriggerTask, ProcessingTimeTriggerTask}
 import org.apache.gearpump.streaming.task.Task
 
-object Bucket {
-  def ofEpochMilli(startTime: TimeStamp, endTime: TimeStamp): Bucket = {
-    Bucket(Instant.ofEpochMilli(startTime), Instant.ofEpochMilli(endTime))
+object Window {
+  def ofEpochMilli(startTime: TimeStamp, endTime: TimeStamp): Window = {
+    Window(Instant.ofEpochMilli(startTime), Instant.ofEpochMilli(endTime))
   }
 }
 
 /**
  * A window unit including startTime and excluding endTime.
  */
-case class Bucket(startTime: Instant, endTime: Instant) extends 
Comparable[Bucket] {
-  override def compareTo(o: Bucket): Int = {
+case class Window(startTime: Instant, endTime: Instant) extends 
Comparable[Window] {
+
+  override def compareTo(o: Window): Int = {
     val ret = startTime.compareTo(o.startTime)
     if (ret != 0) {
       ret
@@ -48,13 +49,32 @@ case class Bucket(startTime: Instant, endTime: Instant) 
extends Comparable[Bucke
   }
 }
 
-case class GroupAlsoByWindow[T, GROUP](groupByFn: T => GROUP, window: Window)
-  extends GroupByFn[T, (GROUP, List[Bucket])] {
+case class WindowAndGroup[GROUP](window: Window, group: GROUP)
+  extends Comparable[WindowAndGroup[GROUP]] {
+
+  override def compareTo(o: WindowAndGroup[GROUP]): Int = {
+    val ret = window.compareTo(o.window)
+    if (ret != 0) {
+      ret
+    } else if (group.equals(o.group)) {
+      0
+    } else {
+      -1
+    }
+  }
+}
+
+case class GroupAlsoByWindow[T, GROUP](groupByFn: T => GROUP, window: 
Windows[T])
+  extends GroupByFn[T, List[WindowAndGroup[GROUP]]] {
 
-  override def groupBy(message: Message): (GROUP, List[Bucket]) = {
-    val group = groupByFn(message.msg.asInstanceOf[T])
-    val buckets = window.windowFn(Instant.ofEpochMilli(message.timestamp))
-    group -> buckets
+  override def groupBy(message: Message): List[WindowAndGroup[GROUP]] = {
+    val ele = message.msg.asInstanceOf[T]
+    val group = groupByFn(ele)
+    val windows = window.windowFn(new WindowFunction.Context[T] {
+      override def element: T = ele
+      override def timestamp: Instant = Instant.ofEpochMilli(message.timestamp)
+    })
+    windows.map(WindowAndGroup(_, group)).toList
   }
 
   override def getProcessor(parallelism: Int, description: String,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
index 223a4af..7a16100 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
@@ -26,9 +26,8 @@ import com.gs.collections.api.block.procedure.Procedure
 import com.gs.collections.impl.list.mutable.FastList
 import com.gs.collections.impl.map.mutable.UnifiedMap
 import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap
-import com.gs.collections.impl.set.mutable.UnifiedSet
 import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, Emit, 
SingleInputFunction}
+import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, Emit, 
FunctionRunner}
 import org.apache.gearpump.streaming.dsl.window.api.Discarding
 import org.apache.gearpump.streaming.task.TaskContext
 import org.apache.gearpump.util.LogUtil
@@ -45,36 +44,32 @@ object DefaultWindowRunner {
 
   private val LOG: Logger = LogUtil.getLogger(classOf[DefaultWindowRunner[_, 
_, _]])
 
-  case class WindowGroup[GROUP](bucket: Bucket, group: GROUP)
+  case class InputsAndFn[IN, OUT](inputs: FastList[IN], fn: FunctionRunner[IN, 
OUT])
 }
 
 class DefaultWindowRunner[IN, GROUP, OUT](
     taskContext: TaskContext, userConfig: UserConfig,
     groupBy: GroupAlsoByWindow[IN, GROUP])(implicit system: ActorSystem)
   extends WindowRunner {
-  import org.apache.gearpump.streaming.dsl.window.impl.DefaultWindowRunner._
 
-  private val windows = new TreeSortedMap[Bucket, 
UnifiedSet[WindowGroup[GROUP]]]
-  private val windowGroups = new UnifiedMap[WindowGroup[GROUP], FastList[IN]]
-  private val groupFns = new UnifiedMap[GROUP, SingleInputFunction[IN, OUT]]
+  private val groupedInputs = new TreeSortedMap[WindowAndGroup[GROUP], 
FastList[IN]]
+  private val groupedFnRunners = new UnifiedMap[GROUP, FunctionRunner[IN, OUT]]
 
   override def process(message: Message): Unit = {
-    val (group, buckets) = groupBy.groupBy(message)
-    buckets.foreach { bucket =>
-      val wg = WindowGroup(bucket, group)
-      val wgs = windows.getOrDefault(bucket, new 
UnifiedSet[WindowGroup[GROUP]](1))
-      wgs.add(wg)
-      windows.put(bucket, wgs)
-
-      val inputs = windowGroups.getOrDefault(wg, new FastList[IN](1))
-      inputs.add(message.msg.asInstanceOf[IN])
-      windowGroups.put(wg, inputs)
-    }
-    if (!groupFns.containsKey(group)) {
-      val fn = userConfig.getValue[SingleInputFunction[IN, 
OUT]](GEARPUMP_STREAMING_OPERATOR).get
-      fn.setup()
-      groupFns.put(group, fn)
+    val wgs = groupBy.groupBy(message)
+    wgs.foreach { wg =>
+      if (!groupedInputs.containsKey(wg)) {
+        val inputs = new FastList[IN](1)
+        groupedInputs.put(wg, inputs)
+      }
+      groupedInputs.get(wg).add(message.msg.asInstanceOf[IN])
+      if (!groupedFnRunners.containsKey(wg.group)) {
+        val fn = userConfig.getValue[FunctionRunner[IN, 
OUT]](GEARPUMP_STREAMING_OPERATOR).get
+        fn.setup()
+        groupedFnRunners.put(wg.group, fn)
+      }
     }
+
   }
 
   override def trigger(time: Instant): Unit = {
@@ -82,29 +77,28 @@ class DefaultWindowRunner[IN, GROUP, OUT](
 
     @annotation.tailrec
     def onTrigger(): Unit = {
-      if (windows.notEmpty()) {
-        val first = windows.firstKey
-        if (!time.isBefore(first.endTime)) {
-          val wgs = windows.remove(first)
-          wgs.forEach(new Procedure[WindowGroup[GROUP]] {
-            override def value(each: WindowGroup[GROUP]): Unit = {
-              val inputs = windowGroups.remove(each)
-              val reduceFn = AndThen(groupFns.get(each.group), new 
Emit[OUT](emitResult(_, time)))
-              inputs.forEach(new Procedure[IN] {
-                override def value(t: IN): Unit = {
-                  // .toList forces eager evaluation
-                  reduceFn.process(t).toList
-                }
-              })
-              // .toList forces eager evaluation
-              reduceFn.finish().toList
-              if (groupBy.window.accumulationMode == Discarding) {
-                reduceFn.teardown()
+      if (groupedInputs.notEmpty()) {
+        val first = groupedInputs.firstKey
+        if (!time.isBefore(first.window.endTime)) {
+          val inputs = groupedInputs.remove(first)
+          if (groupedFnRunners.containsKey(first.group)) {
+            val reduceFn = AndThen(groupedFnRunners.get(first.group),
+              new Emit[OUT](output => emitResult(output, time)))
+            inputs.forEach(new Procedure[IN] {
+              override def value(t: IN): Unit = {
+                // .toList forces eager evaluation
+                reduceFn.process(t).toList
               }
+            })
+            // .toList forces eager evaluation
+            reduceFn.finish().toList
+            if (groupBy.window.accumulationMode == Discarding) {
+              reduceFn.teardown()
             }
-          })
-
-          onTrigger()
+            onTrigger()
+          } else {
+            throw new RuntimeException(s"FunctionRunner not found for group 
${first.group}")
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
index 450f2d6..0d0dfa2 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
@@ -23,7 +23,7 @@ import java.time.Instant
 import org.apache.gearpump._
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
+import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 /**
@@ -42,13 +42,13 @@ class DataSourceTask[IN, OUT] private[source](
     context: TaskContext,
     conf: UserConfig,
     source: DataSource,
-    operator: Option[SingleInputFunction[IN, OUT]])
+    operator: Option[FunctionRunner[IN, OUT]])
   extends Task(context, conf) {
 
   def this(context: TaskContext, conf: UserConfig) = {
     this(context, conf,
       conf.getValue[DataSource](GEARPUMP_STREAMING_SOURCE)(context.system).get,
-      conf.getValue[SingleInputFunction[IN, 
OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system)
+      conf.getValue[FunctionRunner[IN, 
OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system)
     )
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
index f49eb04..fb45e35 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
@@ -23,8 +23,8 @@ import java.time.Duration
 import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
 import org.apache.gearpump.Message
 import 
org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitionerSpec.People
-import org.apache.gearpump.streaming.dsl.window.api.{FixedWindow, GroupByFn}
-import org.apache.gearpump.streaming.dsl.window.impl.{Bucket, 
GroupAlsoByWindow}
+import org.apache.gearpump.streaming.dsl.window.api.{FixedWindows, GroupByFn}
+import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, 
WindowAndGroup}
 
 class GroupByPartitionerSpec extends FlatSpec with Matchers with 
BeforeAndAfterAll {
 
@@ -34,9 +34,10 @@ class GroupByPartitionerSpec extends FlatSpec with Matchers 
with BeforeAndAfterA
     val michelle = People("Michelle", "female")
 
     val partitionNum = 10
-    val groupByFn: GroupByFn[People, (String, List[Bucket])] =
-      GroupAlsoByWindow[People, String](_.gender, 
FixedWindow.apply(Duration.ofMillis(5)))
-    val groupBy = new GroupByPartitioner[People, (String, 
List[Bucket])](groupByFn)
+    val groupByFn: GroupByFn[People, List[WindowAndGroup[String]]] =
+      GroupAlsoByWindow[People, String](_.gender,
+        FixedWindows.apply[People](Duration.ofMillis(5)))
+    val groupBy = new GroupByPartitioner[People, 
List[WindowAndGroup[String]]](groupByFn)
     groupBy.getPartition(Message(mark, 1L), partitionNum) shouldBe
       groupBy.getPartition(Message(tom, 2L), partitionNum)
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
index f0920de..461d3da 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
@@ -25,7 +25,7 @@ import org.apache.gearpump.cluster.{TestUtil, UserConfig}
 import org.apache.gearpump.streaming.Processor
 import org.apache.gearpump.streaming.Processor.DefaultProcessor
 import org.apache.gearpump.streaming.dsl.plan.OpSpec.{AnySink, AnySource, 
AnyTask}
-import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, 
SingleInputFunction}
+import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, 
FunctionRunner}
 import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
 import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
 import org.apache.gearpump.streaming.sink.DataSink
@@ -65,7 +65,7 @@ class OpSpec extends WordSpec with Matchers with 
BeforeAndAfterAll with MockitoS
       val dataSource = new AnySource
       val dataSourceOp = DataSourceOp(dataSource)
       val chainableOp = mock[ChainableOp[Any, Any]]
-      val fn = mock[SingleInputFunction[Any, Any]]
+      val fn = mock[FunctionRunner[Any, Any]]
 
       val chainedOp = dataSourceOp.chain(chainableOp)
 
@@ -138,10 +138,10 @@ class OpSpec extends WordSpec with Matchers with 
BeforeAndAfterAll with MockitoS
   "ChainableOp" should {
 
     "chain ChainableOp" in {
-      val fn1 = mock[SingleInputFunction[Any, Any]]
+      val fn1 = mock[FunctionRunner[Any, Any]]
       val chainableOp1 = ChainableOp[Any, Any](fn1)
 
-      val fn2 = mock[SingleInputFunction[Any, Any]]
+      val fn2 = mock[FunctionRunner[Any, Any]]
       val chainableOp2 = ChainableOp[Any, Any](fn2)
 
       val chainedOp = chainableOp1.chain(chainableOp2)
@@ -171,7 +171,7 @@ class OpSpec extends WordSpec with Matchers with 
BeforeAndAfterAll with MockitoS
     "chain ChainableOp" in {
       val groupByFn = mock[GroupByFn[Any, Any]]
       val groupByOp = GroupByOp[Any, Any](groupByFn)
-      val fn = mock[SingleInputFunction[Any, Any]]
+      val fn = mock[FunctionRunner[Any, Any]]
       val chainableOp = mock[ChainableOp[Any, Any]]
       when(chainableOp.fn).thenReturn(fn)
 
@@ -199,7 +199,7 @@ class OpSpec extends WordSpec with Matchers with 
BeforeAndAfterAll with MockitoS
     val mergeOp = MergeOp("merge")
 
     "chain ChainableOp" in {
-      val fn = mock[SingleInputFunction[Any, Any]]
+      val fn = mock[FunctionRunner[Any, Any]]
       val chainableOp = mock[ChainableOp[Any, Any]]
       when(chainableOp.fn).thenReturn(fn)
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
new file mode 100644
index 0000000..08a259a
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.plan.functions
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.{TestUtil, UserConfig}
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.source.DataSourceTask
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
+import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
+import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask}
+import org.apache.gearpump.streaming.dsl.window.api.CountWindows
+import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
+import org.mockito.ArgumentCaptor
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.{Matchers, WordSpec}
+import org.scalatest.mock.MockitoSugar
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
+  import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunnerSpec._
+
+  "AndThen" should {
+
+    val first = mock[FunctionRunner[R, S]]
+    val second = mock[FunctionRunner[S, T]]
+    val andThen = AndThen(first, second)
+
+    "chain first and second functions when processing input value" in {
+      val input = mock[R]
+      val firstOutput = mock[S]
+      val secondOutput = mock[T]
+      when(first.process(input)).thenReturn(Some(firstOutput))
+      when(second.process(firstOutput)).thenReturn(Some(secondOutput))
+
+      andThen.process(input).toList shouldBe List(secondOutput)
+    }
+
+    "return chained description" in {
+      when(first.description).thenReturn("first")
+      when(second.description).thenReturn("second")
+      andThen.description shouldBe "first.second"
+    }
+
+    "return either first result or second on finish" in {
+      val firstResult = mock[S]
+      val processedFirst = mock[T]
+      val secondResult = mock[T]
+
+      when(first.finish()).thenReturn(Some(firstResult))
+      when(second.process(firstResult)).thenReturn(Some(processedFirst))
+      andThen.finish().toList shouldBe List(processedFirst)
+
+      when(first.finish()).thenReturn(None)
+      when(second.finish()).thenReturn(Some(secondResult))
+      andThen.finish().toList shouldBe List(secondResult)
+    }
+
+    "set up both functions on setup" in {
+      andThen.setup()
+
+      verify(first).setup()
+      verify(second).setup()
+    }
+
+    "tear down both functions on teardown" in {
+      andThen.teardown()
+
+      verify(first).teardown()
+      verify(second).teardown()
+    }
+
+    "chain multiple single input function" in {
+      val split = new FlatMapper[String, 
String](FlatMapFunction(_.split("\\s")), "split")
+
+      val filter = new FlatMapper[String, String](
+        FlatMapFunction(word => if (word.isEmpty) None else Some(word)), 
"filter")
+
+      val map = new FlatMapper[String, Int](FlatMapFunction(word => Some(1)), 
"map")
+
+      val sum = new Reducer[Int](ReduceFunction({(left, right) => left + 
right}), "sum")
+
+      val all = AndThen(split, AndThen(filter, AndThen(map, sum)))
+
+      assert(all.description == "split.filter.map.sum")
+
+      val data =
+        """
+      five  four three  two    one
+      five  four three  two
+      five  four three
+      five  four
+      five
+        """
+      // force eager evaluation
+      all.process(data).toList
+      val result = all.finish().toList
+      assert(result.nonEmpty)
+      assert(result.last == 15)
+    }
+  }
+
+  "FlatMapper" should {
+
+    val flatMapFunction = mock[FlatMapFunction[R, S]]
+    val flatMapper = new FlatMapper[R, S](flatMapFunction, "flatMap")
+
+    "call flatMap function when processing input value" in {
+      val input = mock[R]
+      flatMapper.process(input)
+      verify(flatMapFunction).apply(input)
+    }
+
+    "return passed in description" in {
+      flatMapper.description shouldBe "flatMap"
+    }
+
+    "return None on finish" in {
+      flatMapper.finish() shouldBe List.empty[S]
+    }
+
+    "set up FlatMapFunction on setup" in {
+      flatMapper.setup()
+
+      verify(flatMapFunction).setup()
+    }
+
+    "tear down FlatMapFunction on teardown" in {
+      flatMapper.teardown()
+
+      verify(flatMapFunction).teardown()
+    }
+  }
+
+  "ReduceFunction" should {
+
+    "call reduce function when processing input value" in {
+      val reduceFunction = mock[ReduceFunction[T]]
+      val reducer = new Reducer[T](reduceFunction, "reduce")
+      val input1 = mock[T]
+      val input2 = mock[T]
+      val output = mock[T]
+
+      when(reduceFunction.apply(input1, input2)).thenReturn(output, output)
+
+      reducer.process(input1) shouldBe List.empty[T]
+      reducer.process(input2) shouldBe List.empty[T]
+      reducer.finish() shouldBe List(output)
+
+      reducer.teardown()
+      reducer.process(input1) shouldBe List.empty[T]
+      reducer.teardown()
+      reducer.process(input2) shouldBe List.empty[T]
+      reducer.finish() shouldBe List(input2)
+    }
+
+    "return passed in description" in {
+      val reduceFunction = mock[ReduceFunction[T]]
+      val reducer = new Reducer[T](reduceFunction, "reduce")
+      reducer.description shouldBe "reduce"
+    }
+
+    "return None on finish" in {
+      val reduceFunction = mock[ReduceFunction[T]]
+      val reducer = new Reducer[T](reduceFunction, "reduce")
+      reducer.finish() shouldBe List.empty[T]
+    }
+
+    "set up reduce function on setup" in {
+      val reduceFunction = mock[ReduceFunction[T]]
+      val reducer = new Reducer[T](reduceFunction, "reduce")
+      reducer.setup()
+
+      verify(reduceFunction).setup()
+    }
+
+    "tear down reduce function on teardown" in {
+      val reduceFunction = mock[ReduceFunction[T]]
+      val reducer = new Reducer[T](reduceFunction, "reduce")
+      reducer.teardown()
+
+      verify(reduceFunction).teardown()
+    }
+  }
+
+  "Emit" should {
+
+    val emitFunction = mock[T => Unit]
+    val emit = new Emit[T](emitFunction)
+
+    "emit input value when processing input value" in {
+      val input = mock[T]
+
+      emit.process(input) shouldBe List.empty[Unit]
+
+      verify(emitFunction).apply(input)
+    }
+
+    "return empty description" in {
+      emit.description shouldBe ""
+    }
+
+    "return None on finish" in {
+      emit.finish() shouldBe List.empty[Unit]
+    }
+
+    "do nothing on setup" in {
+      emit.setup()
+
+      verifyZeroInteractions(emitFunction)
+    }
+
+    "do nothing on teardown" in {
+      emit.teardown()
+
+      verifyZeroInteractions(emitFunction)
+    }
+  }
+
+  "Source" should {
+    "iterate over input source and apply attached operator" in {
+
+      val taskContext = MockUtil.mockTaskContext
+      implicit val actorSystem = MockUtil.system
+
+      val data = "one two three".split("\\s+")
+      val dataSource = new CollectionDataSource[String](data)
+      val conf = UserConfig.empty.withValue(GEARPUMP_STREAMING_SOURCE, 
dataSource)
+
+      // Source with no transformer
+      val source = new DataSourceTask[String, String](
+        taskContext, conf)
+      source.onStart(Instant.EPOCH)
+      source.onNext(Message("next"))
+      data.foreach { s =>
+        verify(taskContext, times(1)).output(MockUtil.argMatch[Message](
+          message => message.msg == s))
+      }
+
+      // Source with transformer
+      val anotherTaskContext = MockUtil.mockTaskContext
+      val double = new FlatMapper[String, String](FlatMapFunction(
+        word => List(word, word)), "double")
+      val another = new DataSourceTask(anotherTaskContext,
+        conf.withValue(GEARPUMP_STREAMING_OPERATOR, double))
+      another.onStart(Instant.EPOCH)
+      another.onNext(Message("next"))
+      data.foreach { s =>
+        verify(anotherTaskContext, times(2)).output(MockUtil.argMatch[Message](
+          message => message.msg == s))
+      }
+    }
+  }
+
+  "CountTriggerTask" should {
+    "group input by groupBy Function and " +
+      "apply attached operator for each group" in {
+
+      val data = "1 2  2  3 3  3"
+
+      val concat = new Reducer[String](ReduceFunction({ (left, right) =>
+        left + right}), "concat")
+
+      implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
+      val config = UserConfig.empty.withValue[FunctionRunner[String, String]](
+        GEARPUMP_STREAMING_OPERATOR, concat)
+
+      val taskContext = MockUtil.mockTaskContext
+
+      val groupBy = GroupAlsoByWindow((input: String) => input,
+        CountWindows.apply[String](1).accumulating)
+      val task = new CountTriggerTask[String, String](groupBy, taskContext, 
config)
+      task.onStart(Instant.EPOCH)
+
+      val peopleCaptor = ArgumentCaptor.forClass(classOf[Message])
+
+      data.split("\\s+").foreach { word =>
+        task.onNext(Message(word))
+      }
+      verify(taskContext, times(6)).output(peopleCaptor.capture())
+
+      import scala.collection.JavaConverters._
+
+      val values = peopleCaptor.getAllValues.asScala.map(input => 
input.msg.asInstanceOf[String])
+      assert(values.mkString(",") == "1,2,22,3,33,333")
+      system.terminate()
+      Await.result(system.whenTerminated, Duration.Inf)
+    }
+  }
+
+  "MergeTask" should {
+    "accept two stream and apply the attached operator" in {
+
+      // Source with transformer
+      val taskContext = MockUtil.mockTaskContext
+      val conf = UserConfig.empty
+      val double = new FlatMapper[String, String](FlatMapFunction(
+        word => List(word, word)), "double")
+      val task = new TransformTask[String, String](Some(double), taskContext, 
conf)
+      task.onStart(Instant.EPOCH)
+
+      val data = "1 2  2  3 3  3".split("\\s+")
+
+      data.foreach { input =>
+        task.onNext(Message(input))
+      }
+
+      verify(taskContext, times(data.length * 2)).output(anyObject())
+    }
+  }
+}
+
+object FunctionRunnerSpec {
+  type R = AnyRef
+  type S = AnyRef
+  type T = AnyRef
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
deleted file mode 100644
index 2c03e1c..0000000
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
+++ /dev/null
@@ -1,339 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.dsl.plan.functions
-
-import java.time.Instant
-
-import akka.actor.ActorSystem
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.{TestUtil, UserConfig}
-import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.source.DataSourceTask
-import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
-import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource
-import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
-import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask}
-import org.apache.gearpump.streaming.dsl.window.api.CountWindow
-import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
-import org.mockito.ArgumentCaptor
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalatest.{Matchers, WordSpec}
-import org.scalatest.mock.MockitoSugar
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-class SingleInputFunctionSpec extends WordSpec with Matchers with MockitoSugar 
{
-  import 
org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunctionSpec._
-
-  "AndThen" should {
-
-    val first = mock[SingleInputFunction[R, S]]
-    val second = mock[SingleInputFunction[S, T]]
-    val andThen = AndThen(first, second)
-
-    "chain first and second functions when processing input value" in {
-      val input = mock[R]
-      val firstOutput = mock[S]
-      val secondOutput = mock[T]
-      when(first.process(input)).thenReturn(Some(firstOutput))
-      when(second.process(firstOutput)).thenReturn(Some(secondOutput))
-
-      andThen.process(input).toList shouldBe List(secondOutput)
-    }
-
-    "return chained description" in {
-      when(first.description).thenReturn("first")
-      when(second.description).thenReturn("second")
-      andThen.description shouldBe "first.second"
-    }
-
-    "return either first result or second on finish" in {
-      val firstResult = mock[S]
-      val processedFirst = mock[T]
-      val secondResult = mock[T]
-
-      when(first.finish()).thenReturn(Some(firstResult))
-      when(second.process(firstResult)).thenReturn(Some(processedFirst))
-      andThen.finish().toList shouldBe List(processedFirst)
-
-      when(first.finish()).thenReturn(None)
-      when(second.finish()).thenReturn(Some(secondResult))
-      andThen.finish().toList shouldBe List(secondResult)
-    }
-
-    "set up both functions on setup" in {
-      andThen.setup()
-
-      verify(first).setup()
-      verify(second).setup()
-    }
-
-    "tear down both functions on teardown" in {
-      andThen.teardown()
-
-      verify(first).teardown()
-      verify(second).teardown()
-    }
-
-    "chain multiple single input function" in {
-      val split = new FlatMapper[String, 
String](FlatMapFunction(_.split("\\s")), "split")
-
-      val filter = new FlatMapper[String, String](
-        FlatMapFunction(word => if (word.isEmpty) None else Some(word)), 
"filter")
-
-      val map = new FlatMapper[String, Int](FlatMapFunction(word => Some(1)), 
"map")
-
-      val sum = new Reducer[Int](ReduceFunction({(left, right) => left + 
right}), "sum")
-
-      val all = AndThen(split, AndThen(filter, AndThen(map, sum)))
-
-      assert(all.description == "split.filter.map.sum")
-
-      val data =
-        """
-      five  four three  two    one
-      five  four three  two
-      five  four three
-      five  four
-      five
-        """
-      // force eager evaluation
-      all.process(data).toList
-      val result = all.finish().toList
-      assert(result.nonEmpty)
-      assert(result.last == 15)
-    }
-  }
-
-  "FlatMapper" should {
-
-    val flatMapFunction = mock[FlatMapFunction[R, S]]
-    val flatMapper = new FlatMapper[R, S](flatMapFunction, "flatMap")
-
-    "call flatMap function when processing input value" in {
-      val input = mock[R]
-      flatMapper.process(input)
-      verify(flatMapFunction).apply(input)
-    }
-
-    "return passed in description" in {
-      flatMapper.description shouldBe "flatMap"
-    }
-
-    "return None on finish" in {
-      flatMapper.finish() shouldBe List.empty[S]
-    }
-
-    "set up FlatMapFunction on setup" in {
-      flatMapper.setup()
-
-      verify(flatMapFunction).setup()
-    }
-
-    "tear down FlatMapFunction on teardown" in {
-      flatMapper.teardown()
-
-      verify(flatMapFunction).teardown()
-    }
-  }
-
-  "ReduceFunction" should {
-
-    "call reduce function when processing input value" in {
-      val reduceFunction = mock[ReduceFunction[T]]
-      val reducer = new Reducer[T](reduceFunction, "reduce")
-      val input1 = mock[T]
-      val input2 = mock[T]
-      val output = mock[T]
-
-      when(reduceFunction.apply(input1, input2)).thenReturn(output, output)
-
-      reducer.process(input1) shouldBe List.empty[T]
-      reducer.process(input2) shouldBe List.empty[T]
-      reducer.finish() shouldBe List(output)
-
-      reducer.teardown()
-      reducer.process(input1) shouldBe List.empty[T]
-      reducer.teardown()
-      reducer.process(input2) shouldBe List.empty[T]
-      reducer.finish() shouldBe List(input2)
-    }
-
-    "return passed in description" in {
-      val reduceFunction = mock[ReduceFunction[T]]
-      val reducer = new Reducer[T](reduceFunction, "reduce")
-      reducer.description shouldBe "reduce"
-    }
-
-    "return None on finish" in {
-      val reduceFunction = mock[ReduceFunction[T]]
-      val reducer = new Reducer[T](reduceFunction, "reduce")
-      reducer.finish() shouldBe List.empty[T]
-    }
-
-    "set up reduce function on setup" in {
-      val reduceFunction = mock[ReduceFunction[T]]
-      val reducer = new Reducer[T](reduceFunction, "reduce")
-      reducer.setup()
-
-      verify(reduceFunction).setup()
-    }
-
-    "tear down reduce function on teardown" in {
-      val reduceFunction = mock[ReduceFunction[T]]
-      val reducer = new Reducer[T](reduceFunction, "reduce")
-      reducer.teardown()
-
-      verify(reduceFunction).teardown()
-    }
-  }
-
-  "Emit" should {
-
-    val emitFunction = mock[T => Unit]
-    val emit = new Emit[T](emitFunction)
-
-    "emit input value when processing input value" in {
-      val input = mock[T]
-
-      emit.process(input) shouldBe List.empty[Unit]
-
-      verify(emitFunction).apply(input)
-    }
-
-    "return empty description" in {
-      emit.description shouldBe ""
-    }
-
-    "return None on finish" in {
-      emit.finish() shouldBe List.empty[Unit]
-    }
-
-    "do nothing on setup" in {
-      emit.setup()
-
-      verifyZeroInteractions(emitFunction)
-    }
-
-    "do nothing on teardown" in {
-      emit.teardown()
-
-      verifyZeroInteractions(emitFunction)
-    }
-  }
-
-  "Source" should {
-    "iterate over input source and apply attached operator" in {
-
-      val taskContext = MockUtil.mockTaskContext
-      implicit val actorSystem = MockUtil.system
-
-      val data = "one two three".split("\\s+")
-      val dataSource = new CollectionDataSource[String](data)
-      val conf = UserConfig.empty.withValue(GEARPUMP_STREAMING_SOURCE, 
dataSource)
-
-      // Source with no transformer
-      val source = new DataSourceTask[String, String](
-        taskContext, conf)
-      source.onStart(Instant.EPOCH)
-      source.onNext(Message("next"))
-      data.foreach { s =>
-        verify(taskContext, times(1)).output(MockUtil.argMatch[Message](
-          message => message.msg == s))
-      }
-
-      // Source with transformer
-      val anotherTaskContext = MockUtil.mockTaskContext
-      val double = new FlatMapper[String, String](FlatMapFunction(
-        word => List(word, word)), "double")
-      val another = new DataSourceTask(anotherTaskContext,
-        conf.withValue(GEARPUMP_STREAMING_OPERATOR, double))
-      another.onStart(Instant.EPOCH)
-      another.onNext(Message("next"))
-      data.foreach { s =>
-        verify(anotherTaskContext, times(2)).output(MockUtil.argMatch[Message](
-          message => message.msg == s))
-      }
-    }
-  }
-
-  "CountTriggerTask" should {
-    "group input by groupBy Function and " +
-      "apply attached operator for each group" in {
-
-      val data = "1 2  2  3 3  3"
-
-      val concat = new Reducer[String](ReduceFunction({ (left, right) =>
-        left + right}), "concat")
-
-      implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
-      val config = UserConfig.empty.withValue[SingleInputFunction[String, 
String]](
-        GEARPUMP_STREAMING_OPERATOR, concat)
-
-      val taskContext = MockUtil.mockTaskContext
-
-      val groupBy = GroupAlsoByWindow((input: String) => input, 
CountWindow.apply(1).accumulating)
-      val task = new CountTriggerTask[String, String](groupBy, taskContext, 
config)
-      task.onStart(Instant.EPOCH)
-
-      val peopleCaptor = ArgumentCaptor.forClass(classOf[Message])
-
-      data.split("\\s+").foreach { word =>
-        task.onNext(Message(word))
-      }
-      verify(taskContext, times(6)).output(peopleCaptor.capture())
-
-      import scala.collection.JavaConverters._
-
-      val values = peopleCaptor.getAllValues.asScala.map(input => 
input.msg.asInstanceOf[String])
-      assert(values.mkString(",") == "1,2,22,3,33,333")
-      system.terminate()
-      Await.result(system.whenTerminated, Duration.Inf)
-    }
-  }
-
-  "MergeTask" should {
-    "accept two stream and apply the attached operator" in {
-
-      // Source with transformer
-      val taskContext = MockUtil.mockTaskContext
-      val conf = UserConfig.empty
-      val double = new FlatMapper[String, String](FlatMapFunction(
-        word => List(word, word)), "double")
-      val task = new TransformTask[String, String](Some(double), taskContext, 
conf)
-      task.onStart(Instant.EPOCH)
-
-      val data = "1 2  2  3 3  3".split("\\s+")
-
-      data.foreach { input =>
-        task.onNext(Message(input))
-      }
-
-      verify(taskContext, times(data.length * 2)).output(anyObject())
-    }
-  }
-}
-
-object SingleInputFunctionSpec {
-  type R = AnyRef
-  type S = AnyRef
-  type T = AnyRef
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala
index 871d751..1a4958a 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala
@@ -22,7 +22,7 @@ import java.time.Instant
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.dsl.window.api.CountWindow
+import org.apache.gearpump.streaming.dsl.window.api.CountWindows
 import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, 
WindowRunner}
 import org.mockito.Mockito._
 import org.scalacheck.Gen
@@ -42,7 +42,7 @@ class CountTriggerTaskSpec extends PropSpec with 
PropertyChecks
     forAll(numGen, numGen) { (windowSize: Int, msgNum: Int) =>
 
       val groupBy = mock[GroupAlsoByWindow[Any, Any]]
-      val window = CountWindow.apply(windowSize)
+      val window = CountWindows.apply[Any](windowSize)
       when(groupBy.window).thenReturn(window)
       val windowRunner = mock[WindowRunner]
       val userConfig = UserConfig.empty

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
index a69abe6..07b5544 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
@@ -43,7 +43,7 @@ class EventTimeTriggerTaskSpec extends PropSpec with 
PropertyChecks
     forAll(windowSizeGen, windowStepGen, watermarkGen) {
       (windowSize: Long, windowStep: Long, watermark: Instant) =>
 
-        val window = SlidingWindow.apply(Duration.ofMillis(windowSize),
+        val window = SlidingWindow.apply[Any](Duration.ofMillis(windowSize),
           Duration.ofMillis(windowStep)).triggering(EventTimeTrigger)
         val groupBy = mock[GroupAlsoByWindow[Any, Any]]
         val windowRunner = mock[WindowRunner]

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala
index 39e1b4c..ef51ab2 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala
@@ -44,7 +44,7 @@ class ProcessingTimeTriggerTaskSpec extends PropSpec with 
PropertyChecks
     forAll(windowSizeGen, windowStepGen, startTimeGen) {
       (windowSize: Long, windowStep: Long, startTime: Instant) =>
 
-        val window = SlidingWindow.apply(Duration.ofMillis(windowSize),
+        val window = SlidingWindow.apply[Any](Duration.ofMillis(windowSize),
           Duration.ofMillis(windowStep)).triggering(ProcessingTimeTrigger)
         val groupBy = mock[GroupAlsoByWindow[Any, Any]]
         val windowRunner = mock[WindowRunner]

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
index b6e7342..8266df5 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
@@ -22,7 +22,7 @@ import java.time.Instant
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
+import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
 import org.mockito.Mockito.{verify, when}
 import org.scalacheck.Gen
 import org.scalatest.{Matchers, PropSpec}
@@ -36,7 +36,7 @@ class TransformTaskSpec extends PropSpec with PropertyChecks 
with Matchers with
       val taskContext = MockUtil.mockTaskContext
       implicit val system = MockUtil.system
       val config = UserConfig.empty
-      val operator = mock[SingleInputFunction[Any, Any]]
+      val operator = mock[FunctionRunner[Any, Any]]
       val sourceTask = new TransformTask[Any, Any](Some(operator), 
taskContext, config)
 
       sourceTask.onStart(startTime)
@@ -50,7 +50,7 @@ class TransformTaskSpec extends PropSpec with PropertyChecks 
with Matchers with
       val taskContext = MockUtil.mockTaskContext
       implicit val system = MockUtil.system
       val config = UserConfig.empty
-      val operator = mock[SingleInputFunction[Any, Any]]
+      val operator = mock[FunctionRunner[Any, Any]]
       val task = new TransformTask[Any, Any](Some(operator), taskContext, 
config)
       val msg = Message(str)
       when(operator.process(str)).thenReturn(Some(str))
@@ -65,7 +65,7 @@ class TransformTaskSpec extends PropSpec with PropertyChecks 
with Matchers with
     val taskContext = MockUtil.mockTaskContext
     implicit val system = MockUtil.system
     val config = UserConfig.empty
-    val operator = mock[SingleInputFunction[Any, Any]]
+    val operator = mock[FunctionRunner[Any, Any]]
     val task = new TransformTask[Any, Any](Some(operator), taskContext, config)
 
     task.onStop()

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
index 4e95bdd..f7f6fd9 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
@@ -23,7 +23,7 @@ import java.time.Instant
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
+import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
 import org.mockito.Mockito._
 import org.scalacheck.Gen
 import org.scalatest.mock.MockitoSugar
@@ -39,7 +39,7 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks 
with Matchers with
       val dataSource = mock[DataSource]
       val config = UserConfig.empty
         .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
-      val operator = mock[SingleInputFunction[Any, Any]]
+      val operator = mock[FunctionRunner[Any, Any]]
       val sourceTask = new DataSourceTask[Any, Any](taskContext, config, 
dataSource, Some(operator))
 
       sourceTask.onStart(startTime)
@@ -72,7 +72,7 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks 
with Matchers with
     val dataSource = mock[DataSource]
     val config = UserConfig.empty
       .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
-    val operator = mock[SingleInputFunction[Any, Any]]
+    val operator = mock[FunctionRunner[Any, Any]]
     val sourceTask = new DataSourceTask[Any, Any](taskContext, config, 
dataSource, Some(operator))
 
     sourceTask.onStop()

Reply via email to