Repository: incubator-gearpump Updated Branches: refs/heads/master 96312a2ac -> 5d524918d
[GEARPUMP-23] Refactor Window DSL Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [x] Make sure the commit message is formatted like: `[GEARPUMP-<Jira issue #>] Meaningful description of pull request` - [x] Make sure tests pass via `sbt clean test`. - [x] Make sure old documentation affected by the pull request has been updated and new documentation added for new functionality. Author: manuzhang <[email protected]> Closes #138 from manuzhang/window_dsl. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/5d524918 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/5d524918 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/5d524918 Branch: refs/heads/master Commit: 5d524918d1e42d3bdd67ecf1e76b1c8c64f26572 Parents: 96312a2 Author: manuzhang <[email protected]> Authored: Mon Feb 6 21:00:41 2017 +0800 Committer: manuzhang <[email protected]> Committed: Mon Feb 6 21:00:41 2017 +0800 ---------------------------------------------------------------------- .../wordcount/dsl/WindowedWordCount.scala | 4 +- .../materializer/RemoteMaterializerImpl.scala | 10 +- .../streaming/dsl/javaapi/JavaStream.scala | 4 +- .../apache/gearpump/streaming/dsl/plan/OP.scala | 4 +- .../dsl/plan/functions/FunctionRunner.scala | 128 +++++++ .../plan/functions/SingleInputFunction.scala | 128 ------- .../streaming/dsl/scalaapi/Stream.scala | 14 +- .../streaming/dsl/task/CountTriggerTask.scala | 4 +- .../dsl/task/ProcessingTimeTriggerTask.scala | 4 +- .../streaming/dsl/task/TransformTask.scala | 6 +- .../streaming/dsl/window/api/Window.scala | 77 ----- .../streaming/dsl/window/api/WindowFn.scala | 63 ---- .../dsl/window/api/WindowFunction.scala | 72 ++++ .../streaming/dsl/window/api/Windows.scala | 77 +++++ .../streaming/dsl/window/impl/Window.scala | 42 ++- .../dsl/window/impl/WindowRunner.scala | 80 ++--- 
.../streaming/source/DataSourceTask.scala | 6 +- .../partitioner/GroupByPartitionerSpec.scala | 11 +- .../gearpump/streaming/dsl/plan/OpSpec.scala | 12 +- .../dsl/plan/functions/FunctionRunnerSpec.scala | 340 +++++++++++++++++++ .../functions/SingleInputFunctionSpec.scala | 339 ------------------ .../dsl/task/CountTriggerTaskSpec.scala | 4 +- .../dsl/task/EventTimeTriggerTaskSpec.scala | 2 +- .../task/ProcessingTimeTriggerTaskSpec.scala | 2 +- .../streaming/dsl/task/TransformTaskSpec.scala | 8 +- .../streaming/source/DataSourceTaskSpec.scala | 6 +- 26 files changed, 736 insertions(+), 711 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala index 401eac0..e1aac4c 100644 --- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala +++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala @@ -23,7 +23,7 @@ import org.apache.gearpump.Message import org.apache.gearpump.cluster.client.ClientContext import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} import org.apache.gearpump.streaming.dsl.scalaapi.{LoggerSink, StreamApp} -import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, FixedWindow} +import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, FixedWindows} import org.apache.gearpump.streaming.source.DataSource import 
org.apache.gearpump.streaming.task.TaskContext import org.apache.gearpump.util.AkkaApp @@ -39,7 +39,7 @@ object WindowedWordCount extends AkkaApp with ArgumentsParser { // word => (word, count) flatMap(line => line.split("[\\s]+")).map((_, 1)). // fix window - window(FixedWindow.apply(Duration.ofMillis(5L)) + window(FixedWindows.apply(Duration.ofMillis(5L)) .triggering(EventTimeTrigger)). // (word, count1), (word, count2) => (word, count1 + count2) groupBy(_._1). http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala index e065c90..74fe077 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala @@ -38,7 +38,7 @@ import org.apache.gearpump.streaming.dsl.plan.functions.FlatMapper import org.apache.gearpump.streaming.dsl.plan.{ChainableOp, DataSinkOp, DataSourceOp, Direct, GroupByOp, MergeOp, Op, OpEdge, ProcessorOp, Shuffle} import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction -import org.apache.gearpump.streaming.dsl.window.api.CountWindow +import org.apache.gearpump.streaming.dsl.window.api.CountWindows import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow import org.apache.gearpump.streaming.{ProcessorId, StreamApplication} import org.apache.gearpump.util.Graph @@ -163,8 +163,8 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) 
{ ProcessorOp(processor.processor, parallelism, updatedConf, "source") case sinkBridge: SinkBridgeModule[_, _] => ProcessorOp(classOf[SinkBridgeTask], parallelism, conf, "sink") - case groupBy: GroupByModule[_, _] => - GroupByOp(GroupAlsoByWindow(groupBy.groupBy, CountWindow.apply(1).accumulating), + case groupBy: GroupByModule[Any, Any] => + GroupByOp(GroupAlsoByWindow(groupBy.groupBy, CountWindows.apply[Any](1).accumulating), parallelism, "groupBy", conf) case reduce: ReduceModule[_] => reduceOp(reduce.f, conf) @@ -241,8 +241,8 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) { val foldConf = conf.withValue(FoldTask.ZERO, fold.zero.asInstanceOf[AnyRef]). withValue(FoldTask.AGGREGATOR, fold.f) ProcessorOp(classOf[FoldTask[_, _]], parallelism, foldConf, "fold") - case groupBy: GroupBy[_, _] => - GroupByOp(GroupAlsoByWindow(groupBy.keyFor, CountWindow.apply(1).accumulating), + case groupBy: GroupBy[Any, Any] => + GroupByOp(GroupAlsoByWindow(groupBy.keyFor, CountWindows.apply[Any](1).accumulating), groupBy.maxSubstreams, "groupBy", conf) case groupedWithin: GroupedWithin[_] => val diConf = conf.withValue[FiniteDuration](GroupedWithinTask.TIME_WINDOW, groupedWithin.d). 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala index 7f3c250..592c4dc 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala @@ -22,7 +22,7 @@ import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunct import org.apache.gearpump.streaming.dsl.javaapi.functions.{FlatMapFunction => JFlatMapFunction, GroupByFunction} import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction import org.apache.gearpump.streaming.dsl.scalaapi.{Stream, WindowStream} -import org.apache.gearpump.streaming.dsl.window.api.Window +import org.apache.gearpump.streaming.dsl.window.api.Windows import org.apache.gearpump.streaming.task.Task /** @@ -68,7 +68,7 @@ class JavaStream[T](val stream: Stream[T]) { new JavaStream[T](stream.groupBy(fn.apply, parallelism, description)) } - def window(win: Window, description: String): JavaWindowStream[T] = { + def window(win: Windows[T], description: String): JavaWindowStream[T] = { new JavaWindowStream[T](stream.window(win, description)) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala index 5aaf2fa..56f16e1 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala +++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala @@ -22,7 +22,7 @@ import akka.actor.ActorSystem import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Constants._ import org.apache.gearpump.streaming.Processor.DefaultProcessor -import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, SingleInputFunction} +import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, FunctionRunner} import org.apache.gearpump.streaming.{Constants, Processor} import org.apache.gearpump.streaming.dsl.task.TransformTask import org.apache.gearpump.streaming.dsl.window.api.GroupByFn @@ -124,7 +124,7 @@ case class DataSinkOp( * to another Op to be used */ case class ChainableOp[IN, OUT]( - fn: SingleInputFunction[IN, OUT], + fn: FunctionRunner[IN, OUT], userConfig: UserConfig = UserConfig.empty) extends Op { override def description: String = fn.description http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala new file mode 100644 index 0000000..36821e4 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.dsl.plan.functions + +import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction +import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction + +/** + * Interface to invoke SerializableFunction methods + * + * @param IN input value type + * @param OUT output value type + */ +sealed trait FunctionRunner[IN, OUT] extends java.io.Serializable { + + def setup(): Unit = {} + + def process(value: IN): TraversableOnce[OUT] + + def finish(): TraversableOnce[OUT] = None + + def teardown(): Unit = {} + + def description: String +} + +case class AndThen[IN, MIDDLE, OUT](first: FunctionRunner[IN, MIDDLE], + second: FunctionRunner[MIDDLE, OUT]) + extends FunctionRunner[IN, OUT] { + + override def setup(): Unit = { + first.setup() + second.setup() + } + + override def process(value: IN): TraversableOnce[OUT] = { + first.process(value).flatMap(second.process) + } + + override def finish(): TraversableOnce[OUT] = { + val firstResult = first.finish().flatMap(second.process) + if (firstResult.isEmpty) { + second.finish() + } else { + firstResult + } + } + + override def teardown(): Unit = { + first.teardown() + second.teardown() + } + + override def description: String = { + Option(first.description).flatMap { description => + Option(second.description).map(description + "." 
+ _) + }.orNull + } +} + +class FlatMapper[IN, OUT](fn: FlatMapFunction[IN, OUT], val description: String) + extends FunctionRunner[IN, OUT] { + + override def setup(): Unit = { + fn.setup() + } + + override def process(value: IN): TraversableOnce[OUT] = { + fn(value) + } + + override def teardown(): Unit = { + fn.teardown() + } +} + +class Reducer[T](fn: ReduceFunction[T], val description: String) + extends FunctionRunner[T, T] { + + private var state: Option[T] = None + + override def setup(): Unit = { + fn.setup() + } + + override def process(value: T): TraversableOnce[T] = { + if (state.isEmpty) { + state = Option(value) + } else { + state = state.map(fn(_, value)) + } + None + } + + override def finish(): TraversableOnce[T] = { + state + } + + override def teardown(): Unit = { + state = None + fn.teardown() + } +} + +class Emit[T](emit: T => Unit) extends FunctionRunner[T, Unit] { + + override def process(value: T): TraversableOnce[Unit] = { + emit(value) + None + } + + override def description: String = "" +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala deleted file mode 100644 index 687fd2e..0000000 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.streaming.dsl.plan.functions - -import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction -import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction - -/** - * Internal function to process single input - * - * @param IN input value type - * @param OUT output value type - */ -sealed trait SingleInputFunction[IN, OUT] extends java.io.Serializable { - - def setup(): Unit = {} - - def process(value: IN): TraversableOnce[OUT] - - def finish(): TraversableOnce[OUT] = None - - def teardown(): Unit = {} - - def description: String -} - -case class AndThen[IN, MIDDLE, OUT](first: SingleInputFunction[IN, MIDDLE], - second: SingleInputFunction[MIDDLE, OUT]) - extends SingleInputFunction[IN, OUT] { - - override def setup(): Unit = { - first.setup() - second.setup() - } - - override def process(value: IN): TraversableOnce[OUT] = { - first.process(value).flatMap(second.process) - } - - override def finish(): TraversableOnce[OUT] = { - val firstResult = first.finish().flatMap(second.process) - if (firstResult.isEmpty) { - second.finish() - } else { - firstResult - } - } - - override def teardown(): Unit = { - first.teardown() - second.teardown() - } - - override def description: String = { - Option(first.description).flatMap { description => - Option(second.description).map(description + "." 
+ _) - }.orNull - } -} - -class FlatMapper[IN, OUT](fn: FlatMapFunction[IN, OUT], val description: String) - extends SingleInputFunction[IN, OUT] { - - override def setup(): Unit = { - fn.setup() - } - - override def process(value: IN): TraversableOnce[OUT] = { - fn(value) - } - - override def teardown(): Unit = { - fn.teardown() - } -} - -class Reducer[T](fn: ReduceFunction[T], val description: String) - extends SingleInputFunction[T, T] { - - private var state: Option[T] = None - - override def setup(): Unit = { - fn.setup() - } - - override def process(value: T): TraversableOnce[T] = { - if (state.isEmpty) { - state = Option(value) - } else { - state = state.map(fn(_, value)) - } - None - } - - override def finish(): TraversableOnce[T] = { - state - } - - override def teardown(): Unit = { - state = None - fn.teardown() - } -} - -class Emit[T](emit: T => Unit) extends SingleInputFunction[T, Unit] { - - override def process(value: T): TraversableOnce[Unit] = { - emit(value) - None - } - - override def description: String = "" -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala index 430d795..bdb245c 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala @@ -25,7 +25,7 @@ import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction import org.apache.gearpump.streaming.dsl.plan._ import org.apache.gearpump.streaming.dsl.plan.functions._ import org.apache.gearpump.streaming.dsl.window.api._ -import org.apache.gearpump.streaming.dsl.window.impl.{Bucket, GroupAlsoByWindow} +import 
org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowAndGroup} import org.apache.gearpump.streaming.sink.DataSink import org.apache.gearpump.streaming.task.{Task, TaskContext} import org.apache.gearpump.util.Graph @@ -122,7 +122,7 @@ class Stream[T]( transform(new Reducer[T](fn, description)) } - private def transform[R](fn: SingleInputFunction[T, R]): Stream[R] = { + private def transform[R](fn: FunctionRunner[T, R]): Stream[R] = { val op = ChainableOp(fn) graph.addVertex(op) graph.addEdge(thisNode, edge.getOrElse(Direct), op) @@ -173,7 +173,7 @@ class Stream[T]( */ def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1, description: String = "groupBy"): Stream[T] = { - window(CountWindow.apply(1).accumulating) + window(CountWindows.apply(1).accumulating) .groupBy[GROUP](fn, parallelism, description) } @@ -184,7 +184,7 @@ class Stream[T]( * @param description window description * @return [[WindowStream]] where groupBy could be applied */ - def window(win: Window, description: String = "window"): WindowStream[T] = { + def window(win: Windows[T], description: String = "window"): WindowStream[T] = { new WindowStream[T](graph, edge, thisNode, win, description) } @@ -206,12 +206,12 @@ class Stream[T]( } class WindowStream[T](graph: Graph[Op, OpEdge], edge: Option[OpEdge], thisNode: Op, - window: Window, winDesc: String) { + window: Windows[T], winDesc: String) { def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1, description: String = "groupBy"): Stream[T] = { - val groupBy: GroupByFn[T, (GROUP, List[Bucket])] = GroupAlsoByWindow(fn, window) - val groupOp = GroupByOp[T, (GROUP, List[Bucket])](groupBy, parallelism, + val groupBy: GroupByFn[T, List[WindowAndGroup[GROUP]]] = GroupAlsoByWindow(fn, window) + val groupOp = GroupByOp[T, List[WindowAndGroup[GROUP]]](groupBy, parallelism, s"$winDesc.$description") graph.addVertex(groupOp) graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp) 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala index 06f2964..0dc28eb 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala @@ -22,7 +22,7 @@ import java.time.Instant import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Constants._ -import org.apache.gearpump.streaming.dsl.window.api.CountWindowFn +import org.apache.gearpump.streaming.dsl.window.api.CountWindowFunction import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, GroupAlsoByWindow, WindowRunner} import org.apache.gearpump.streaming.task.{Task, TaskContext} @@ -48,7 +48,7 @@ class CountTriggerTask[IN, GROUP]( taskContext, userConfig) } - private val windowSize = groupBy.window.windowFn.asInstanceOf[CountWindowFn].size + private val windowSize = groupBy.window.windowFn.asInstanceOf[CountWindowFunction[IN]].size private var num = 0 override def onNext(msg: Message): Unit = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala index 78ba762..a04e3ca 100644 --- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala @@ -25,7 +25,7 @@ import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Constants._ import org.apache.gearpump.streaming.dsl.task.ProcessingTimeTriggerTask.Triggering -import org.apache.gearpump.streaming.dsl.window.api.SlidingWindowFn +import org.apache.gearpump.streaming.dsl.window.api.SlidingWindowFunction import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, GroupAlsoByWindow, WindowRunner} import org.apache.gearpump.streaming.task.{Task, TaskContext} @@ -57,7 +57,7 @@ class ProcessingTimeTriggerTask[IN, GROUP]( taskContext, userConfig) } - private val windowFn = groupBy.window.windowFn.asInstanceOf[SlidingWindowFn] + private val windowFn = groupBy.window.windowFn.asInstanceOf[SlidingWindowFunction[IN]] private val windowSizeMs = windowFn.size.toMillis private val windowStepMs = windowFn.step.toMillis http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala index f8fbefa..5febeb6 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala @@ -22,14 +22,14 @@ import java.time.Instant import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Constants._ -import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction +import 
org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner import org.apache.gearpump.streaming.task.{Task, TaskContext} -class TransformTask[IN, OUT](operator: Option[SingleInputFunction[IN, OUT]], +class TransformTask[IN, OUT](operator: Option[FunctionRunner[IN, OUT]], taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { def this(taskContext: TaskContext, userConf: UserConfig) = { - this(userConf.getValue[SingleInputFunction[IN, OUT]]( + this(userConf.getValue[FunctionRunner[IN, OUT]]( GEARPUMP_STREAMING_OPERATOR)(taskContext.system), taskContext, userConf) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Window.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Window.scala deleted file mode 100644 index 4b94879..0000000 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Window.scala +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.gearpump.streaming.dsl.window.api - -import java.time.Duration - -/** - * - * @param windowFn - * @param trigger - * @param accumulationMode - */ -case class Window( - windowFn: WindowFn, - trigger: Trigger = EventTimeTrigger, - accumulationMode: AccumulationMode = Discarding) { - - def triggering(trigger: Trigger): Window = { - Window(windowFn, trigger) - } - - def accumulating: Window = { - Window(windowFn, trigger, Accumulating) - } - - def discarding: Window = { - Window(windowFn, trigger, Discarding) - } -} - -object CountWindow { - - def apply(size: Int): Window = { - Window(CountWindowFn(size), CountTrigger) - } -} - -object FixedWindow { - - /** - * Defines a FixedWindow. - * @param size window size - * @return a Window definition - */ - def apply(size: Duration): Window = { - Window(SlidingWindowFn(size, size)) - } -} - -object SlidingWindow { - - /** - * Defines a SlidingWindow - * @param size window size - * @param step window step to slide forward - * @return a Window definition - */ - def apply(size: Duration, step: Duration): Window = { - Window(SlidingWindowFn(size, step)) - } -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFn.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFn.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFn.scala deleted file mode 100644 index 0768730..0000000 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFn.scala +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.streaming.dsl.window.api - -import java.time.{Duration, Instant} - -import org.apache.gearpump.TimeStamp -import org.apache.gearpump.streaming.dsl.window.impl.Bucket - -import scala.collection.mutable.ArrayBuffer - -sealed trait WindowFn { - def apply(timestamp: Instant): List[Bucket] -} - -case class SlidingWindowFn(size: Duration, step: Duration) - extends WindowFn { - - def this(size: Duration) = { - this(size, size) - } - - override def apply(timestamp: Instant): List[Bucket] = { - val sizeMillis = size.toMillis - val stepMillis = step.toMillis - val timeMillis = timestamp.toEpochMilli - val windows = ArrayBuffer.empty[Bucket] - var start = lastStartFor(timeMillis, stepMillis) - windows += Bucket.ofEpochMilli(start, start + sizeMillis) - start -= stepMillis - while (start >= timeMillis) { - windows += Bucket.ofEpochMilli(start, start + sizeMillis) - start -= stepMillis - } - windows.toList - } - - private def lastStartFor(timestamp: TimeStamp, windowStep: Long): TimeStamp = { - timestamp - (timestamp + windowStep) % windowStep - } -} - -case class CountWindowFn(size: Int) extends WindowFn { - - override def apply(timestamp: Instant): List[Bucket] = { - List(Bucket.ofEpochMilli(0, size)) - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala new file mode 100644 index 0000000..9ef171d --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming.dsl.window.api + +import java.time.{Duration, Instant} + +import org.apache.gearpump.TimeStamp +import org.apache.gearpump.streaming.dsl.window.impl.Window + +import scala.collection.mutable.ArrayBuffer + +object WindowFunction { + + trait Context[T] { + def element: T + def timestamp: Instant + } +} + +trait WindowFunction[T] { + def apply(context: WindowFunction.Context[T]): Array[Window] +} + +case class SlidingWindowFunction[T](size: Duration, step: Duration) + extends WindowFunction[T] { + + def this(size: Duration) = { + this(size, size) + } + + override def apply(context: WindowFunction.Context[T]): Array[Window] = { + val timestamp = context.timestamp + val sizeMillis = size.toMillis + val stepMillis = step.toMillis + val timeMillis = timestamp.toEpochMilli + val windows = ArrayBuffer.empty[Window] + var start = lastStartFor(timeMillis, stepMillis) + windows += Window.ofEpochMilli(start, start + sizeMillis) + start -= stepMillis + while (start >= timeMillis) { + windows += Window.ofEpochMilli(start, start + sizeMillis) + start -= stepMillis + } + windows.toArray + } + + private def lastStartFor(timestamp: TimeStamp, windowStep: Long): TimeStamp = { + timestamp - (timestamp + windowStep) % windowStep + } +} + +case class CountWindowFunction[T](size: Int) extends WindowFunction[T] { + + override def apply(context: WindowFunction.Context[T]): Array[Window] = { + Array(Window.ofEpochMilli(0, size)) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala new file mode 100644 index 0000000..c636a55 --- /dev/null +++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.dsl.window.api + +import java.time.Duration + +/** + * Describes how elements are assigned to windows, when window results are + * triggered, and how window state is handled after triggering. + * + * @param windowFn function that assigns an element to one or more windows + * @param trigger decides when to emit the results of a window + * @param accumulationMode whether window state is accumulated or discarded after triggering + */ +case class Windows[T]( + windowFn: WindowFunction[T], + trigger: Trigger = EventTimeTrigger, + accumulationMode: AccumulationMode = Discarding) { + + def triggering(trigger: Trigger): Windows[T] = { + Windows(windowFn, trigger) + } + + def accumulating: Windows[T] = { + Windows(windowFn, trigger, Accumulating) + } + + def discarding: Windows[T] = { + Windows(windowFn, trigger, Discarding) + } +} + +object CountWindows { + + def apply[T](size: Int): Windows[T] = { + Windows(CountWindowFunction(size), CountTrigger) + } +} + +object FixedWindows { + + /** + * Defines a FixedWindow.
+ * @param size window size + * @return a Window definition + */ + def apply[T](size: Duration): Windows[T] = { + Windows(SlidingWindowFunction(size, size)) + } +} + +object SlidingWindow { + + /** + * Defines a SlidingWindow. + * @param size window size + * @param step window step to slide forward + * @return a Window definition + */ + def apply[T](size: Duration, step: Duration): Windows[T] = { + Windows(SlidingWindowFunction(size, step)) + } +} + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala index 53cf5d0..eb5d551 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala @@ -28,17 +28,18 @@ import org.apache.gearpump.streaming.dsl.window.api._ import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, EventTimeTriggerTask, ProcessingTimeTriggerTask} import org.apache.gearpump.streaming.task.Task -object Bucket { - def ofEpochMilli(startTime: TimeStamp, endTime: TimeStamp): Bucket = { - Bucket(Instant.ofEpochMilli(startTime), Instant.ofEpochMilli(endTime)) +object Window { + def ofEpochMilli(startTime: TimeStamp, endTime: TimeStamp): Window = { + Window(Instant.ofEpochMilli(startTime), Instant.ofEpochMilli(endTime)) } } /** * A window unit including startTime and excluding endTime.
*/ -case class Bucket(startTime: Instant, endTime: Instant) extends Comparable[Bucket] { - override def compareTo(o: Bucket): Int = { +case class Window(startTime: Instant, endTime: Instant) extends Comparable[Window] { + + override def compareTo(o: Window): Int = { val ret = startTime.compareTo(o.startTime) if (ret != 0) { ret @@ -48,13 +49,32 @@ case class Bucket(startTime: Instant, endTime: Instant) extends Comparable[Bucke } } -case class GroupAlsoByWindow[T, GROUP](groupByFn: T => GROUP, window: Window) - extends GroupByFn[T, (GROUP, List[Bucket])] { +case class WindowAndGroup[GROUP](window: Window, group: GROUP) + extends Comparable[WindowAndGroup[GROUP]] { + + override def compareTo(o: WindowAndGroup[GROUP]): Int = { + val ret = window.compareTo(o.window) + if (ret != 0) { + ret + } else if (group.equals(o.group)) { + 0 + } else { + -1 + } + } +} + +case class GroupAlsoByWindow[T, GROUP](groupByFn: T => GROUP, window: Windows[T]) + extends GroupByFn[T, List[WindowAndGroup[GROUP]]] { - override def groupBy(message: Message): (GROUP, List[Bucket]) = { - val group = groupByFn(message.msg.asInstanceOf[T]) - val buckets = window.windowFn(Instant.ofEpochMilli(message.timestamp)) - group -> buckets + override def groupBy(message: Message): List[WindowAndGroup[GROUP]] = { + val ele = message.msg.asInstanceOf[T] + val group = groupByFn(ele) + val windows = window.windowFn(new WindowFunction.Context[T] { + override def element: T = ele + override def timestamp: Instant = Instant.ofEpochMilli(message.timestamp) + }) + windows.map(WindowAndGroup(_, group)).toList } override def getProcessor(parallelism: Int, description: String, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala index 223a4af..7a16100 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala @@ -26,9 +26,8 @@ import com.gs.collections.api.block.procedure.Procedure import com.gs.collections.impl.list.mutable.FastList import com.gs.collections.impl.map.mutable.UnifiedMap import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap -import com.gs.collections.impl.set.mutable.UnifiedSet import org.apache.gearpump.streaming.Constants._ -import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, Emit, SingleInputFunction} +import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, Emit, FunctionRunner} import org.apache.gearpump.streaming.dsl.window.api.Discarding import org.apache.gearpump.streaming.task.TaskContext import org.apache.gearpump.util.LogUtil @@ -45,36 +44,32 @@ object DefaultWindowRunner { private val LOG: Logger = LogUtil.getLogger(classOf[DefaultWindowRunner[_, _, _]]) - case class WindowGroup[GROUP](bucket: Bucket, group: GROUP) + case class InputsAndFn[IN, OUT](inputs: FastList[IN], fn: FunctionRunner[IN, OUT]) } class DefaultWindowRunner[IN, GROUP, OUT]( taskContext: TaskContext, userConfig: UserConfig, groupBy: GroupAlsoByWindow[IN, GROUP])(implicit system: ActorSystem) extends WindowRunner { - import org.apache.gearpump.streaming.dsl.window.impl.DefaultWindowRunner._ - private val windows = new TreeSortedMap[Bucket, UnifiedSet[WindowGroup[GROUP]]] - private val windowGroups = new UnifiedMap[WindowGroup[GROUP], FastList[IN]] - private val groupFns = new UnifiedMap[GROUP, SingleInputFunction[IN, OUT]] + private val groupedInputs = new TreeSortedMap[WindowAndGroup[GROUP], FastList[IN]] + private val groupedFnRunners = new UnifiedMap[GROUP, FunctionRunner[IN, OUT]] override def process(message: Message): Unit = { - 
val (group, buckets) = groupBy.groupBy(message) - buckets.foreach { bucket => - val wg = WindowGroup(bucket, group) - val wgs = windows.getOrDefault(bucket, new UnifiedSet[WindowGroup[GROUP]](1)) - wgs.add(wg) - windows.put(bucket, wgs) - - val inputs = windowGroups.getOrDefault(wg, new FastList[IN](1)) - inputs.add(message.msg.asInstanceOf[IN]) - windowGroups.put(wg, inputs) - } - if (!groupFns.containsKey(group)) { - val fn = userConfig.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get - fn.setup() - groupFns.put(group, fn) + val wgs = groupBy.groupBy(message) + wgs.foreach { wg => + if (!groupedInputs.containsKey(wg)) { + val inputs = new FastList[IN](1) + groupedInputs.put(wg, inputs) + } + groupedInputs.get(wg).add(message.msg.asInstanceOf[IN]) + if (!groupedFnRunners.containsKey(wg.group)) { + val fn = userConfig.getValue[FunctionRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get + fn.setup() + groupedFnRunners.put(wg.group, fn) + } } + } override def trigger(time: Instant): Unit = { @@ -82,29 +77,28 @@ class DefaultWindowRunner[IN, GROUP, OUT]( @annotation.tailrec def onTrigger(): Unit = { - if (windows.notEmpty()) { - val first = windows.firstKey - if (!time.isBefore(first.endTime)) { - val wgs = windows.remove(first) - wgs.forEach(new Procedure[WindowGroup[GROUP]] { - override def value(each: WindowGroup[GROUP]): Unit = { - val inputs = windowGroups.remove(each) - val reduceFn = AndThen(groupFns.get(each.group), new Emit[OUT](emitResult(_, time))) - inputs.forEach(new Procedure[IN] { - override def value(t: IN): Unit = { - // .toList forces eager evaluation - reduceFn.process(t).toList - } - }) - // .toList forces eager evaluation - reduceFn.finish().toList - if (groupBy.window.accumulationMode == Discarding) { - reduceFn.teardown() + if (groupedInputs.notEmpty()) { + val first = groupedInputs.firstKey + if (!time.isBefore(first.window.endTime)) { + val inputs = groupedInputs.remove(first) + if 
(groupedFnRunners.containsKey(first.group)) { + val reduceFn = AndThen(groupedFnRunners.get(first.group), + new Emit[OUT](output => emitResult(output, time))) + inputs.forEach(new Procedure[IN] { + override def value(t: IN): Unit = { + // .toList forces eager evaluation + reduceFn.process(t).toList } + }) + // .toList forces eager evaluation + reduceFn.finish().toList + if (groupBy.window.accumulationMode == Discarding) { + reduceFn.teardown() } - }) - - onTrigger() + onTrigger() + } else { + throw new RuntimeException(s"FunctionRunner not found for group ${first.group}") + } } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala index 450f2d6..0d0dfa2 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala @@ -23,7 +23,7 @@ import java.time.Instant import org.apache.gearpump._ import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Constants._ -import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction +import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner import org.apache.gearpump.streaming.task.{Task, TaskContext} /** @@ -42,13 +42,13 @@ class DataSourceTask[IN, OUT] private[source]( context: TaskContext, conf: UserConfig, source: DataSource, - operator: Option[SingleInputFunction[IN, OUT]]) + operator: Option[FunctionRunner[IN, OUT]]) extends Task(context, conf) { def this(context: TaskContext, conf: UserConfig) = { this(context, conf, conf.getValue[DataSource](GEARPUMP_STREAMING_SOURCE)(context.system).get, - 
conf.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system) + conf.getValue[FunctionRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system) ) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala index f49eb04..fb45e35 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala @@ -23,8 +23,8 @@ import java.time.Duration import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} import org.apache.gearpump.Message import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitionerSpec.People -import org.apache.gearpump.streaming.dsl.window.api.{FixedWindow, GroupByFn} -import org.apache.gearpump.streaming.dsl.window.impl.{Bucket, GroupAlsoByWindow} +import org.apache.gearpump.streaming.dsl.window.api.{FixedWindows, GroupByFn} +import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowAndGroup} class GroupByPartitionerSpec extends FlatSpec with Matchers with BeforeAndAfterAll { @@ -34,9 +34,10 @@ class GroupByPartitionerSpec extends FlatSpec with Matchers with BeforeAndAfterA val michelle = People("Michelle", "female") val partitionNum = 10 - val groupByFn: GroupByFn[People, (String, List[Bucket])] = - GroupAlsoByWindow[People, String](_.gender, FixedWindow.apply(Duration.ofMillis(5))) - val groupBy = new GroupByPartitioner[People, (String, List[Bucket])](groupByFn) + val groupByFn: GroupByFn[People, List[WindowAndGroup[String]]] = + 
GroupAlsoByWindow[People, String](_.gender, + FixedWindows.apply[People](Duration.ofMillis(5))) + val groupBy = new GroupByPartitioner[People, List[WindowAndGroup[String]]](groupByFn) groupBy.getPartition(Message(mark, 1L), partitionNum) shouldBe groupBy.getPartition(Message(tom, 2L), partitionNum) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala index f0920de..461d3da 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala @@ -25,7 +25,7 @@ import org.apache.gearpump.cluster.{TestUtil, UserConfig} import org.apache.gearpump.streaming.Processor import org.apache.gearpump.streaming.Processor.DefaultProcessor import org.apache.gearpump.streaming.dsl.plan.OpSpec.{AnySink, AnySource, AnyTask} -import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, SingleInputFunction} +import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, FunctionRunner} import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction import org.apache.gearpump.streaming.dsl.window.api.GroupByFn import org.apache.gearpump.streaming.sink.DataSink @@ -65,7 +65,7 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS val dataSource = new AnySource val dataSourceOp = DataSourceOp(dataSource) val chainableOp = mock[ChainableOp[Any, Any]] - val fn = mock[SingleInputFunction[Any, Any]] + val fn = mock[FunctionRunner[Any, Any]] val chainedOp = dataSourceOp.chain(chainableOp) @@ -138,10 +138,10 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS "ChainableOp" 
should { "chain ChainableOp" in { - val fn1 = mock[SingleInputFunction[Any, Any]] + val fn1 = mock[FunctionRunner[Any, Any]] val chainableOp1 = ChainableOp[Any, Any](fn1) - val fn2 = mock[SingleInputFunction[Any, Any]] + val fn2 = mock[FunctionRunner[Any, Any]] val chainableOp2 = ChainableOp[Any, Any](fn2) val chainedOp = chainableOp1.chain(chainableOp2) @@ -171,7 +171,7 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS "chain ChainableOp" in { val groupByFn = mock[GroupByFn[Any, Any]] val groupByOp = GroupByOp[Any, Any](groupByFn) - val fn = mock[SingleInputFunction[Any, Any]] + val fn = mock[FunctionRunner[Any, Any]] val chainableOp = mock[ChainableOp[Any, Any]] when(chainableOp.fn).thenReturn(fn) @@ -199,7 +199,7 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS val mergeOp = MergeOp("merge") "chain ChainableOp" in { - val fn = mock[SingleInputFunction[Any, Any]] + val fn = mock[FunctionRunner[Any, Any]] val chainableOp = mock[ChainableOp[Any, Any]] when(chainableOp.fn).thenReturn(fn) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala new file mode 100644 index 0000000..08a259a --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.dsl.plan.functions + +import java.time.Instant + +import akka.actor.ActorSystem +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.{TestUtil, UserConfig} +import org.apache.gearpump.streaming.MockUtil +import org.apache.gearpump.streaming.source.DataSourceTask +import org.apache.gearpump.streaming.Constants._ +import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction +import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource +import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction +import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask} +import org.apache.gearpump.streaming.dsl.window.api.CountWindows +import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow +import org.mockito.ArgumentCaptor +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.scalatest.{Matchers, WordSpec} +import org.scalatest.mock.MockitoSugar + +import scala.concurrent.Await +import scala.concurrent.duration.Duration + +class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar { + import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunnerSpec._ + + "AndThen" should { + + val first = mock[FunctionRunner[R, S]] + val second = mock[FunctionRunner[S, T]] + val andThen = AndThen(first, second) + + "chain first and second functions 
when processing input value" in { + val input = mock[R] + val firstOutput = mock[S] + val secondOutput = mock[T] + when(first.process(input)).thenReturn(Some(firstOutput)) + when(second.process(firstOutput)).thenReturn(Some(secondOutput)) + + andThen.process(input).toList shouldBe List(secondOutput) + } + + "return chained description" in { + when(first.description).thenReturn("first") + when(second.description).thenReturn("second") + andThen.description shouldBe "first.second" + } + + "return either first result or second on finish" in { + val firstResult = mock[S] + val processedFirst = mock[T] + val secondResult = mock[T] + + when(first.finish()).thenReturn(Some(firstResult)) + when(second.process(firstResult)).thenReturn(Some(processedFirst)) + andThen.finish().toList shouldBe List(processedFirst) + + when(first.finish()).thenReturn(None) + when(second.finish()).thenReturn(Some(secondResult)) + andThen.finish().toList shouldBe List(secondResult) + } + + "set up both functions on setup" in { + andThen.setup() + + verify(first).setup() + verify(second).setup() + } + + "tear down both functions on teardown" in { + andThen.teardown() + + verify(first).teardown() + verify(second).teardown() + } + + "chain multiple single input function" in { + val split = new FlatMapper[String, String](FlatMapFunction(_.split("\\s")), "split") + + val filter = new FlatMapper[String, String]( + FlatMapFunction(word => if (word.isEmpty) None else Some(word)), "filter") + + val map = new FlatMapper[String, Int](FlatMapFunction(word => Some(1)), "map") + + val sum = new Reducer[Int](ReduceFunction({(left, right) => left + right}), "sum") + + val all = AndThen(split, AndThen(filter, AndThen(map, sum))) + + assert(all.description == "split.filter.map.sum") + + val data = + """ + five four three two one + five four three two + five four three + five four + five + """ + // force eager evaluation + all.process(data).toList + val result = all.finish().toList + assert(result.nonEmpty) + 
assert(result.last == 15) + } + } + + "FlatMapper" should { + + val flatMapFunction = mock[FlatMapFunction[R, S]] + val flatMapper = new FlatMapper[R, S](flatMapFunction, "flatMap") + + "call flatMap function when processing input value" in { + val input = mock[R] + flatMapper.process(input) + verify(flatMapFunction).apply(input) + } + + "return passed in description" in { + flatMapper.description shouldBe "flatMap" + } + + "return None on finish" in { + flatMapper.finish() shouldBe List.empty[S] + } + + "set up FlatMapFunction on setup" in { + flatMapper.setup() + + verify(flatMapFunction).setup() + } + + "tear down FlatMapFunction on teardown" in { + flatMapper.teardown() + + verify(flatMapFunction).teardown() + } + } + + "ReduceFunction" should { + + "call reduce function when processing input value" in { + val reduceFunction = mock[ReduceFunction[T]] + val reducer = new Reducer[T](reduceFunction, "reduce") + val input1 = mock[T] + val input2 = mock[T] + val output = mock[T] + + when(reduceFunction.apply(input1, input2)).thenReturn(output, output) + + reducer.process(input1) shouldBe List.empty[T] + reducer.process(input2) shouldBe List.empty[T] + reducer.finish() shouldBe List(output) + + reducer.teardown() + reducer.process(input1) shouldBe List.empty[T] + reducer.teardown() + reducer.process(input2) shouldBe List.empty[T] + reducer.finish() shouldBe List(input2) + } + + "return passed in description" in { + val reduceFunction = mock[ReduceFunction[T]] + val reducer = new Reducer[T](reduceFunction, "reduce") + reducer.description shouldBe "reduce" + } + + "return None on finish" in { + val reduceFunction = mock[ReduceFunction[T]] + val reducer = new Reducer[T](reduceFunction, "reduce") + reducer.finish() shouldBe List.empty[T] + } + + "set up reduce function on setup" in { + val reduceFunction = mock[ReduceFunction[T]] + val reducer = new Reducer[T](reduceFunction, "reduce") + reducer.setup() + + verify(reduceFunction).setup() + } + + "tear down reduce 
function on teardown" in { + val reduceFunction = mock[ReduceFunction[T]] + val reducer = new Reducer[T](reduceFunction, "reduce") + reducer.teardown() + + verify(reduceFunction).teardown() + } + } + + "Emit" should { + + val emitFunction = mock[T => Unit] + val emit = new Emit[T](emitFunction) + + "emit input value when processing input value" in { + val input = mock[T] + + emit.process(input) shouldBe List.empty[Unit] + + verify(emitFunction).apply(input) + } + + "return empty description" in { + emit.description shouldBe "" + } + + "return None on finish" in { + emit.finish() shouldBe List.empty[Unit] + } + + "do nothing on setup" in { + emit.setup() + + verifyZeroInteractions(emitFunction) + } + + "do nothing on teardown" in { + emit.teardown() + + verifyZeroInteractions(emitFunction) + } + } + + "Source" should { + "iterate over input source and apply attached operator" in { + + val taskContext = MockUtil.mockTaskContext + implicit val actorSystem = MockUtil.system + + val data = "one two three".split("\\s+") + val dataSource = new CollectionDataSource[String](data) + val conf = UserConfig.empty.withValue(GEARPUMP_STREAMING_SOURCE, dataSource) + + // Source with no transformer + val source = new DataSourceTask[String, String]( + taskContext, conf) + source.onStart(Instant.EPOCH) + source.onNext(Message("next")) + data.foreach { s => + verify(taskContext, times(1)).output(MockUtil.argMatch[Message]( + message => message.msg == s)) + } + + // Source with transformer + val anotherTaskContext = MockUtil.mockTaskContext + val double = new FlatMapper[String, String](FlatMapFunction( + word => List(word, word)), "double") + val another = new DataSourceTask(anotherTaskContext, + conf.withValue(GEARPUMP_STREAMING_OPERATOR, double)) + another.onStart(Instant.EPOCH) + another.onNext(Message("next")) + data.foreach { s => + verify(anotherTaskContext, times(2)).output(MockUtil.argMatch[Message]( + message => message.msg == s)) + } + } + } + + "CountTriggerTask" should { + 
"group input by groupBy Function and " + + "apply attached operator for each group" in { + + val data = "1 2 2 3 3 3" + + val concat = new Reducer[String](ReduceFunction({ (left, right) => + left + right}), "concat") + + implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) + val config = UserConfig.empty.withValue[FunctionRunner[String, String]]( + GEARPUMP_STREAMING_OPERATOR, concat) + + val taskContext = MockUtil.mockTaskContext + + val groupBy = GroupAlsoByWindow((input: String) => input, + CountWindows.apply[String](1).accumulating) + val task = new CountTriggerTask[String, String](groupBy, taskContext, config) + task.onStart(Instant.EPOCH) + + val peopleCaptor = ArgumentCaptor.forClass(classOf[Message]) + + data.split("\\s+").foreach { word => + task.onNext(Message(word)) + } + verify(taskContext, times(6)).output(peopleCaptor.capture()) + + import scala.collection.JavaConverters._ + + val values = peopleCaptor.getAllValues.asScala.map(input => input.msg.asInstanceOf[String]) + assert(values.mkString(",") == "1,2,22,3,33,333") + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) + } + } + + "MergeTask" should { + "accept two stream and apply the attached operator" in { + + // Source with transformer + val taskContext = MockUtil.mockTaskContext + val conf = UserConfig.empty + val double = new FlatMapper[String, String](FlatMapFunction( + word => List(word, word)), "double") + val task = new TransformTask[String, String](Some(double), taskContext, conf) + task.onStart(Instant.EPOCH) + + val data = "1 2 2 3 3 3".split("\\s+") + + data.foreach { input => + task.onNext(Message(input)) + } + + verify(taskContext, times(data.length * 2)).output(anyObject()) + } + } +} + +object FunctionRunnerSpec { + type R = AnyRef + type S = AnyRef + type T = AnyRef +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala 
---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala deleted file mode 100644 index 2c03e1c..0000000 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala +++ /dev/null @@ -1,339 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.gearpump.streaming.dsl.plan.functions - -import java.time.Instant - -import akka.actor.ActorSystem -import org.apache.gearpump.Message -import org.apache.gearpump.cluster.{TestUtil, UserConfig} -import org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.source.DataSourceTask -import org.apache.gearpump.streaming.Constants._ -import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction -import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource -import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction -import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask} -import org.apache.gearpump.streaming.dsl.window.api.CountWindow -import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow -import org.mockito.ArgumentCaptor -import org.mockito.Matchers._ -import org.mockito.Mockito._ -import org.scalatest.{Matchers, WordSpec} -import org.scalatest.mock.MockitoSugar - -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -class SingleInputFunctionSpec extends WordSpec with Matchers with MockitoSugar { - import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunctionSpec._ - - "AndThen" should { - - val first = mock[SingleInputFunction[R, S]] - val second = mock[SingleInputFunction[S, T]] - val andThen = AndThen(first, second) - - "chain first and second functions when processing input value" in { - val input = mock[R] - val firstOutput = mock[S] - val secondOutput = mock[T] - when(first.process(input)).thenReturn(Some(firstOutput)) - when(second.process(firstOutput)).thenReturn(Some(secondOutput)) - - andThen.process(input).toList shouldBe List(secondOutput) - } - - "return chained description" in { - when(first.description).thenReturn("first") - when(second.description).thenReturn("second") - andThen.description shouldBe "first.second" - } - - "return either first result or second on finish" in { - val firstResult = 
mock[S] - val processedFirst = mock[T] - val secondResult = mock[T] - - when(first.finish()).thenReturn(Some(firstResult)) - when(second.process(firstResult)).thenReturn(Some(processedFirst)) - andThen.finish().toList shouldBe List(processedFirst) - - when(first.finish()).thenReturn(None) - when(second.finish()).thenReturn(Some(secondResult)) - andThen.finish().toList shouldBe List(secondResult) - } - - "set up both functions on setup" in { - andThen.setup() - - verify(first).setup() - verify(second).setup() - } - - "tear down both functions on teardown" in { - andThen.teardown() - - verify(first).teardown() - verify(second).teardown() - } - - "chain multiple single input function" in { - val split = new FlatMapper[String, String](FlatMapFunction(_.split("\\s")), "split") - - val filter = new FlatMapper[String, String]( - FlatMapFunction(word => if (word.isEmpty) None else Some(word)), "filter") - - val map = new FlatMapper[String, Int](FlatMapFunction(word => Some(1)), "map") - - val sum = new Reducer[Int](ReduceFunction({(left, right) => left + right}), "sum") - - val all = AndThen(split, AndThen(filter, AndThen(map, sum))) - - assert(all.description == "split.filter.map.sum") - - val data = - """ - five four three two one - five four three two - five four three - five four - five - """ - // force eager evaluation - all.process(data).toList - val result = all.finish().toList - assert(result.nonEmpty) - assert(result.last == 15) - } - } - - "FlatMapper" should { - - val flatMapFunction = mock[FlatMapFunction[R, S]] - val flatMapper = new FlatMapper[R, S](flatMapFunction, "flatMap") - - "call flatMap function when processing input value" in { - val input = mock[R] - flatMapper.process(input) - verify(flatMapFunction).apply(input) - } - - "return passed in description" in { - flatMapper.description shouldBe "flatMap" - } - - "return None on finish" in { - flatMapper.finish() shouldBe List.empty[S] - } - - "set up FlatMapFunction on setup" in { - flatMapper.setup() - 
- verify(flatMapFunction).setup() - } - - "tear down FlatMapFunction on teardown" in { - flatMapper.teardown() - - verify(flatMapFunction).teardown() - } - } - - "ReduceFunction" should { - - "call reduce function when processing input value" in { - val reduceFunction = mock[ReduceFunction[T]] - val reducer = new Reducer[T](reduceFunction, "reduce") - val input1 = mock[T] - val input2 = mock[T] - val output = mock[T] - - when(reduceFunction.apply(input1, input2)).thenReturn(output, output) - - reducer.process(input1) shouldBe List.empty[T] - reducer.process(input2) shouldBe List.empty[T] - reducer.finish() shouldBe List(output) - - reducer.teardown() - reducer.process(input1) shouldBe List.empty[T] - reducer.teardown() - reducer.process(input2) shouldBe List.empty[T] - reducer.finish() shouldBe List(input2) - } - - "return passed in description" in { - val reduceFunction = mock[ReduceFunction[T]] - val reducer = new Reducer[T](reduceFunction, "reduce") - reducer.description shouldBe "reduce" - } - - "return None on finish" in { - val reduceFunction = mock[ReduceFunction[T]] - val reducer = new Reducer[T](reduceFunction, "reduce") - reducer.finish() shouldBe List.empty[T] - } - - "set up reduce function on setup" in { - val reduceFunction = mock[ReduceFunction[T]] - val reducer = new Reducer[T](reduceFunction, "reduce") - reducer.setup() - - verify(reduceFunction).setup() - } - - "tear down reduce function on teardown" in { - val reduceFunction = mock[ReduceFunction[T]] - val reducer = new Reducer[T](reduceFunction, "reduce") - reducer.teardown() - - verify(reduceFunction).teardown() - } - } - - "Emit" should { - - val emitFunction = mock[T => Unit] - val emit = new Emit[T](emitFunction) - - "emit input value when processing input value" in { - val input = mock[T] - - emit.process(input) shouldBe List.empty[Unit] - - verify(emitFunction).apply(input) - } - - "return empty description" in { - emit.description shouldBe "" - } - - "return None on finish" in { - 
emit.finish() shouldBe List.empty[Unit] - } - - "do nothing on setup" in { - emit.setup() - - verifyZeroInteractions(emitFunction) - } - - "do nothing on teardown" in { - emit.teardown() - - verifyZeroInteractions(emitFunction) - } - } - - "Source" should { - "iterate over input source and apply attached operator" in { - - val taskContext = MockUtil.mockTaskContext - implicit val actorSystem = MockUtil.system - - val data = "one two three".split("\\s+") - val dataSource = new CollectionDataSource[String](data) - val conf = UserConfig.empty.withValue(GEARPUMP_STREAMING_SOURCE, dataSource) - - // Source with no transformer - val source = new DataSourceTask[String, String]( - taskContext, conf) - source.onStart(Instant.EPOCH) - source.onNext(Message("next")) - data.foreach { s => - verify(taskContext, times(1)).output(MockUtil.argMatch[Message]( - message => message.msg == s)) - } - - // Source with transformer - val anotherTaskContext = MockUtil.mockTaskContext - val double = new FlatMapper[String, String](FlatMapFunction( - word => List(word, word)), "double") - val another = new DataSourceTask(anotherTaskContext, - conf.withValue(GEARPUMP_STREAMING_OPERATOR, double)) - another.onStart(Instant.EPOCH) - another.onNext(Message("next")) - data.foreach { s => - verify(anotherTaskContext, times(2)).output(MockUtil.argMatch[Message]( - message => message.msg == s)) - } - } - } - - "CountTriggerTask" should { - "group input by groupBy Function and " + - "apply attached operator for each group" in { - - val data = "1 2 2 3 3 3" - - val concat = new Reducer[String](ReduceFunction({ (left, right) => - left + right}), "concat") - - implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) - val config = UserConfig.empty.withValue[SingleInputFunction[String, String]]( - GEARPUMP_STREAMING_OPERATOR, concat) - - val taskContext = MockUtil.mockTaskContext - - val groupBy = GroupAlsoByWindow((input: String) => input, CountWindow.apply(1).accumulating) - val task = new 
CountTriggerTask[String, String](groupBy, taskContext, config) - task.onStart(Instant.EPOCH) - - val peopleCaptor = ArgumentCaptor.forClass(classOf[Message]) - - data.split("\\s+").foreach { word => - task.onNext(Message(word)) - } - verify(taskContext, times(6)).output(peopleCaptor.capture()) - - import scala.collection.JavaConverters._ - - val values = peopleCaptor.getAllValues.asScala.map(input => input.msg.asInstanceOf[String]) - assert(values.mkString(",") == "1,2,22,3,33,333") - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } - } - - "MergeTask" should { - "accept two stream and apply the attached operator" in { - - // Source with transformer - val taskContext = MockUtil.mockTaskContext - val conf = UserConfig.empty - val double = new FlatMapper[String, String](FlatMapFunction( - word => List(word, word)), "double") - val task = new TransformTask[String, String](Some(double), taskContext, conf) - task.onStart(Instant.EPOCH) - - val data = "1 2 2 3 3 3".split("\\s+") - - data.foreach { input => - task.onNext(Message(input)) - } - - verify(taskContext, times(data.length * 2)).output(anyObject()) - } - } -} - -object SingleInputFunctionSpec { - type R = AnyRef - type S = AnyRef - type T = AnyRef -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala index 871d751..1a4958a 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala @@ -22,7 +22,7 @@ import java.time.Instant import org.apache.gearpump.Message import 
org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.dsl.window.api.CountWindow +import org.apache.gearpump.streaming.dsl.window.api.CountWindows import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowRunner} import org.mockito.Mockito._ import org.scalacheck.Gen @@ -42,7 +42,7 @@ class CountTriggerTaskSpec extends PropSpec with PropertyChecks forAll(numGen, numGen) { (windowSize: Int, msgNum: Int) => val groupBy = mock[GroupAlsoByWindow[Any, Any]] - val window = CountWindow.apply(windowSize) + val window = CountWindows.apply[Any](windowSize) when(groupBy.window).thenReturn(window) val windowRunner = mock[WindowRunner] val userConfig = UserConfig.empty http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala index a69abe6..07b5544 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala @@ -43,7 +43,7 @@ class EventTimeTriggerTaskSpec extends PropSpec with PropertyChecks forAll(windowSizeGen, windowStepGen, watermarkGen) { (windowSize: Long, windowStep: Long, watermark: Instant) => - val window = SlidingWindow.apply(Duration.ofMillis(windowSize), + val window = SlidingWindow.apply[Any](Duration.ofMillis(windowSize), Duration.ofMillis(windowStep)).triggering(EventTimeTrigger) val groupBy = mock[GroupAlsoByWindow[Any, Any]] val windowRunner = mock[WindowRunner] 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala index 39e1b4c..ef51ab2 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala @@ -44,7 +44,7 @@ class ProcessingTimeTriggerTaskSpec extends PropSpec with PropertyChecks forAll(windowSizeGen, windowStepGen, startTimeGen) { (windowSize: Long, windowStep: Long, startTime: Instant) => - val window = SlidingWindow.apply(Duration.ofMillis(windowSize), + val window = SlidingWindow.apply[Any](Duration.ofMillis(windowSize), Duration.ofMillis(windowStep)).triggering(ProcessingTimeTrigger) val groupBy = mock[GroupAlsoByWindow[Any, Any]] val windowRunner = mock[WindowRunner] http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala index b6e7342..8266df5 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala @@ -22,7 +22,7 @@ import java.time.Instant import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import 
org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction +import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner import org.mockito.Mockito.{verify, when} import org.scalacheck.Gen import org.scalatest.{Matchers, PropSpec} @@ -36,7 +36,7 @@ class TransformTaskSpec extends PropSpec with PropertyChecks with Matchers with val taskContext = MockUtil.mockTaskContext implicit val system = MockUtil.system val config = UserConfig.empty - val operator = mock[SingleInputFunction[Any, Any]] + val operator = mock[FunctionRunner[Any, Any]] val sourceTask = new TransformTask[Any, Any](Some(operator), taskContext, config) sourceTask.onStart(startTime) @@ -50,7 +50,7 @@ class TransformTaskSpec extends PropSpec with PropertyChecks with Matchers with val taskContext = MockUtil.mockTaskContext implicit val system = MockUtil.system val config = UserConfig.empty - val operator = mock[SingleInputFunction[Any, Any]] + val operator = mock[FunctionRunner[Any, Any]] val task = new TransformTask[Any, Any](Some(operator), taskContext, config) val msg = Message(str) when(operator.process(str)).thenReturn(Some(str)) @@ -65,7 +65,7 @@ class TransformTaskSpec extends PropSpec with PropertyChecks with Matchers with val taskContext = MockUtil.mockTaskContext implicit val system = MockUtil.system val config = UserConfig.empty - val operator = mock[SingleInputFunction[Any, Any]] + val operator = mock[FunctionRunner[Any, Any]] val task = new TransformTask[Any, Any](Some(operator), taskContext, config) task.onStop() http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5d524918/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala 
index 4e95bdd..f7f6fd9 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala @@ -23,7 +23,7 @@ import java.time.Instant import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction +import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner import org.mockito.Mockito._ import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar @@ -39,7 +39,7 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with val dataSource = mock[DataSource] val config = UserConfig.empty .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1) - val operator = mock[SingleInputFunction[Any, Any]] + val operator = mock[FunctionRunner[Any, Any]] val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, Some(operator)) sourceTask.onStart(startTime) @@ -72,7 +72,7 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with val dataSource = mock[DataSource] val config = UserConfig.empty .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1) - val operator = mock[SingleInputFunction[Any, Any]] + val operator = mock[FunctionRunner[Any, Any]] val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, Some(operator)) sourceTask.onStop()
