Repository: incubator-gearpump Updated Branches: refs/heads/master 198366360 -> 2d13b9cf8
[GEARPUMP-367] Don't use windowing unnecessarily Author: manuzhang <[email protected]> Closes #226 from manuzhang/simplify_dsl. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/2d13b9cf Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/2d13b9cf Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/2d13b9cf Branch: refs/heads/master Commit: 2d13b9cf883ee59b97751d992c2b52dc4068b16c Parents: 1983663 Author: manuzhang <[email protected]> Authored: Tue Apr 10 22:32:12 2018 +0800 Committer: manuzhang <[email protected]> Committed: Tue Apr 10 22:32:23 2018 +0800 ---------------------------------------------------------------------- .../apache/gearpump/streaming/dsl/package.scala | 2 +- .../apache/gearpump/streaming/dsl/plan/OP.scala | 83 +++++-- .../dsl/plan/functions/FunctionRunner.scala | 8 +- .../streaming/dsl/task/GroupByTask.scala | 29 ++- .../streaming/dsl/task/TransformTask.scala | 18 +- .../dsl/window/impl/StreamingOperator.scala | 246 +++++++++++++++++++ .../dsl/window/impl/WindowRunner.scala | 179 -------------- .../streaming/source/DataSourceProcessor.scala | 22 +- .../streaming/source/DataSourceTask.scala | 19 +- .../gearpump/streaming/task/TaskUtil.scala | 4 +- .../gearpump/streaming/dsl/plan/OpSpec.scala | 14 +- .../streaming/dsl/plan/PlannerSpec.scala | 4 +- .../dsl/plan/functions/FunctionRunnerSpec.scala | 10 +- .../streaming/dsl/scalaapi/StreamAppSpec.scala | 4 +- .../streaming/dsl/task/GroupByTaskSpec.scala | 4 +- .../streaming/dsl/task/TransformTaskSpec.scala | 6 +- .../window/impl/DefaultWindowRunnerSpec.scala | 4 +- .../streaming/source/DataSourceTaskSpec.scala | 18 +- 18 files changed, 391 insertions(+), 283 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2d13b9cf/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala index 6d43f16..2d3d94b 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala @@ -36,7 +36,7 @@ package org.apache.gearpump.streaming * * [[org.apache.gearpump.streaming.dsl.task.GroupByTask]] to execute Ops followed by [[org.apache.gearpump.streaming.dsl.plan.GroupByOp]] * * [[org.apache.gearpump.streaming.dsl.task.TransformTask]] to execute all other Ops. * - * All but [[org.apache.gearpump.streaming.sink.DataSinkTask]] delegates execution to [[org.apache.gearpump.streaming.dsl.window.impl.WindowRunner]], which internally + * All but [[org.apache.gearpump.streaming.sink.DataSinkTask]] delegates execution to [[org.apache.gearpump.streaming.dsl.window.impl.StreamingOperator]], which internally * runs a chain of [[org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner]] grouped by windows. Window assignments are either explicitly defined with * [[org.apache.gearpump.streaming.dsl.window.api.Windows]] API or implicitly in [[org.apache.gearpump.streaming.dsl.window.api.GlobalWindows]]. UDFs are eventually * executed by [[org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner]]. 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2d13b9cf/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala index c37ced6..4a71b08 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala @@ -21,12 +21,11 @@ package org.apache.gearpump.streaming.dsl.plan import akka.actor.ActorSystem import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Processor.DefaultProcessor -import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, DummyRunner, FunctionRunner} -import org.apache.gearpump.streaming.dsl.window.impl.{AndThen => WindowRunnerAT} +import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, DummyRunner, FlatMapper, FunctionRunner} +import org.apache.gearpump.streaming.dsl.window.impl.{AndThenOperator, FlatMapOperator, StreamingOperator, WindowOperator} import org.apache.gearpump.streaming.{Constants, Processor} import org.apache.gearpump.streaming.dsl.task.{GroupByTask, TransformTask} import org.apache.gearpump.streaming.dsl.window.api.{GlobalWindows, Windows} -import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, WindowRunner} import org.apache.gearpump.streaming.sink.{DataSink, DataSinkProcessor} import org.apache.gearpump.streaming.source.{DataSource, DataSourceTask} import org.apache.gearpump.streaming.task.Task @@ -69,6 +68,16 @@ object Op { } } + def isFlatMapper(runner: FunctionRunner[Any, Any]): Boolean = { + runner match { + case fm: FlatMapper[Any, Any] => + true + case at: AndThen[Any, Any, Any] => + isFlatMapper(at.first) && isFlatMapper(at.second) + case _ => + false + } + } } /** @@ -134,39 +143,59 @@ case class 
DataSourceOp( dataSource: DataSource, parallelism: Int = 1, description: String = "source", - userConfig: UserConfig = UserConfig.empty) + userConfig: UserConfig = UserConfig.empty, + operator: Option[StreamingOperator[Any, Any]] = None) extends Op { override def chain(other: Op)(implicit system: ActorSystem): Op = { other match { - case op: WindowTransformOp[_, _] => + case op: WindowTransformOp[Any, Any] => + val chainedRunner = + operator.map(AndThenOperator(_, op.operator)).getOrElse(op.operator) DataSourceOp( dataSource, parallelism, Op.concatenate(description, op.description), Op.concatenate(userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, - op.windowRunner), - op.userConfig)) - case op: TransformOp[_, _] => - chain( - WindowOp(GlobalWindows()).chain(op)) + chainedRunner), + op.userConfig), + Some(chainedRunner)) + case op: TransformOp[Any, Any] => + val runner = op.runner + if (Op.isFlatMapper(runner)) { + val fm = new FlatMapOperator[Any, Any](runner) + val chainedRunner = + operator.map(AndThenOperator(_, fm)).getOrElse(fm) + DataSourceOp( + dataSource, + parallelism, + Op.concatenate(description, op.description), + Op.concatenate(userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, + chainedRunner), + op.userConfig), + Some(chainedRunner) + ) + } else { + chain( + WindowOp(GlobalWindows()).chain(op)) + } case op: WindowOp => chain( op.chain(TransformOp(new DummyRunner[Any]()))) case op: TransformWindowTransformOp[_, _, _] => - chain( - WindowOp(GlobalWindows()).chain(op.transformOp) - .chain(op.windowTransformOp)) + chain(op.transformOp).chain(op.windowTransformOp) case _ => throw new OpChainException(this, other) } } override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { - Op.withGlobalWindowsDummyRunner(this, userConfig, + if (userConfig.getValue(Constants.GEARPUMP_STREAMING_OPERATOR).isEmpty) { + chain(TransformOp(new DummyRunner[Any])).toProcessor + } else { Processor[DataSourceTask[Any, Any]](parallelism, 
description, userConfig.withValue(Constants.GEARPUMP_STREAMING_SOURCE, dataSource)) - ) + } } } @@ -195,10 +224,10 @@ case class DataSinkOp( * to another Op to be executed */ case class TransformOp[IN, OUT]( - fn: FunctionRunner[IN, OUT], + runner: FunctionRunner[IN, OUT], userConfig: UserConfig = UserConfig.empty) extends Op { - override def description: String = fn.description + override def description: String = runner.description override def chain(other: Op)(implicit system: ActorSystem): Op = { other match { @@ -208,16 +237,16 @@ case class TransformOp[IN, OUT]( // => ChainableOp(f1).chain(ChainableOp(f2)).chain(ChainableOp(f3)) // => AndThen(AndThen(f1, f2), f3) TransformOp( - AndThen(fn, op.fn), + AndThen(runner, op.runner), Op.concatenate(userConfig, op.userConfig)) case op: WindowOp => TransformWindowTransformOp(this, - WindowTransformOp(new DefaultWindowRunner[OUT, OUT]( + WindowTransformOp(new WindowOperator[OUT, OUT]( op.windows, new DummyRunner[OUT] ), op.description, op.userConfig)) case op: TransformWindowTransformOp[OUT, _, _] => TransformWindowTransformOp(TransformOp( - AndThen(fn, op.transformOp.fn), + AndThen(runner, op.transformOp.runner), Op.concatenate(userConfig, op.transformOp.userConfig) ), op.windowTransformOp) case _ => @@ -244,13 +273,13 @@ case class WindowOp( override def chain(other: Op)(implicit system: ActorSystem): Op = { other match { case op: TransformOp[_, _] => - WindowTransformOp(new DefaultWindowRunner(windows, op.fn), + WindowTransformOp(new WindowOperator(windows, op.runner), Op.concatenate(description, op.description), Op.concatenate(userConfig, op.userConfig)) case op: WindowOp => chain(TransformOp(new DummyRunner[Any])).chain(op.chain(TransformOp(new DummyRunner[Any]))) case op: TransformWindowTransformOp[_, _, _] => - WindowTransformOp(new DefaultWindowRunner(windows, op.transformOp.fn), + WindowTransformOp(new WindowOperator(windows, op.transformOp.runner), Op.concatenate(description, op.transformOp.description), 
Op.concatenate(userConfig, op.transformOp.userConfig)).chain(op.windowTransformOp) case _ => @@ -290,7 +319,7 @@ case class GroupByOp[IN, GROUP] private( Op.concatenate(description, op.description), Op.concatenate( userConfig - .withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.windowRunner), + .withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.operator), userConfig)) case op: WindowOp => chain(op.chain(TransformOp(new DummyRunner[Any]()))) @@ -329,7 +358,7 @@ case class MergeOp( parallelism, description, Op.concatenate(userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, - op.windowRunner), + op.operator), op.userConfig)) case op: WindowOp => chain(op.chain(TransformOp(new DummyRunner[Any]()))) @@ -352,7 +381,7 @@ case class MergeOp( * it will be translated to a [[org.apache.gearpump.streaming.dsl.task.TransformTask]]. */ private case class WindowTransformOp[IN, OUT]( - windowRunner: WindowRunner[IN, OUT], + operator: StreamingOperator[IN, OUT], description: String, userConfig: UserConfig) extends Op { @@ -360,7 +389,7 @@ private case class WindowTransformOp[IN, OUT]( other match { case op: WindowTransformOp[OUT, _] => WindowTransformOp( - WindowRunnerAT(windowRunner, op.windowRunner), + AndThenOperator(operator, op.operator), Op.concatenate(description, op.description), Op.concatenate(userConfig, op.userConfig) ) @@ -372,7 +401,7 @@ private case class WindowTransformOp[IN, OUT]( override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { // TODO: this should be chained to DataSourceOp / GroupByOp / MergeOp Processor[TransformTask[Any, Any]](1, description, userConfig.withValue( - Constants.GEARPUMP_STREAMING_OPERATOR, windowRunner)) + Constants.GEARPUMP_STREAMING_OPERATOR, operator)) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2d13b9cf/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala ---------------------------------------------------------------------- diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala index 2c11238..c638257 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala @@ -39,6 +39,7 @@ sealed trait FunctionRunner[IN, OUT] extends java.io.Serializable { def description: String } + case class AndThen[IN, MIDDLE, OUT](first: FunctionRunner[IN, MIDDLE], second: FunctionRunner[MIDDLE, OUT]) extends FunctionRunner[IN, OUT] { @@ -114,10 +115,5 @@ class FoldRunner[T, A](fn: FoldFunction[T, A], val description: String) } } -class DummyRunner[T] extends FunctionRunner[T, T] { - - override def process(value: T): TraversableOnce[T] = Option(value) - - override def description: String = "" -} +class DummyRunner[T] extends FlatMapper[T, T](FlatMapFunction(Option(_)), "") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2d13b9cf/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala index b3f3ad2..b615354 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala @@ -25,7 +25,7 @@ import com.gs.collections.impl.map.mutable.UnifiedMap import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Constants.{GEARPUMP_STREAMING_GROUPBY_FUNCTION, GEARPUMP_STREAMING_OPERATOR} -import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner} +import 
org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, StreamingOperator} import org.apache.gearpump.streaming.source.Watermark import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil} @@ -44,20 +44,21 @@ class GroupByTask[IN, GROUP, OUT]( ) } - private val groups: UnifiedMap[GROUP, WindowRunner[IN, OUT]] = - new UnifiedMap[GROUP, WindowRunner[IN, OUT]] + private val groups: UnifiedMap[GROUP, StreamingOperator[IN, OUT]] = + new UnifiedMap[GROUP, StreamingOperator[IN, OUT]] override def onNext(message: Message): Unit = { val input = message.value.asInstanceOf[IN] val group = groupBy(input) if (!groups.containsKey(group)) { - groups.put(group, - userConfig.getValue[WindowRunner[IN, OUT]]( - GEARPUMP_STREAMING_OPERATOR)(taskContext.system).get) + val operator = userConfig.getValue[StreamingOperator[IN, OUT]]( + GEARPUMP_STREAMING_OPERATOR)(taskContext.system).get + operator.setup() + groups.put(group, operator) } - groups.get(group).process(TimestampedValue(message.value.asInstanceOf[IN], + groups.get(group).foreach(TimestampedValue(message.value.asInstanceOf[IN], message.timestamp)) } @@ -65,11 +66,19 @@ class GroupByTask[IN, GROUP, OUT]( if (groups.isEmpty && watermark == Watermark.MAX) { taskContext.updateWatermark(Watermark.MAX) } else { - groups.values.forEach(new Consumer[WindowRunner[IN, OUT]] { - override def accept(runner: WindowRunner[IN, OUT]): Unit = { - TaskUtil.trigger(watermark, runner, taskContext) + groups.values.forEach(new Consumer[StreamingOperator[IN, OUT]] { + override def accept(operator: StreamingOperator[IN, OUT]): Unit = { + TaskUtil.trigger(watermark, operator, taskContext) } }) } } + + override def onStop(): Unit = { + groups.values.forEach(new Consumer[StreamingOperator[IN, OUT]] { + override def accept(operator: StreamingOperator[IN, OUT]): Unit = { + operator.teardown() + } + }) + } } 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2d13b9cf/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala index 5ad64fa..6c78e0b 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala @@ -22,25 +22,33 @@ import java.time.Instant import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Constants._ -import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner} +import org.apache.gearpump.streaming.dsl.window.impl.{StreamingOperator, TimestampedValue} import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil} class TransformTask[IN, OUT]( - runner: WindowRunner[IN, OUT], + operator: StreamingOperator[IN, OUT], taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { def this(context: TaskContext, conf: UserConfig) = { this( - conf.getValue[WindowRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system).get, + conf.getValue[StreamingOperator[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system).get, context, conf ) } + override def onStart(startTime: Instant): Unit = { + operator.setup() + } + override def onNext(msg: Message): Unit = { - runner.process(TimestampedValue(msg.value.asInstanceOf[IN], msg.timestamp)) + operator.foreach(TimestampedValue(msg.value.asInstanceOf[IN], msg.timestamp)) } override def onWatermarkProgress(watermark: Instant): Unit = { - TaskUtil.trigger(watermark, runner, taskContext) + TaskUtil.trigger(watermark, operator, taskContext) + } + + override def onStop(): Unit = { + 
operator.teardown() } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2d13b9cf/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/StreamingOperator.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/StreamingOperator.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/StreamingOperator.scala new file mode 100644 index 0000000..4f29c9e --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/StreamingOperator.scala @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming.dsl.window.impl + +import java.time.Instant + +import com.gs.collections.api.block.predicate.Predicate +import com.gs.collections.api.block.procedure.Procedure +import com.gs.collections.impl.list.mutable.FastList +import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap +import org.apache.gearpump.Message +import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner +import org.apache.gearpump.streaming.dsl.window.api.WindowFunction.Context +import org.apache.gearpump.streaming.dsl.window.api.{Discarding, Windows} +import org.apache.gearpump.streaming.source.Watermark +import org.apache.gearpump.streaming.task.TaskUtil + +import scala.collection.mutable.ArrayBuffer + +/** + * Inputs for [[StreamingOperator]]. + */ +case class TimestampedValue[T](value: T, timestamp: Instant) { + + def this(msg: Message) = { + this(msg.value.asInstanceOf[T], msg.timestamp) + } + + def toMessage: Message = Message(value, timestamp) +} + +/** + * Outputs triggered by [[StreamingOperator]] + */ +case class TriggeredOutputs[T](outputs: TraversableOnce[TimestampedValue[T]], + watermark: Instant) + + +trait StreamingOperator[IN, OUT] extends java.io.Serializable { + + def setup(): Unit = {} + + def foreach(tv: TimestampedValue[IN]): Unit + + def flatMap( + tv: TimestampedValue[IN]): TraversableOnce[TimestampedValue[OUT]] = { + foreach(tv) + None + } + + def trigger(time: Instant): TriggeredOutputs[OUT] + + def teardown(): Unit = {} +} + +/** + * A composite StreamingOperator that first executes its left child and feeds results + * into its right child. 
+ */ +case class AndThenOperator[IN, MIDDLE, OUT](left: StreamingOperator[IN, MIDDLE], + right: StreamingOperator[MIDDLE, OUT]) extends StreamingOperator[IN, OUT] { + + override def setup(): Unit = { + left.setup() + right.setup() + } + + override def foreach( + tv: TimestampedValue[IN]): Unit = { + left.flatMap(tv).foreach(right.flatMap) + } + + override def flatMap( + tv: TimestampedValue[IN]): TraversableOnce[TimestampedValue[OUT]] = { + left.flatMap(tv).flatMap(right.flatMap) + } + + override def trigger(time: Instant): TriggeredOutputs[OUT] = { + val lOutputs = left.trigger(time) + lOutputs.outputs.foreach(right.foreach) + right.trigger(lOutputs.watermark) + } + + override def teardown(): Unit = { + left.teardown() + right.teardown() + } +} + +/** + * @param runner FlatMapper or chained FlatMappers + */ +class FlatMapOperator[IN, OUT](runner: FunctionRunner[IN, OUT]) + extends StreamingOperator[IN, OUT] { + + override def setup(): Unit = { + runner.setup() + } + + override def foreach(tv: TimestampedValue[IN]): Unit = { + throw new UnsupportedOperationException("foreach should not be invoked on FlatMapOperator; " + + "please use flatMap instead") + } + + override def flatMap( + tv: TimestampedValue[IN]): TraversableOnce[TimestampedValue[OUT]] = { + runner.process(tv.value) + .map(TimestampedValue(_, tv.timestamp)) + } + + override def trigger(time: Instant): TriggeredOutputs[OUT] = { + TriggeredOutputs(None, time) + } + + override def teardown(): Unit = { + runner.teardown() + } +} + +/** + * This is responsible for executing window calculation. + * 1. Groups elements into windows as defined by window function + * 2. Applies window calculation to each group + * 3. 
Emits results on triggering + */ +class WindowOperator[IN, OUT]( + windows: Windows, + runner: FunctionRunner[IN, OUT]) + extends StreamingOperator[IN, OUT] { + + private val windowFn = windows.windowFn + private val windowInputs = new TreeSortedMap[Window, FastList[TimestampedValue[IN]]] + private var isSetup = false + private var watermark = Watermark.MIN + + override def foreach( + tv: TimestampedValue[IN]): Unit = { + val wins = windowFn(new Context[IN] { + override def element: IN = tv.value + + override def timestamp: Instant = tv.timestamp + }) + wins.foreach { win => + if (windowFn.isNonMerging) { + if (!windowInputs.containsKey(win)) { + val inputs = new FastList[TimestampedValue[IN]] + windowInputs.put(win, inputs) + } + windowInputs.get(win).add(tv) + } else { + merge(windowInputs, win, tv) + } + } + + def merge( + winIns: TreeSortedMap[Window, FastList[TimestampedValue[IN]]], + win: Window, tv: TimestampedValue[IN]): Unit = { + val intersected = winIns.keySet.select(new Predicate[Window] { + override def accept(each: Window): Boolean = { + win.intersects(each) + } + }) + var mergedWin = win + val mergedInputs = FastList.newListWith(tv) + intersected.forEach(new Procedure[Window] { + override def value(each: Window): Unit = { + mergedWin = mergedWin.span(each) + mergedInputs.addAll(winIns.remove(each)) + } + }) + winIns.put(mergedWin, mergedInputs) + } + } + + override def trigger(time: Instant): TriggeredOutputs[OUT] = { + @annotation.tailrec + def onTrigger( + outputs: ArrayBuffer[TimestampedValue[OUT]], + wmk: Instant): TriggeredOutputs[OUT] = { + if (windowInputs.notEmpty()) { + val firstWin = windowInputs.firstKey + if (!time.isBefore(firstWin.endTime)) { + val inputs = windowInputs.remove(firstWin) + if (!isSetup) { + runner.setup() + isSetup = true + } + inputs.forEach(new Procedure[TimestampedValue[IN]] { + override def value(tv: TimestampedValue[IN]): Unit = { + runner.process(tv.value).foreach { + out: OUT => outputs += TimestampedValue(out, 
tv.timestamp) + } + } + }) + runner.finish().foreach { + out: OUT => + outputs += TimestampedValue(out, firstWin.endTime.minusMillis(1)) + } + val newWmk = TaskUtil.max(wmk, firstWin.endTime) + if (windows.accumulationMode == Discarding) { + runner.teardown() + // discarding, setup need to be called for each window + isSetup = false + } + onTrigger(outputs, newWmk) + } else { + // The output watermark is the minimum of end of last triggered window + // and start of first un-triggered window + TriggeredOutputs(outputs, TaskUtil.min(wmk, firstWin.startTime)) + } + } else { + // All windows have been triggered. + if (time == Watermark.MAX) { + // This means there will be no more inputs + // so it's safe to advance to the maximum watermark. + TriggeredOutputs(outputs, Watermark.MAX) + } else { + TriggeredOutputs(outputs, wmk) + } + } + } + + val triggeredOutputs = onTrigger(ArrayBuffer.empty[TimestampedValue[OUT]], watermark) + watermark = TaskUtil.max(watermark, triggeredOutputs.watermark) + TriggeredOutputs(triggeredOutputs.outputs, watermark) + } + + override def teardown(): Unit = { + runner.teardown() + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2d13b9cf/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala deleted file mode 100644 index ee3c067..0000000 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.streaming.dsl.window.impl - -import java.time.Instant - -import com.gs.collections.api.block.predicate.Predicate -import com.gs.collections.api.block.procedure.Procedure -import com.gs.collections.impl.list.mutable.FastList -import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap -import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner -import org.apache.gearpump.streaming.dsl.window.api.WindowFunction.Context -import org.apache.gearpump.streaming.dsl.window.api.{Discarding, Windows} -import org.apache.gearpump.streaming.source.Watermark -import org.apache.gearpump.streaming.task.TaskUtil - -import scala.collection.mutable.ArrayBuffer - -/** - * Inputs for [[WindowRunner]]. - */ -case class TimestampedValue[T](value: T, timestamp: Instant) - -/** - * Outputs triggered by [[WindowRunner]] - */ -case class TriggeredOutputs[T](outputs: TraversableOnce[TimestampedValue[T]], - watermark: Instant) - -/** - * This is responsible for executing window calculation. - * 1. Groups elements into windows as defined by window function - * 2. Applies window calculation to each group - * 3. 
Emits results on triggering - */ -trait WindowRunner[IN, OUT] extends java.io.Serializable { - - def process(timestampedValue: TimestampedValue[IN]): Unit - - def trigger(time: Instant): TriggeredOutputs[OUT] -} - -/** - * A composite WindowRunner that first executes its left child and feeds results - * into result child. - */ -case class AndThen[IN, MIDDLE, OUT](left: WindowRunner[IN, MIDDLE], - right: WindowRunner[MIDDLE, OUT]) extends WindowRunner[IN, OUT] { - - override def process(timestampedValue: TimestampedValue[IN]): Unit = { - left.process(timestampedValue) - } - - override def trigger(time: Instant): TriggeredOutputs[OUT] = { - val lOutputs = left.trigger(time) - lOutputs.outputs.foreach(right.process) - right.trigger(lOutputs.watermark) - } -} - -/** - * Default implementation for [[WindowRunner]]. - */ -class DefaultWindowRunner[IN, OUT]( - windows: Windows, - fnRunner: FunctionRunner[IN, OUT]) - extends WindowRunner[IN, OUT] { - - private val windowFn = windows.windowFn - private val windowInputs = new TreeSortedMap[Window, FastList[TimestampedValue[IN]]] - private var setup = false - private var watermark = Watermark.MIN - - override def process(timestampedValue: TimestampedValue[IN]): Unit = { - val wins = windowFn(new Context[IN] { - override def element: IN = timestampedValue.value - - override def timestamp: Instant = timestampedValue.timestamp - }) - wins.foreach { win => - if (windowFn.isNonMerging) { - if (!windowInputs.containsKey(win)) { - val inputs = new FastList[TimestampedValue[IN]] - windowInputs.put(win, inputs) - } - windowInputs.get(win).add(timestampedValue) - } else { - merge(windowInputs, win, timestampedValue) - } - } - - def merge( - winIns: TreeSortedMap[Window, FastList[TimestampedValue[IN]]], - win: Window, tv: TimestampedValue[IN]): Unit = { - val intersected = winIns.keySet.select(new Predicate[Window] { - override def accept(each: Window): Boolean = { - win.intersects(each) - } - }) - var mergedWin = win - val mergedInputs 
= FastList.newListWith(tv) - intersected.forEach(new Procedure[Window] { - override def value(each: Window): Unit = { - mergedWin = mergedWin.span(each) - mergedInputs.addAll(winIns.remove(each)) - } - }) - winIns.put(mergedWin, mergedInputs) - } - } - - override def trigger(time: Instant): TriggeredOutputs[OUT] = { - @annotation.tailrec - def onTrigger( - outputs: ArrayBuffer[TimestampedValue[OUT]], - wmk: Instant): TriggeredOutputs[OUT] = { - if (windowInputs.notEmpty()) { - val firstWin = windowInputs.firstKey - if (!time.isBefore(firstWin.endTime)) { - val inputs = windowInputs.remove(firstWin) - if (!setup) { - fnRunner.setup() - setup = true - } - inputs.forEach(new Procedure[TimestampedValue[IN]] { - override def value(tv: TimestampedValue[IN]): Unit = { - fnRunner.process(tv.value).foreach { - out: OUT => outputs += TimestampedValue(out, tv.timestamp) - } - } - }) - fnRunner.finish().foreach { - out: OUT => - outputs += TimestampedValue(out, firstWin.endTime.minusMillis(1)) - } - val newWmk = TaskUtil.max(wmk, firstWin.endTime) - if (windows.accumulationMode == Discarding) { - fnRunner.teardown() - // discarding, setup need to be called for each window - setup = false - } - onTrigger(outputs, newWmk) - } else { - // The output watermark is the minimum of end of last triggered window - // and start of first un-triggered window - TriggeredOutputs(outputs, TaskUtil.min(wmk, firstWin.startTime)) - } - } else { - // All windows have been triggered. - if (time == Watermark.MAX) { - // This means there will be no more inputs - // so it's safe to advance to the maximum watermark. 
- TriggeredOutputs(outputs, Watermark.MAX) - } else { - TriggeredOutputs(outputs, wmk) - } - } - } - - val triggeredOutputs = onTrigger(ArrayBuffer.empty[TimestampedValue[OUT]], watermark) - watermark = TaskUtil.max(watermark, triggeredOutputs.watermark) - TriggeredOutputs(triggeredOutputs.outputs, watermark) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2d13b9cf/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala index dd4c0d3..c471a00 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala @@ -18,11 +18,11 @@ package org.apache.gearpump.streaming.source + import akka.actor.ActorSystem import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner -import org.apache.gearpump.streaming.dsl.window.api.{WindowFunction, Windows} -import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, Window, WindowRunner} +import org.apache.gearpump.streaming.dsl.window.impl.{FlatMapOperator, StreamingOperator} import org.apache.gearpump.streaming.{Constants, Processor} /** @@ -48,19 +48,9 @@ object DataSourceProcessor { Processor[DataSourceTask[Any, Any]](parallelism, description, taskConf .withValue[DataSource](Constants.GEARPUMP_STREAMING_SOURCE, dataSource) - .withValue[WindowRunner[Any, Any]](Constants.GEARPUMP_STREAMING_OPERATOR, - new DefaultWindowRunner[Any, Any]( - Windows(PerElementWindowFunction, description = "perElementWindows"), - new DummyRunner[Any]))) - } - - - case object PerElementWindowFunction extends WindowFunction { - override def apply[T]( 
- context: WindowFunction.Context[T]): Array[Window] = { - Array(Window(context.timestamp, context.timestamp.plusMillis(1))) - } - - override def isNonMerging: Boolean = true + .withValue[StreamingOperator[Any, Any]](Constants.GEARPUMP_STREAMING_OPERATOR, + new FlatMapOperator(new DummyRunner[Any]) + ) + ) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2d13b9cf/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala index f93c496..b09ad66 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala @@ -23,7 +23,7 @@ import java.time.Instant import org.apache.gearpump._ import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Constants._ -import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner} +import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, StreamingOperator} import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil} /** @@ -40,7 +40,7 @@ import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil} */ class DataSourceTask[IN, OUT] private[source]( source: DataSource, - windowRunner: WindowRunner[IN, OUT], + operator: StreamingOperator[IN, OUT], context: TaskContext, conf: UserConfig) extends Task(context, conf) { @@ -48,7 +48,7 @@ class DataSourceTask[IN, OUT] private[source]( def this(context: TaskContext, conf: UserConfig) = { this( conf.getValue[DataSource](GEARPUMP_STREAMING_SOURCE)(context.system).get, - conf.getValue[WindowRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system).get, + conf.getValue[StreamingOperator[IN, 
OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system).get, context, conf ) } @@ -58,27 +58,32 @@ class DataSourceTask[IN, OUT] private[source]( override def onStart(startTime: Instant): Unit = { LOG.info(s"opening data source at ${startTime.toEpochMilli}") source.open(context, startTime) + operator.setup() self ! Watermark(source.getWatermark) } override def onNext(m: Message): Unit = { 0.until(batchSize).foreach { _ => - Option(source.read()).foreach( - msg => windowRunner.process( - TimestampedValue(msg.value.asInstanceOf[IN], msg.timestamp))) + Option(source.read()).foreach(process) } self ! Watermark(source.getWatermark) } override def onWatermarkProgress(watermark: Instant): Unit = { - TaskUtil.trigger(watermark, windowRunner, context) + TaskUtil.trigger(watermark, operator, context) } override def onStop(): Unit = { LOG.info("closing data source...") source.close() + operator.teardown() + } + + private def process(msg: Message): Unit = { + operator.flatMap(new TimestampedValue(msg)) + .foreach { tv => context.output(tv.toMessage) } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2d13b9cf/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala index bd889c4..ed304ce 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala @@ -21,7 +21,7 @@ package org.apache.gearpump.streaming.task import java.time.Instant import org.apache.gearpump.Message -import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner} +import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, StreamingOperator} object TaskUtil { @@ -36,7 +36,7 @@ object TaskUtil { 
loader.loadClass(className).asSubclass(classOf[Task]) } - def trigger[IN, OUT](watermark: Instant, runner: WindowRunner[IN, OUT], + def trigger[IN, OUT](watermark: Instant, runner: StreamingOperator[IN, OUT], context: TaskContext): Unit = { val triggeredOutputs = runner.trigger(watermark) context.updateWatermark(triggeredOutputs.watermark) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2d13b9cf/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala index ca0135d..79ef135 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala @@ -28,7 +28,7 @@ import org.apache.gearpump.streaming.dsl.plan.OpSpec.{AnySink, AnySource, AnyTas import org.apache.gearpump.streaming.dsl.plan.functions.{DummyRunner, FlatMapper, FunctionRunner} import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows -import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, WindowRunner} +import org.apache.gearpump.streaming.dsl.window.impl.{WindowOperator, StreamingOperator} import org.apache.gearpump.streaming.sink.DataSink import org.apache.gearpump.streaming.source.DataSource import org.apache.gearpump.streaming.task.{Task, TaskContext} @@ -66,7 +66,7 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS val dataSourceOp = DataSourceOp(dataSource) val transformOp = mock[TransformOp[Any, Any]] val fn = mock[FunctionRunner[Any, Any]] - when(transformOp.fn).thenReturn(fn) + when(transformOp.runner).thenReturn(fn) val chainedOp = dataSourceOp.chain(transformOp) @@ -85,7 +85,7 @@ class 
OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS val processor = dataSourceOp.toProcessor processor shouldBe a[Processor[_]] processor.parallelism shouldBe dataSourceOp.parallelism - processor.description shouldBe s"${dataSourceOp.description}.globalWindows" + processor.description shouldBe s"${dataSourceOp.description}" } } @@ -173,9 +173,9 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS "chain WindowTransformOp" in { - val runner = new DefaultWindowRunner[Any, Any](GlobalWindows(), new DummyRunner()) + val runner = new WindowOperator[Any, Any](GlobalWindows(), new DummyRunner()) val windowTransformOp = mock[WindowTransformOp[Any, Any]] - when(windowTransformOp.windowRunner).thenReturn(runner) + when(windowTransformOp.operator).thenReturn(runner) val chainedOp = groupByOp.chain(windowTransformOp) chainedOp shouldBe a[GroupByOp[_, _]] @@ -199,9 +199,9 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS val mergeOp = MergeOp() "chain WindowTransformOp" in { - val runner = mock[WindowRunner[Any, Any]] + val runner = mock[StreamingOperator[Any, Any]] val windowTransformOp = mock[WindowTransformOp[Any, Any]] - when(windowTransformOp.windowRunner).thenReturn(runner) + when(windowTransformOp.operator).thenReturn(runner) val chainedOp = mergeOp.chain(windowTransformOp) chainedOp shouldBe a [MergeOp] http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2d13b9cf/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala index be4cc63..f2c9d0e 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala +++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala @@ -87,8 +87,8 @@ class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Moc .mapVertex(_.description) plan.getVertices.toSet should contain theSameElementsAs - Set("source.globalWindows", "groupBy.globalWindows.flatMap.reduce", "processor", "sink") - plan.outgoingEdgesOf("source.globalWindows").iterator.next()._2 shouldBe + Set("source", "groupBy.globalWindows.flatMap.reduce", "processor", "sink") + plan.outgoingEdgesOf("source").iterator.next()._2 shouldBe a[GroupByPartitioner[_, _]] plan.outgoingEdgesOf("groupBy.globalWindows.flatMap.reduce").iterator.next()._2 shouldBe a[CoLocationPartitioner] http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2d13b9cf/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala index 6244224..c92f9c8 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala @@ -29,7 +29,7 @@ import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction import org.apache.gearpump.streaming.dsl.task.TransformTask import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows -import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, WindowRunner} +import org.apache.gearpump.streaming.dsl.window.impl.{WindowOperator, StreamingOperator} import org.mockito.Matchers._ import org.mockito.Mockito._ import org.scalatest.{Matchers, WordSpec} @@ -218,11 +218,11 @@ 
class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar { val data = "one two three".split("\\s+") val dataSource = new CollectionDataSource[String](data) - val runner1 = new DefaultWindowRunner[String, String]( + val runner1 = new WindowOperator[String, String]( GlobalWindows(), new DummyRunner[String]) val conf = UserConfig.empty .withValue(GEARPUMP_STREAMING_SOURCE, dataSource) - .withValue[WindowRunner[String, String]](GEARPUMP_STREAMING_OPERATOR, runner1) + .withValue[StreamingOperator[String, String]](GEARPUMP_STREAMING_OPERATOR, runner1) // Source with no transformer val source = new DataSourceTask[String, String]( @@ -239,7 +239,7 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar { val anotherTaskContext = MockUtil.mockTaskContext val double = new FlatMapper[String, String](FlatMapFunction( word => List(word, word)), "double") - val runner2 = new DefaultWindowRunner[String, String]( + val runner2 = new WindowOperator[String, String]( GlobalWindows(), double) val another = new DataSourceTask(anotherTaskContext, conf.withValue(GEARPUMP_STREAMING_OPERATOR, runner2)) @@ -262,7 +262,7 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar { val conf = UserConfig.empty val double = new FlatMapper[String, String](FlatMapFunction( word => List(word, word)), "double") - val transform = new DefaultWindowRunner[String, String](GlobalWindows(), double) + val transform = new WindowOperator[String, String](GlobalWindows(), double) val task = new TransformTask[String, String](transform, taskContext, conf) task.onStart(Instant.EPOCH) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2d13b9cf/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala index d43bca0..f701c5e 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala @@ -61,9 +61,9 @@ class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with M dag.getVertices.size shouldBe 2 dag.getVertices.foreach { processor => processor.taskClass shouldBe classOf[DataSourceTask[_, _]].getName - if (processor.description == "A.globalWindows") { + if (processor.description == "A") { processor.parallelism shouldBe 2 - } else if (processor.description == "B.globalWindows") { + } else if (processor.description == "B") { processor.parallelism shouldBe 3 } else { fail(s"undefined source ${processor.description}") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2d13b9cf/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala index 9e6bf59..62e14f4 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala @@ -24,7 +24,7 @@ import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows import org.apache.gearpump.streaming.{Constants, MockUtil} -import org.apache.gearpump.streaming.dsl.window.impl.DefaultWindowRunner +import org.apache.gearpump.streaming.dsl.window.impl.WindowOperator import org.apache.gearpump.streaming.source.Watermark import org.mockito.Mockito._ import 
org.scalacheck.Gen @@ -40,7 +40,7 @@ class GroupByTaskSpec extends PropSpec with PropertyChecks forAll(longGen) { (time: Instant) => val groupBy = mock[Any => Int] - val windowRunner = new DefaultWindowRunner[Any, Any](GlobalWindows(), new DummyRunner[Any]) + val windowRunner = new WindowOperator[Any, Any](GlobalWindows(), new DummyRunner[Any]) val context = MockUtil.mockTaskContext val config = UserConfig.empty .withValue( http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2d13b9cf/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala index e38c5a3..0009ad5 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala @@ -22,7 +22,7 @@ import java.time.Instant import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, TriggeredOutputs, WindowRunner} +import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, TriggeredOutputs, StreamingOperator} import org.mockito.Mockito.{verify, when} import org.scalacheck.Gen import org.scalatest.{Matchers, PropSpec} @@ -36,7 +36,7 @@ class TransformTaskSpec extends PropSpec with PropertyChecks with Matchers with val watermarkGen = longGen.map(Instant.ofEpochMilli) forAll(watermarkGen) { (watermark: Instant) => - val windowRunner = mock[WindowRunner[Any, Any]] + val windowRunner = mock[StreamingOperator[Any, Any]] val context = MockUtil.mockTaskContext val config = UserConfig.empty val task = new TransformTask[Any, Any](windowRunner, context, 
config) @@ -45,7 +45,7 @@ class TransformTaskSpec extends PropSpec with PropertyChecks with Matchers with val message = Message(value, time) task.onNext(message) - verify(windowRunner).process(TimestampedValue(value, time)) + verify(windowRunner).foreach(TimestampedValue(value, time)) when(windowRunner.trigger(watermark)).thenReturn( TriggeredOutputs(Some(TimestampedValue(value, time)), watermark)) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2d13b9cf/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala index b23d0ee..1ac7213 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala @@ -46,10 +46,10 @@ class DefaultWindowRunnerSpec extends PropSpec with PropertyChecks implicit val system = MockUtil.system val reduce = ReduceFunction[KV]((kv1, kv2) => (kv1._1, kv1._2 + kv2._2)) val windows = SessionWindows.apply(Duration.ofMillis(4L)) - val windowRunner = new DefaultWindowRunner[KV, Option[KV]](windows, + val windowRunner = new WindowOperator[KV, Option[KV]](windows, new FoldRunner[KV, Option[KV]](reduce, "reduce")) - data.foreach(m => windowRunner.process(TimestampedValue(m.value.asInstanceOf[KV], m.timestamp))) + data.foreach(m => windowRunner.foreach(TimestampedValue(m.value.asInstanceOf[KV], m.timestamp))) windowRunner.trigger(Watermark.MAX).outputs.toList shouldBe List( TimestampedValue(Some(("foo", 1)), Instant.ofEpochMilli(4)), 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2d13b9cf/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala index d62739a..cd2cfa7 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala @@ -23,7 +23,7 @@ import java.time.Instant import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, TriggeredOutputs, WindowRunner} +import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, TriggeredOutputs, StreamingOperator} import org.mockito.Mockito._ import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar @@ -40,7 +40,7 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with val dataSource = mock[DataSource] val config = UserConfig.empty .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1) - val runner = mock[WindowRunner[Any, Any]] + val runner = mock[StreamingOperator[Any, Any]] val sourceTask = new DataSourceTask[Any, Any](dataSource, runner, taskContext, config) sourceTask.onStart(startTime) @@ -57,13 +57,17 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with val dataSource = mock[DataSource] val config = UserConfig.empty .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1) - val runner = mock[WindowRunner[Any, Any]] - val sourceTask = new DataSourceTask[Any, Any](dataSource, runner, taskContext, config) + val processor = mock[StreamingOperator[String, String]] + val sourceTask = new DataSourceTask[String, 
String](dataSource, processor, + taskContext, config) val msg = Message(str, timestamp) when(dataSource.read()).thenReturn(msg) - when(runner.trigger(Watermark.MAX)).thenReturn( - TriggeredOutputs(Some(TimestampedValue(str.asInstanceOf[Any], timestamp)), Watermark.MAX)) + when(processor.flatMap(new TimestampedValue[String](msg))).thenReturn( + Some(new TimestampedValue[String](msg)) + ) + when(processor.trigger(Watermark.MAX)).thenReturn( + TriggeredOutputs[String](None, Watermark.MAX)) sourceTask.onNext(Message("next")) sourceTask.onWatermarkProgress(Watermark.MAX) @@ -79,7 +83,7 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with val dataSource = mock[DataSource] val config = UserConfig.empty .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1) - val runner = mock[WindowRunner[Any, Any]] + val runner = mock[StreamingOperator[Any, Any]] val sourceTask = new DataSourceTask[Any, Any](dataSource, runner, taskContext, config) sourceTask.onStop()
