Repository: incubator-gearpump Updated Branches: refs/heads/master d78683328 -> aebc09a8d
[GEARPUMP-290] Setup FunctionRunner properly Author: manuzhang <[email protected]> Closes #168 from manuzhang/GEARPUMP-290. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/aebc09a8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/aebc09a8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/aebc09a8 Branch: refs/heads/master Commit: aebc09a8deba2c68a0664f0035d9e68e5c4219a6 Parents: d786833 Author: manuzhang <[email protected]> Authored: Tue Mar 7 19:24:01 2017 +0800 Committer: huafengw <[email protected]> Committed: Tue Mar 7 19:24:53 2017 +0800 ---------------------------------------------------------------------- .../dsl/window/impl/WindowRunner.scala | 27 ++++++++++++++------ 1 file changed, 19 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/aebc09a8/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala index 44d724d..42d50e2 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala @@ -55,6 +55,7 @@ class DefaultWindowRunner[IN, GROUP, OUT]( private val windowFn = groupBy.window.windowFn private val groupedWindowInputs = new UnifiedMap[GROUP, TreeSortedMap[Window, FastList[IN]]] private val groupedFnRunners = new UnifiedMap[GROUP, FunctionRunner[IN, OUT]] + private val groupedRunnerSetups = new UnifiedMap[GROUP, Boolean] override def process(message: Message): Unit = { val input = message.msg.asInstanceOf[IN] @@ -76,9 +77,9 @@ class DefaultWindowRunner[IN, GROUP, OUT]( } if (!groupedFnRunners.containsKey(group)) { - val fn = userConfig.getValue[FunctionRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get - fn.setup() - groupedFnRunners.put(group, fn) + val runner = userConfig.getValue[FunctionRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get + groupedFnRunners.put(group, runner) + groupedRunnerSetups.put(group, false) } def merge(windowInputs: TreeSortedMap[Window, FastList[IN]], win: Window, input: IN): Unit = { @@ -114,20 +115,30 @@ class DefaultWindowRunner[IN, GROUP, OUT]( if (!time.isBefore(firstWin.endTime)) { val inputs = windowInputs.remove(firstWin) if (groupedFnRunners.containsKey(group)) { - val reduceFn = FunctionRunner.withEmitFn(groupedFnRunners.get(group), + val runner = FunctionRunner.withEmitFn(groupedFnRunners.get(group), (output: OUT) => taskContext.output(Message(output, time))) + val setup = groupedRunnerSetups.get(group) + if (!setup) { + runner.setup() + groupedRunnerSetups.put(group, true) + } inputs.forEach(new Procedure[IN] { override def value(t: IN): Unit = { // .toList forces eager evaluation - reduceFn.process(t).toList + runner.process(t).toList } }) // .toList forces eager evaluation - reduceFn.finish().toList + runner.finish().toList if (groupBy.window.accumulationMode == Discarding) { - reduceFn.teardown() + runner.teardown() + groupedRunnerSetups.put(group, false) + // dicarding, setup need to be called for each window + onTrigger(group, windowInputs) + } else { + // accumulating, setup is only called for the first window + onTrigger(group, windowInputs) } - onTrigger(group, windowInputs) } else { throw new RuntimeException(s"FunctionRunner not found for group $group") }
