Repository: incubator-gearpump
Updated Branches:
  refs/heads/master d78683328 -> aebc09a8d


[GEARPUMP-290] Setup FunctionRunner properly

Author: manuzhang <[email protected]>

Closes #168 from manuzhang/GEARPUMP-290.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/aebc09a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/aebc09a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/aebc09a8

Branch: refs/heads/master
Commit: aebc09a8deba2c68a0664f0035d9e68e5c4219a6
Parents: d786833
Author: manuzhang <[email protected]>
Authored: Tue Mar 7 19:24:01 2017 +0800
Committer: huafengw <[email protected]>
Committed: Tue Mar 7 19:24:53 2017 +0800

----------------------------------------------------------------------
 .../dsl/window/impl/WindowRunner.scala          | 27 ++++++++++++++------
 1 file changed, 19 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/aebc09a8/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
index 44d724d..42d50e2 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
@@ -55,6 +55,7 @@ class DefaultWindowRunner[IN, GROUP, OUT](
   private val windowFn = groupBy.window.windowFn
   private val groupedWindowInputs = new UnifiedMap[GROUP, TreeSortedMap[Window, FastList[IN]]]
   private val groupedFnRunners = new UnifiedMap[GROUP, FunctionRunner[IN, OUT]]
+  private val groupedRunnerSetups = new UnifiedMap[GROUP, Boolean]
 
   override def process(message: Message): Unit = {
     val input = message.msg.asInstanceOf[IN]
@@ -76,9 +77,9 @@ class DefaultWindowRunner[IN, GROUP, OUT](
     }
 
     if (!groupedFnRunners.containsKey(group)) {
-      val fn = userConfig.getValue[FunctionRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get
-      fn.setup()
-      groupedFnRunners.put(group, fn)
+      val runner = userConfig.getValue[FunctionRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get
+      groupedFnRunners.put(group, runner)
+      groupedRunnerSetups.put(group, false)
     }
 
     def merge(windowInputs: TreeSortedMap[Window, FastList[IN]], win: Window, input: IN): Unit = {
@@ -114,20 +115,30 @@ class DefaultWindowRunner[IN, GROUP, OUT](
         if (!time.isBefore(firstWin.endTime)) {
           val inputs = windowInputs.remove(firstWin)
           if (groupedFnRunners.containsKey(group)) {
-            val reduceFn = FunctionRunner.withEmitFn(groupedFnRunners.get(group),
+            val runner = FunctionRunner.withEmitFn(groupedFnRunners.get(group),
               (output: OUT) => taskContext.output(Message(output, time)))
+            val setup = groupedRunnerSetups.get(group)
+            if (!setup) {
+              runner.setup()
+              groupedRunnerSetups.put(group, true)
+            }
             inputs.forEach(new Procedure[IN] {
               override def value(t: IN): Unit = {
                 // .toList forces eager evaluation
-                reduceFn.process(t).toList
+                runner.process(t).toList
               }
             })
             // .toList forces eager evaluation
-            reduceFn.finish().toList
+            runner.finish().toList
             if (groupBy.window.accumulationMode == Discarding) {
-              reduceFn.teardown()
+              runner.teardown()
+              groupedRunnerSetups.put(group, false)
+              // dicarding, setup need to be called for each window
+              onTrigger(group, windowInputs)
+            } else {
+              // accumulating, setup is only called for the first window
+              onTrigger(group, windowInputs)
             }
-            onTrigger(group, windowInputs)
           } else {
             throw new RuntimeException(s"FunctionRunner not found for group $group")
           }

Reply via email to