[GEARPUMP-23] add window dsl The PR is opened for early review and the work is in progress with following todos.
- [x] basic window dsl support with `WindowedWordCount` example - [x] improve `ReduceFunction` to not emit intermediate results - [x] add unit tests - [ ] add comments and update documentation - [ ] support different types of computation (e.g. monoid which doesn't require input elements to be held in the window) Author: manuzhang <[email protected]> Closes #85 from manuzhang/window_dsl. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/66017ab7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/66017ab7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/66017ab7 Branch: refs/heads/master Commit: 66017ab7bf5166ec312684f0e3e49e7219b4c24d Parents: 5c4d60c Author: manuzhang <[email protected]> Authored: Sun Oct 9 08:52:00 2016 +0800 Committer: manuzhang <[email protected]> Committed: Sun Oct 9 08:52:00 2016 +0800 ---------------------------------------------------------------------- .../wordcount/dsl/WindowedWordCount.scala | 87 +++++ .../apache/gearpump/streaming/Constants.scala | 1 + .../gearpump/streaming/StreamApplication.scala | 2 +- .../apache/gearpump/streaming/dsl/Stream.scala | 106 +++--- .../gearpump/streaming/dsl/StreamApp.scala | 34 +- .../streaming/dsl/javaapi/JavaStream.scala | 22 +- .../apache/gearpump/streaming/dsl/op/OP.scala | 109 ------ .../dsl/partitioner/GroupByPartitioner.scala | 49 +++ .../dsl/partitioner/GroupbyPartitioner.scala | 46 --- .../apache/gearpump/streaming/dsl/plan/OP.scala | 214 ++++++++++++ .../streaming/dsl/plan/OpTranslator.scala | 222 ------------- .../gearpump/streaming/dsl/plan/Planner.scala | 65 ++-- .../plan/functions/SingleInputFunction.scala | 107 ++++++ .../streaming/dsl/task/CountTriggerTask.scala | 63 ++++ .../dsl/task/EventTimeTriggerTask.scala | 59 ++++ .../dsl/task/ProcessingTimeTriggerTask.scala | 82 +++++ .../streaming/dsl/task/TransformTask.scala | 47 +++ 
.../dsl/window/api/AccumulationMode.scala | 24 ++ .../streaming/dsl/window/api/GroupByFn.scala | 47 +++ .../streaming/dsl/window/api/Trigger.scala | 27 ++ .../streaming/dsl/window/api/Window.scala | 77 +++++ .../streaming/dsl/window/api/WindowFn.scala | 63 ++++ .../dsl/window/impl/ReduceFnRunner.scala | 29 ++ .../streaming/dsl/window/impl/Window.scala | 75 +++++ .../dsl/window/impl/WindowRunner.scala | 114 +++++++ .../streaming/source/DataSourceTask.scala | 15 +- .../gearpump/streaming/task/TaskActor.scala | 4 +- .../gearpump/streaming/dsl/StreamAppSpec.scala | 67 ++-- .../gearpump/streaming/dsl/StreamSpec.scala | 24 +- .../partitioner/GroupByPartitionerSpec.scala | 23 +- .../gearpump/streaming/dsl/plan/OpSpec.scala | 244 ++++++++++++++ .../streaming/dsl/plan/OpTranslatorSpec.scala | 148 --------- .../streaming/dsl/plan/PlannerSpec.scala | 132 ++++++++ .../functions/SingleInputFunctionSpec.scala | 333 +++++++++++++++++++ .../dsl/task/CountTriggerTaskSpec.scala | 61 ++++ .../dsl/task/EventTimeTriggerTaskSpec.scala | 66 ++++ .../task/ProcessingTimeTriggerTaskSpec.scala | 69 ++++ 37 files changed, 2246 insertions(+), 711 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala new file mode 100644 index 0000000..4f43fd4 --- /dev/null +++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.examples.wordcount.dsl + +import java.time.{Duration, Instant} + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} +import org.apache.gearpump.streaming.dsl.{LoggerSink, StreamApp} +import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, FixedWindow} +import org.apache.gearpump.streaming.source.DataSource +import org.apache.gearpump.streaming.task.TaskContext +import org.apache.gearpump.util.AkkaApp + +object WindowedWordCount extends AkkaApp with ArgumentsParser { + + override val options: Array[(String, CLIOption[Any])] = Array.empty + + override def main(akkaConf: Config, args: Array[String]): Unit = { + val context = ClientContext(akkaConf) + val app = StreamApp("dsl", context) + app.source[String](new TimedDataSource). + // word => (word, count) + flatMap(line => line.split("[\\s]+")).map((_, 1)). + // fix window + window(FixedWindow.apply(Duration.ofMillis(5L)) + .triggering(EventTimeTrigger)). + // (word, count1), (word, count2) => (word, count1 + count2) + groupBy(_._1). 
+ sum.sink(new LoggerSink) + + context.submit(app) + context.close() + } + + private class TimedDataSource extends DataSource { + + private var data = List( + Message("foo", 1L), + Message("bar", 2L), + Message("foo", 3L), + Message("foo", 5L), + Message("bar", 7L), + Message("bar", 8L) + ) + + private var watermark: Instant = Instant.ofEpochMilli(0) + + override def read(): Message = { + if (data.nonEmpty) { + val msg = data.head + data = data.tail + watermark = Instant.ofEpochMilli(msg.timestamp) + msg + } else { + null + } + } + + override def open(context: TaskContext, startTime: Instant): Unit = {} + + override def close(): Unit = {} + + override def getWatermark: Instant = { + if (data.isEmpty) { + watermark = watermark.plusMillis(1) + } + watermark + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala index cd33b50..f99a436 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala @@ -22,6 +22,7 @@ object Constants { val GEARPUMP_STREAMING_OPERATOR = "gearpump.streaming.dsl.operator" val GEARPUMP_STREAMING_SOURCE = "gearpump.streaming.source" val GEARPUMP_STREAMING_GROUPBY_FUNCTION = "gearpump.streaming.dsl.groupby-function" + val GEARPUMP_STREAMING_WINDOW_FUNCTION = "gearpump.streaming.dsl.window-function" val GEARPUMP_STREAMING_LOCALITIES = "gearpump.streaming.localities" http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala ---------------------------------------------------------------------- diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala index 66ec873..a6588a1 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala @@ -123,7 +123,7 @@ object LifeTime { */ class StreamApplication( override val name: String, val inputUserConfig: UserConfig, - val dag: Graph[ProcessorDescription, PartitionerDescription]) + dag: Graph[ProcessorDescription, PartitionerDescription]) extends Application { require(!dag.hasDuplicatedEdge(), "Graph should not have duplicated edges") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala index 786d496..440a45e 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala @@ -20,7 +20,10 @@ package org.apache.gearpump.streaming.dsl import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.dsl.op._ +import org.apache.gearpump.streaming.dsl.plan._ +import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapFunction, ReduceFunction} +import org.apache.gearpump.streaming.dsl.window.api._ +import org.apache.gearpump.streaming.dsl.window.impl._ import org.apache.gearpump.streaming.sink.DataSink import org.apache.gearpump.streaming.task.{Task, TaskContext} import org.apache.gearpump.util.Graph @@ -35,12 +38,12 @@ class Stream[T]( /** * converts a value[T] to a list of value[R] * - * @param fun FlatMap function + * @param fn 
FlatMap function * @param description The description message for this operation * @return A new stream with type [R] */ - def flatMap[R](fun: T => TraversableOnce[R], description: String = null): Stream[R] = { - val flatMapOp = FlatMapOp(fun, Option(description).getOrElse("flatmap")) + def flatMap[R](fn: T => TraversableOnce[R], description: String = "flatMap"): Stream[R] = { + val flatMapOp = ChainableOp(new FlatMapFunction[T, R](fn, description)) graph.addVertex(flatMapOp) graph.addEdge(thisNode, edge.getOrElse(Direct), flatMapOp) new Stream[R](graph, flatMapOp) @@ -49,36 +52,36 @@ class Stream[T]( /** * Maps message of type T message of type R * - * @param fun Function + * @param fn Function * @return A new stream with type [R] */ - def map[R](fun: T => R, description: String = null): Stream[R] = { + def map[R](fn: T => R, description: String = "map"): Stream[R] = { this.flatMap({ data => - Option(fun(data)) - }, Option(description).getOrElse("map")) + Option(fn(data)) + }, description) } /** * Keeps records when fun(T) == true * - * @param fun the filter + * @param fn the filter * @return a new stream after filter */ - def filter(fun: T => Boolean, description: String = null): Stream[T] = { + def filter(fn: T => Boolean, description: String = "filter"): Stream[T] = { this.flatMap({ data => - if (fun(data)) Option(data) else None - }, Option(description).getOrElse("filter")) + if (fn(data)) Option(data) else None + }, description) } /** * Reduces operations. 
* - * @param fun reduction function + * @param fn reduction function * @param description description message for this operator * @return a new stream after reduction */ - def reduce(fun: (T, T) => T, description: String = null): Stream[T] = { - val reduceOp = ReduceOp(fun, Option(description).getOrElse("reduce")) + def reduce(fn: (T, T) => T, description: String = "reduce"): Stream[T] = { + val reduceOp = ChainableOp(new ReduceFunction(fn, description)) graph.addVertex(reduceOp) graph.addEdge(thisNode, edge.getOrElse(Direct), reduceOp) new Stream(graph, reduceOp) @@ -88,7 +91,10 @@ class Stream[T]( * Log to task log file */ def log(): Unit = { - this.map(msg => LoggerFactory.getLogger("dsl").info(msg.toString), "log") + this.map(msg => { + LoggerFactory.getLogger("dsl").info(msg.toString) + msg + }, "log") } /** @@ -97,8 +103,8 @@ class Stream[T]( * @param other the other stream * @return the merged stream */ - def merge(other: Stream[T], description: String = null): Stream[T] = { - val mergeOp = MergeOp(Option(description).getOrElse("merge")) + def merge(other: Stream[T], description: String = "merge"): Stream[T] = { + val mergeOp = MergeOp(description, UserConfig.empty) graph.addVertex(mergeOp) graph.addEdge(thisNode, edge.getOrElse(Direct), mergeOp) graph.addEdge(other.thisNode, other.edge.getOrElse(Shuffle), mergeOp) @@ -115,20 +121,29 @@ class Stream[T]( * * For example, * {{{ - * Stream[People].groupBy(_.gender).flatmap(..).filter.(..).reduce(..) + * Stream[People].groupBy(_.gender).flatMap(..).filter(..).reduce(..) 
* }}} * - * @param fun Group by function + * @param fn Group by function * @param parallelism Parallelism level * @param description The description * @return the grouped stream */ - def groupBy[Group](fun: T => Group, parallelism: Int = 1, description: String = null) - : Stream[T] = { - val groupOp = GroupByOp(fun, parallelism, Option(description).getOrElse("groupBy")) - graph.addVertex(groupOp) - graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp) - new Stream[T](graph, groupOp) + def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1, + description: String = "groupBy"): Stream[T] = { + window(CountWindow.apply(1).accumulating) + .groupBy[GROUP](fn, parallelism, description) + } + + /** + * Window function + * + * @param win window definition + * @param description window description + * @return [[WindowStream]] where groupBy could be applied + */ + def window(win: Window, description: String = "window"): WindowStream[T] = { + new WindowStream[T](graph, edge, thisNode, win, description) } /** @@ -140,15 +155,28 @@ class Stream[T]( */ def process[R]( processor: Class[_ <: Task], parallelism: Int, conf: UserConfig = UserConfig.empty, - description: String = null): Stream[R] = { - val processorOp = ProcessorOp(processor, parallelism, conf, - Option(description).getOrElse("process")) + description: String = "process"): Stream[R] = { + val processorOp = ProcessorOp(processor, parallelism, conf, description) graph.addVertex(processorOp) graph.addEdge(thisNode, edge.getOrElse(Shuffle), processorOp) new Stream[R](graph, processorOp, Some(Shuffle)) } } +class WindowStream[T](graph: Graph[Op, OpEdge], edge: Option[OpEdge], thisNode: Op, + window: Window, winDesc: String) { + + def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1, + description: String = "groupBy"): Stream[T] = { + val groupBy: GroupByFn[T, (GROUP, List[Bucket])] = GroupAlsoByWindow(fn, window) + val groupOp = GroupByOp[T, (GROUP, List[Bucket])](groupBy, parallelism, + s"$winDesc.$description") + 
graph.addVertex(groupOp) + graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp) + new Stream[T](graph, groupOp) + } +} + class KVStream[K, V](stream: Stream[Tuple2[K, V]]) { /** * GroupBy key @@ -192,30 +220,18 @@ object Stream { } implicit class Sink[T](stream: Stream[T]) extends java.io.Serializable { - def sink[T](dataSink: DataSink, parallism: Int, conf: UserConfig, description: String) - : Stream[T] = { - implicit val sink = DataSinkOp[T](dataSink, parallism, conf, - Some(description).getOrElse("traversable")) + def sink(dataSink: DataSink, parallelism: Int = 1, + conf: UserConfig = UserConfig.empty, description: String = "sink"): Stream[T] = { + implicit val sink = DataSinkOp(dataSink, parallelism, conf, description) stream.graph.addVertex(sink) stream.graph.addEdge(stream.thisNode, Shuffle, sink) new Stream[T](stream.graph, sink) } - - def sink[T]( - sink: Class[_ <: Task], parallism: Int, conf: UserConfig = UserConfig.empty, - description: String = null): Stream[T] = { - val sinkOp = ProcessorOp(sink, parallism, conf, Option(description).getOrElse("source")) - stream.graph.addVertex(sinkOp) - stream.graph.addEdge(stream.thisNode, Shuffle, sinkOp) - new Stream[T](stream.graph, sinkOp) - } } } class LoggerSink[T] extends DataSink { - var logger: Logger = null - - private var context: TaskContext = null + var logger: Logger = _ override def open(context: TaskContext): Unit = { this.logger = context.logger http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala index d45737b..8116146 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala +++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala @@ -24,10 +24,9 @@ import akka.actor.ActorSystem import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.cluster.client.ClientContext import org.apache.gearpump.streaming.StreamApplication -import org.apache.gearpump.streaming.dsl.op.{DataSourceOp, Op, OpEdge, ProcessorOp} -import org.apache.gearpump.streaming.dsl.plan.Planner +import org.apache.gearpump.streaming.dsl.plan._ import org.apache.gearpump.streaming.source.DataSource -import org.apache.gearpump.streaming.task.{Task, TaskContext} +import org.apache.gearpump.streaming.task.TaskContext import org.apache.gearpump.util.Graph import org.apache.gearpump.Message @@ -50,7 +49,8 @@ import scala.language.implicitConversions * @param name name of app */ class StreamApp( - val name: String, system: ActorSystem, userConfig: UserConfig, val graph: Graph[Op, OpEdge]) { + name: String, system: ActorSystem, userConfig: UserConfig, + private val graph: Graph[Op, OpEdge]) { def this(name: String, system: ActorSystem, userConfig: UserConfig) = { this(name, system, userConfig, Graph.empty[Op, OpEdge]) @@ -76,34 +76,16 @@ object StreamApp { implicit class Source(app: StreamApp) extends java.io.Serializable { - def source[T](dataSource: DataSource, parallelism: Int): Stream[T] = { - source(dataSource, parallelism, UserConfig.empty) - } - - def source[T](dataSource: DataSource, parallelism: Int, description: String): Stream[T] = { - source(dataSource, parallelism, UserConfig.empty, description) - } - - def source[T](dataSource: DataSource, parallelism: Int, conf: UserConfig): Stream[T] = { - source(dataSource, parallelism, conf, description = null) - } - - def source[T](dataSource: DataSource, parallelism: Int, conf: UserConfig, description: String) - : Stream[T] = { + def source[T](dataSource: DataSource, parallelism: Int = 1, + conf: UserConfig = UserConfig.empty, description: String = "source"): Stream[T] = { implicit val sourceOp = 
DataSourceOp(dataSource, parallelism, conf, description) app.graph.addVertex(sourceOp) new Stream[T](app.graph, sourceOp) } + def source[T](seq: Seq[T], parallelism: Int, description: String): Stream[T] = { this.source(new CollectionDataSource[T](seq), parallelism, UserConfig.empty, description) } - - def source[T](source: Class[_ <: Task], parallelism: Int, conf: UserConfig, description: String) - : Stream[T] = { - val sourceOp = ProcessorOp(source, parallelism, conf, Option(description).getOrElse("source")) - app.graph.addVertex(sourceOp) - new Stream[T](app.graph, sourceOp) - } } } @@ -115,7 +97,7 @@ class CollectionDataSource[T](seq: Seq[T]) extends DataSource { override def read(): Message = { if (iterator.hasNext) { - Message(iterator.next()) + Message(iterator.next(), Instant.now().toEpochMilli) } else { null } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala index 6eff20c..3003b98 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala @@ -19,9 +19,9 @@ package org.apache.gearpump.streaming.dsl.javaapi import scala.collection.JavaConverters._ - import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.dsl.Stream +import org.apache.gearpump.streaming.dsl.window.api.Window +import org.apache.gearpump.streaming.dsl.{Stream, WindowStream} import org.apache.gearpump.streaming.javaapi.dsl.functions._ import org.apache.gearpump.streaming.task.Task @@ -63,9 +63,13 @@ class JavaStream[T](val stream: Stream[T]) { * Group by a stream and turns it to a list of 
sub-streams. Operations chained after * groupBy applies to sub-streams. */ - def groupBy[Group](fn: GroupByFunction[T, Group], parallelism: Int, description: String) - : JavaStream[T] = { - new JavaStream[T](stream.groupBy({t: T => fn(t)}, parallelism, description)) + def groupBy[GROUP](fn: GroupByFunction[T, GROUP], + parallelism: Int, description: String): JavaStream[T] = { + new JavaStream[T](stream.groupBy((t: T) => fn, parallelism, description)) + } + + def window(win: Window, description: String): JavaWindowStream[T] = { + new JavaWindowStream[T](stream.window(win, description)) } /** Add a low level Processor to process messages */ @@ -75,3 +79,11 @@ class JavaStream[T](val stream: Stream[T]) { new JavaStream[R](stream.process(processor, parallelism, conf, description)) } } + +class JavaWindowStream[T](stream: WindowStream[T]) { + + def groupBy[GROUP](fn: GroupByFunction[T, GROUP], parallelism: Int, + description: String): JavaStream[T] = { + new JavaStream[T](stream.groupBy((t: T) => fn, parallelism, description)) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala deleted file mode 100644 index 49d9dec..0000000 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.dsl.op - -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.sink.DataSink -import org.apache.gearpump.streaming.source.DataSource -import org.apache.gearpump.streaming.task.Task - -/** - * Operators for the DSL - */ -sealed trait Op { - def description: String - def conf: UserConfig -} - -/** - * When translated to running DAG, SlaveOP can be attach to MasterOP or other SlaveOP - * "Attach" means running in same Actor. - */ -trait SlaveOp[T] extends Op - -case class FlatMapOp[T, R]( - fun: (T) => TraversableOnce[R], description: String, conf: UserConfig = UserConfig.empty) - extends SlaveOp[T] - -case class ReduceOp[T](fun: (T, T) => T, description: String, conf: UserConfig = UserConfig.empty) - extends SlaveOp[T] - -trait MasterOp extends Op - -trait ParameterizedOp[T] extends MasterOp - -case class MergeOp(description: String, override val conf: UserConfig = UserConfig.empty) - extends MasterOp - -case class GroupByOp[T, R]( - fun: T => R, parallelism: Int, description: String, - override val conf: UserConfig = UserConfig.empty) - extends ParameterizedOp[T] - -case class ProcessorOp[T <: Task]( - processor: Class[T], parallelism: Int, conf: UserConfig, description: String) - extends ParameterizedOp[T] - -case class DataSourceOp[T]( - dataSource: DataSource, parallelism: Int, conf: UserConfig, description: String) - extends ParameterizedOp[T] - -case class DataSinkOp[T]( - dataSink: DataSink, parallelism: Int, conf: UserConfig, description: String) - extends 
ParameterizedOp[T] - -/** - * Contains operators which can be chained to single one. - * - * For example, flatmap().map().reduce() can be chained to single operator as - * no data shuffling is required. - * @param ops list of operations - */ -case class OpChain(ops: List[Op]) extends Op { - def head: Op = ops.head - def last: Op = ops.last - - def description: String = null - - override def conf: UserConfig = { - // The head's conf has priority - ops.reverse.foldLeft(UserConfig.empty) { (conf, op) => - conf.withConfig(op.conf) - } - } -} - -trait OpEdge - -/** - * The upstream OP and downstream OP doesn't require network data shuffle. - * - * For example, map, flatmap operation doesn't require network shuffle, we can use Direct - * to represent the relation with upstream operators. - */ -case object Direct extends OpEdge - -/** - * The upstream OP and downstream OP DOES require network data shuffle. - * - * For example, map, flatmap operation doesn't require network shuffle, we can use Direct - * to represent the relation with upstream operators. - */ -case object Shuffle extends OpEdge - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala new file mode 100644 index 0000000..2ec881b --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.dsl.partitioner + +import org.apache.gearpump.Message +import org.apache.gearpump.partitioner.UnicastPartitioner +import org.apache.gearpump.streaming.dsl.window.api.GroupByFn + +/** + * Partition messages by applying group by function first. + * + * For example: + * {{{ + * case class People(name: String, gender: String) + * + * object Test{ + * + * val groupBy: (People => String) = people => people.gender + * val partitioner = GroupByPartitioner(groupBy) + * } + * }}} + * + * @param fn First apply message with groupBy function, then pick the hashCode of the output + * to do the partitioning. You must define hashCode() for output type of groupBy function. 
+ */ +class GroupByPartitioner[T, Group](fn: GroupByFn[T, Group]) + extends UnicastPartitioner { + override def getPartition(message: Message, partitionNum: Int, currentPartitionId: Int): Int = { + val hashCode = fn.groupBy(message).hashCode() + (hashCode & Integer.MAX_VALUE) % partitionNum + } +} + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala deleted file mode 100644 index b2e2932..0000000 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.dsl.partitioner - -import org.apache.gearpump.Message -import org.apache.gearpump.partitioner.UnicastPartitioner - -/** - * Partition messages by applying group by function first. 
- * - * For example: - * {{{ - * case class People(name: String, gender: String) - * - * object Test{ - * - * val groupBy: (People => String) = people => people.gender - * val partitioner = GroupByPartitioner(groupBy) - * } - * }}} - * - * @param groupBy First apply message with groupBy function, then pick the hashCode of the output - * to do the partitioning. You must define hashCode() for output type of groupBy function. - */ -class GroupByPartitioner[T, GROUP](groupBy: T => GROUP = null) extends UnicastPartitioner { - override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = { - val hashCode = groupBy(msg.msg.asInstanceOf[T]).hashCode() - (hashCode & Integer.MAX_VALUE) % partitionNum - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala new file mode 100644 index 0000000..744976b --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.dsl.plan + +import akka.actor.ActorSystem +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.Constants._ +import org.apache.gearpump.streaming.Processor.DefaultProcessor +import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction +import org.apache.gearpump.streaming.{Constants, Processor} +import org.apache.gearpump.streaming.dsl.task.TransformTask +import org.apache.gearpump.streaming.dsl.window.api.GroupByFn +import org.apache.gearpump.streaming.sink.{DataSink, DataSinkProcessor} +import org.apache.gearpump.streaming.source.{DataSource, DataSourceTask} +import org.apache.gearpump.streaming.task.Task + +import scala.reflect.ClassTag + +/** + * This is a vertex on the logical plan. + */ +sealed trait Op { + + def description: String + + def userConfig: UserConfig + + def chain(op: Op)(implicit system: ActorSystem): Op + + def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] +} + +/** + * This represents a low level Processor. + */ +case class ProcessorOp[T <: Task]( + processor: Class[T], + parallelism: Int, + userConfig: UserConfig, + description: String) + extends Op { + + def this( + parallelism: Int = 1, + userConfig: UserConfig = UserConfig.empty, + description: String = "processor")(implicit classTag: ClassTag[T]) = { + this(classTag.runtimeClass.asInstanceOf[Class[T]], parallelism, userConfig, description) + } + + override def chain(other: Op)(implicit system: ActorSystem): Op = { + throw new OpChainException(this, other) + } + + override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { + DefaultProcessor(parallelism, description, userConfig, processor) + } +} + +/** + * This represents a DataSource. 
+ */ +case class DataSourceOp( + dataSource: DataSource, + parallelism: Int = 1, + userConfig: UserConfig = UserConfig.empty, + description: String = "source") + extends Op { + + override def chain(other: Op)(implicit system: ActorSystem): Op = { + other match { + case op: ChainableOp[_, _] => + DataSourceOp(dataSource, parallelism, + userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.fn), + description) + case _ => + throw new OpChainException(this, other) + } + } + + override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { + Processor[DataSourceTask[Any, Any]](parallelism, description, + userConfig.withValue(GEARPUMP_STREAMING_SOURCE, dataSource)) + } +} + +/** + * This represents a DataSink. + */ +case class DataSinkOp( + dataSink: DataSink, + parallelism: Int = 1, + userConfig: UserConfig = UserConfig.empty, + description: String = "sink") + extends Op { + + override def chain(op: Op)(implicit system: ActorSystem): Op = { + throw new OpChainException(this, op) + } + + override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { + DataSinkProcessor(dataSink, parallelism, description) + } +} + +/** + * This represents operations that can be chained together + * (e.g. 
flatMap, map, filter, reduce) and further chained + * to another Op to be used + */ +case class ChainableOp[IN, OUT]( + fn: SingleInputFunction[IN, OUT]) extends Op { + + override def description: String = fn.description + + override def userConfig: UserConfig = UserConfig.empty + + override def chain(other: Op)(implicit system: ActorSystem): Op = { + other match { + case op: ChainableOp[OUT, _] => + // TODO: preserve type info + ChainableOp(fn.andThen(op.fn)) + case _ => + throw new OpChainException(this, other) + } + } + + override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { + throw new UnsupportedOperationException("ChainedOp cannot be translated to Processor") + } +} + +/** + * This represents a Processor with window aggregation + */ +case class GroupByOp[IN, GROUP]( + groupByFn: GroupByFn[IN, GROUP], + parallelism: Int = 1, + description: String = "groupBy", + override val userConfig: UserConfig = UserConfig.empty) + extends Op { + + override def chain(other: Op)(implicit system: ActorSystem): Op = { + other match { + case op: ChainableOp[_, _] => + GroupByOp(groupByFn, parallelism, description, + userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.fn)) + case _ => + throw new OpChainException(this, other) + } + } + + override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { + groupByFn.getProcessor(parallelism, description, userConfig) + } +} + +/** + * This represents a Processor transforming merged streams + */ +case class MergeOp(description: String, userConfig: UserConfig = UserConfig.empty) + extends Op { + + override def chain(other: Op)(implicit system: ActorSystem): Op = { + other match { + case op: ChainableOp[_, _] => + MergeOp(description, userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.fn)) + case _ => + throw new OpChainException(this, other) + } + } + + override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { + Processor[TransformTask[Any, 
Any]](1, description, userConfig) + } + +} + +/** + * This is an edge on the logical plan. + */ +trait OpEdge + +/** + * The upstream OP and downstream OP doesn't require network data shuffle. + * e.g. ChainableOp + */ +case object Direct extends OpEdge + +/** + * The upstream OP and downstream OP DOES require network data shuffle. + * e.g. GroupByOp + */ +case object Shuffle extends OpEdge + +/** + * Runtime exception thrown on chaining. + */ +class OpChainException(op1: Op, op2: Op) extends RuntimeException(s"$op1 cannot be chained by $op2") \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala deleted file mode 100644 index 8de291c..0000000 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala +++ /dev/null @@ -1,222 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gearpump.streaming.dsl.plan - -import scala.collection.TraversableOnce -import akka.actor.ActorSystem -import org.slf4j.Logger -import org.apache.gearpump._ -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.Constants._ -import org.apache.gearpump.streaming.Processor -import org.apache.gearpump.streaming.Processor.DefaultProcessor -import org.apache.gearpump.streaming.dsl.op._ -import org.apache.gearpump.streaming.dsl.plan.OpTranslator._ -import org.apache.gearpump.streaming.sink.DataSinkProcessor -import org.apache.gearpump.streaming.source.DataSourceTask -import org.apache.gearpump.streaming.task.{Task, TaskContext} -import org.apache.gearpump.util.LogUtil - -/** - * Translates a OP to a TaskDescription - */ -class OpTranslator extends java.io.Serializable { - val LOG: Logger = LogUtil.getLogger(getClass) - - def translate(ops: OpChain)(implicit system: ActorSystem): Processor[_ <: Task] = { - - val baseConfig = ops.conf - - ops.ops.head match { - case op: MasterOp => - val tail = ops.ops.tail - val func = toFunction(tail) - val userConfig = baseConfig.withValue(GEARPUMP_STREAMING_OPERATOR, func) - - op match { - case DataSourceOp(dataSource, parallelism, conf, description) => - Processor[DataSourceTask[Any, Any]](parallelism, - description = description + "." + func.description, - userConfig.withValue(GEARPUMP_STREAMING_SOURCE, dataSource)) - case groupby@GroupByOp(_, parallelism, description, _) => - Processor[GroupByTask[Object, Object, Object]](parallelism, - description = description + "." + func.description, - userConfig.withValue(GEARPUMP_STREAMING_GROUPBY_FUNCTION, groupby)) - case merge: MergeOp => - Processor[TransformTask[Object, Object]](1, - description = op.description + "." + func.description, - userConfig) - case ProcessorOp(processor, parallelism, conf, description) => - DefaultProcessor(parallelism, - description = description + "." 
+ func.description, - userConfig, processor) - case DataSinkOp(dataSink, parallelism, conf, description) => - DataSinkProcessor(dataSink, parallelism, description + func.description) - } - case op: SlaveOp[_] => - val func = toFunction(ops.ops) - val userConfig = baseConfig.withValue(GEARPUMP_STREAMING_OPERATOR, func) - - Processor[TransformTask[Object, Object]](1, - description = func.description, - taskConf = userConfig) - case chain: OpChain => - throw new RuntimeException("Not supposed to be called!") - } - } - - private def toFunction(ops: List[Op]): SingleInputFunction[Object, Object] = { - val func: SingleInputFunction[Object, Object] = new DummyInputFunction[Object]() - val totalFunction = ops.foldLeft(func) { (fun, op) => - - val opFunction = op match { - case flatmap: FlatMapOp[Object @unchecked, Object @unchecked] => - new FlatMapFunction(flatmap.fun, flatmap.description) - case reduce: ReduceOp[Object @unchecked] => - new ReduceFunction(reduce.fun, reduce.description) - case _ => - throw new RuntimeException("Not supposed to be called!") - } - fun.andThen(opFunction.asInstanceOf[SingleInputFunction[Object, Object]]) - } - totalFunction.asInstanceOf[SingleInputFunction[Object, Object]] - } -} - -object OpTranslator { - - trait SingleInputFunction[IN, OUT] extends Serializable { - def process(value: IN): TraversableOnce[OUT] - def andThen[OUTER](other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] = { - new AndThen(this, other) - } - - def description: String - } - - class DummyInputFunction[T] extends SingleInputFunction[T, T] { - override def andThen[OUTER](other: SingleInputFunction[T, OUTER]) - : SingleInputFunction[T, OUTER] = { - other - } - - // Should never be called - override def process(value: T): TraversableOnce[T] = None - - override def description: String = "" - } - - class AndThen[IN, MIDDLE, OUT]( - first: SingleInputFunction[IN, MIDDLE], second: SingleInputFunction[MIDDLE, OUT]) - extends SingleInputFunction[IN, OUT] { 
- - override def process(value: IN): TraversableOnce[OUT] = { - first.process(value).flatMap(second.process) - } - - override def description: String = { - Option(first.description).flatMap { description => - Option(second.description).map(description + "." + _) - }.orNull - } - } - - class FlatMapFunction[IN, OUT](fun: IN => TraversableOnce[OUT], descriptionMessage: String) - extends SingleInputFunction[IN, OUT] { - - override def process(value: IN): TraversableOnce[OUT] = { - fun(value) - } - - override def description: String = { - this.descriptionMessage - } - } - - class ReduceFunction[T](fun: (T, T) => T, descriptionMessage: String) - extends SingleInputFunction[T, T] { - - private var state: Any = _ - - override def process(value: T): TraversableOnce[T] = { - if (state == null) { - state = value - } else { - state = fun(state.asInstanceOf[T], value) - } - Some(state.asInstanceOf[T]) - } - - override def description: String = descriptionMessage - } - - class GroupByTask[IN, GROUP, OUT]( - groupBy: IN => GROUP, taskContext: TaskContext, userConf: UserConfig) - extends Task(taskContext, userConf) { - - def this(taskContext: TaskContext, userConf: UserConfig) = { - this(userConf.getValue[GroupByOp[IN, GROUP]]( - GEARPUMP_STREAMING_GROUPBY_FUNCTION )(taskContext.system).get.fun, - taskContext, userConf) - } - - private var groups = Map.empty[GROUP, SingleInputFunction[IN, OUT]] - - override def onNext(msg: Message): Unit = { - val time = msg.timestamp - - val group = groupBy(msg.msg.asInstanceOf[IN]) - if (!groups.contains(group)) { - val operator = - userConf.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get - groups += group -> operator - } - - val operator = groups(group) - - operator.process(msg.msg.asInstanceOf[IN]).foreach { msg => - taskContext.output(new Message(msg.asInstanceOf[AnyRef], time)) - } - } - } - - class TransformTask[IN, OUT]( - operator: Option[SingleInputFunction[IN, OUT]], taskContext: TaskContext, - userConf: 
UserConfig) extends Task(taskContext, userConf) { - - def this(taskContext: TaskContext, userConf: UserConfig) = { - this(userConf.getValue[SingleInputFunction[IN, OUT]]( - GEARPUMP_STREAMING_OPERATOR)(taskContext.system), taskContext, userConf) - } - - override def onNext(msg: Message): Unit = { - val time = msg.timestamp - - operator match { - case Some(op) => - op.process(msg.msg.asInstanceOf[IN]).foreach { msg => - taskContext.output(new Message(msg.asInstanceOf[AnyRef], time)) - } - case None => - taskContext.output(new Message(msg.msg, time)) - } - } - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala index f5bbd65..16d5c06 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala @@ -22,7 +22,6 @@ import akka.actor.ActorSystem import org.apache.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner, Partitioner} import org.apache.gearpump.streaming.Processor -import org.apache.gearpump.streaming.dsl.op._ import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner import org.apache.gearpump.streaming.task.Task import org.apache.gearpump.util.Graph @@ -33,64 +32,60 @@ class Planner { * Converts Dag of Op to Dag of TaskDescription. TaskDescription is part of the low * level Graph API. 
*/ - def plan(dag: Graph[Op, OpEdge])(implicit system: ActorSystem) - : Graph[Processor[_ <: Task], _ <: Partitioner] = { + def plan(dag: Graph[Op, OpEdge]) + (implicit system: ActorSystem): Graph[Processor[_ <: Task], _ <: Partitioner] = { - val opTranslator = new OpTranslator() - - val newDag = optimize(dag) - newDag.mapEdge { (node1, edge, node2) => + val graph = optimize(dag) + graph.mapEdge { (node1, edge, node2) => edge match { case Shuffle => - node2.head match { - case groupBy: GroupByOp[Any @unchecked, Any @unchecked] => - new GroupByPartitioner(groupBy.fun) + node2 match { + case groupBy: GroupByOp[_, _] => + new GroupByPartitioner(groupBy.groupByFn) case _ => new HashPartitioner } case Direct => new CoLocationPartitioner } - }.mapVertex { opChain => - opTranslator.translate(opChain) - } + }.mapVertex(_.getProcessor) } - private def optimize(dag: Graph[Op, OpEdge]): Graph[OpChain, OpEdge] = { - val newGraph = dag.mapVertex(op => OpChain(List(op))) - - val nodes = newGraph.topologicalOrderWithCirclesIterator.toList.reverse + private def optimize(dag: Graph[Op, OpEdge]) + (implicit system: ActorSystem): Graph[Op, OpEdge] = { + val graph = dag.copy + val nodes = graph.topologicalOrderWithCirclesIterator.toList.reverse for (node <- nodes) { - val outGoingEdges = newGraph.outgoingEdgesOf(node) + val outGoingEdges = graph.outgoingEdgesOf(node) for (edge <- outGoingEdges) { - merge(newGraph, edge._1, edge._3) + merge(graph, edge._1, edge._3) } } - newGraph + graph } - private def merge(dag: Graph[OpChain, OpEdge], node1: OpChain, node2: OpChain) - : Graph[OpChain, OpEdge] = { - if (dag.outDegreeOf(node1) == 1 && - dag.inDegreeOf(node2) == 1 && + private def merge(graph: Graph[Op, OpEdge], node1: Op, node2: Op) + (implicit system: ActorSystem): Unit = { + if (graph.outDegreeOf(node1) == 1 && + graph.inDegreeOf(node2) == 1 && // For processor node, we don't allow it to merge with downstream operators - !node1.head.isInstanceOf[ProcessorOp[_ <: Task]]) { - val (_, 
edge, _) = dag.outgoingEdgesOf(node1).head + !node1.isInstanceOf[ProcessorOp[_ <: Task]] && + !node2.isInstanceOf[ProcessorOp[_ <: Task]]) { + val (_, edge, _) = graph.outgoingEdgesOf(node1).head if (edge == Direct) { - val opList = OpChain(node1.ops ++ node2.ops) - dag.addVertex(opList) - for (incomingEdge <- dag.incomingEdgesOf(node1)) { - dag.addEdge(incomingEdge._1, incomingEdge._2, opList) + val chainedOp = node1.chain(node2) + graph.addVertex(chainedOp) + for (incomingEdge <- graph.incomingEdgesOf(node1)) { + graph.addEdge(incomingEdge._1, incomingEdge._2, chainedOp) } - for (outgoingEdge <- dag.outgoingEdgesOf(node2)) { - dag.addEdge(opList, outgoingEdge._2, outgoingEdge._3) + for (outgoingEdge <- graph.outgoingEdgesOf(node2)) { + graph.addEdge(chainedOp, outgoingEdge._2, outgoingEdge._3) } // Remove the old vertex - dag.removeVertex(node1) - dag.removeVertex(node2) + graph.removeVertex(node1) + graph.removeVertex(node2) } } - dag } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala new file mode 100644 index 0000000..609fbb0 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.dsl.plan.functions + +trait SingleInputFunction[IN, OUT] extends Serializable { + def process(value: IN): TraversableOnce[OUT] + def andThen[OUTER](other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] = { + new AndThen(this, other) + } + def finish(): TraversableOnce[OUT] = None + def clearState(): Unit = {} + def description: String +} + +class AndThen[IN, MIDDLE, OUT]( + first: SingleInputFunction[IN, MIDDLE], second: SingleInputFunction[MIDDLE, OUT]) + extends SingleInputFunction[IN, OUT] { + + override def process(value: IN): TraversableOnce[OUT] = { + first.process(value).flatMap(second.process) + } + + override def finish(): TraversableOnce[OUT] = { + val firstResult = first.finish().flatMap(second.process) + if (firstResult.isEmpty) { + second.finish() + } else { + firstResult + } + } + + override def clearState(): Unit = { + first.clearState() + second.clearState() + } + + override def description: String = { + Option(first.description).flatMap { description => + Option(second.description).map(description + "." 
+ _) + }.orNull + } +} + +class FlatMapFunction[IN, OUT](fn: IN => TraversableOnce[OUT], descriptionMessage: String) + extends SingleInputFunction[IN, OUT] { + + override def process(value: IN): TraversableOnce[OUT] = { + fn(value) + } + + override def description: String = descriptionMessage +} + + +class ReduceFunction[T](fn: (T, T) => T, descriptionMessage: String) + extends SingleInputFunction[T, T] { + + private var state: Option[T] = None + + override def process(value: T): TraversableOnce[T] = { + if (state.isEmpty) { + state = Option(value) + } else { + state = state.map(fn(_, value)) + } + None + } + + override def finish(): TraversableOnce[T] = { + state + } + + override def clearState(): Unit = { + state = None + } + + override def description: String = descriptionMessage +} + +class EmitFunction[T](emit: T => Unit) extends SingleInputFunction[T, Unit] { + + override def process(value: T): TraversableOnce[Unit] = { + emit(value) + None + } + + override def andThen[R](other: SingleInputFunction[Unit, R]): SingleInputFunction[T, R] = { + throw new UnsupportedOperationException("andThen is not supposed to be called on EmitFunction") + } + + override def description: String = "" +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala new file mode 100644 index 0000000..4ee2fa8 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.dsl.task + +import java.time.Instant + +import akka.actor.ActorSystem +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.Constants._ +import org.apache.gearpump.streaming.dsl.window.api.CountWindowFn +import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, GroupAlsoByWindow, WindowRunner} +import org.apache.gearpump.streaming.task.{Task, TaskContext} + +/** + * This task triggers output on number of messages in a window. 
+ */ +class CountTriggerTask[IN, GROUP]( + groupBy: GroupAlsoByWindow[IN, GROUP], + windowRunner: WindowRunner, + taskContext: TaskContext, + userConfig: UserConfig) + extends Task(taskContext, userConfig) { + + def this(groupBy: GroupAlsoByWindow[IN, GROUP], + taskContext: TaskContext, userConfig: UserConfig) = { + this(groupBy, new DefaultWindowRunner(taskContext, userConfig, groupBy)(taskContext.system), + taskContext, userConfig) + } + + def this(taskContext: TaskContext, userConfig: UserConfig) = { + this(userConfig.getValue[GroupAlsoByWindow[IN, GROUP]]( + GEARPUMP_STREAMING_GROUPBY_FUNCTION)(taskContext.system).get, + taskContext, userConfig) + } + + private val windowSize = groupBy.window.windowFn.asInstanceOf[CountWindowFn].size + private var num = 0 + + override def onNext(msg: Message): Unit = { + windowRunner.process(msg) + num += 1 + if (windowSize == num) { + windowRunner.trigger(Instant.ofEpochMilli(windowSize)) + num = 0 + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala new file mode 100644 index 0000000..4b7649f --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.dsl.task + +import java.time.Instant + +import akka.actor.ActorSystem +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.Constants._ +import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, GroupAlsoByWindow, WindowRunner} +import org.apache.gearpump.streaming.task.{Task, TaskContext} + +/** + * This task triggers output on watermark progress. + */ +class EventTimeTriggerTask[IN, GROUP]( + groupBy: GroupAlsoByWindow[IN, GROUP], + windowRunner: WindowRunner, + taskContext: TaskContext, + userConfig: UserConfig) + extends Task(taskContext, userConfig) { + + def this(groupBy: GroupAlsoByWindow[IN, GROUP], + taskContext: TaskContext, userConfig: UserConfig) = { + this(groupBy, new DefaultWindowRunner(taskContext, userConfig, groupBy)(taskContext.system), + taskContext, userConfig) + } + + def this(taskContext: TaskContext, userConfig: UserConfig) = { + this(userConfig.getValue[GroupAlsoByWindow[IN, GROUP]]( + GEARPUMP_STREAMING_GROUPBY_FUNCTION)(taskContext.system).get, + taskContext, userConfig) + } + + override def onNext(message: Message): Unit = { + windowRunner.process(message) + } + + override def onWatermarkProgress(watermark: Instant): Unit = { + windowRunner.trigger(watermark) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala 
---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala new file mode 100644 index 0000000..980a54b --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming.dsl.task + +import java.time.Instant +import java.util.concurrent.TimeUnit + +import akka.actor.Actor.Receive +import akka.actor.ActorSystem +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.Constants._ +import org.apache.gearpump.streaming.dsl.task.ProcessingTimeTriggerTask.Triggering +import org.apache.gearpump.streaming.dsl.window.api.SlidingWindowFn +import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, GroupAlsoByWindow, WindowRunner} +import org.apache.gearpump.streaming.task.{Task, TaskContext} + +import scala.concurrent.duration.FiniteDuration + +object ProcessingTimeTriggerTask { + case object Triggering +} + +/** + * This task triggers output on scheduled system time interval. + */ +class ProcessingTimeTriggerTask[IN, GROUP]( + groupBy: GroupAlsoByWindow[IN, GROUP], + windowRunner: WindowRunner, + taskContext: TaskContext, + userConfig: UserConfig) + extends Task(taskContext, userConfig) { + + def this(groupBy: GroupAlsoByWindow[IN, GROUP], + taskContext: TaskContext, userConfig: UserConfig) = { + this(groupBy, new DefaultWindowRunner(taskContext, userConfig, groupBy)(taskContext.system), + taskContext, userConfig) + } + + def this(taskContext: TaskContext, userConfig: UserConfig) = { + this(userConfig.getValue[GroupAlsoByWindow[IN, GROUP]]( + GEARPUMP_STREAMING_GROUPBY_FUNCTION)(taskContext.system).get, + taskContext, userConfig) + } + + private val windowFn = groupBy.window.windowFn.asInstanceOf[SlidingWindowFn] + private val windowSizeMs = windowFn.size.toMillis + private val windowStepMs = windowFn.step.toMillis + + override def onStart(startTime: Instant): Unit = { + val initialDelay = windowSizeMs - Instant.now.toEpochMilli % windowSizeMs + taskContext.scheduleOnce( + new FiniteDuration(initialDelay, TimeUnit.MILLISECONDS))(self ! 
Triggering) + } + + override def onNext(message: Message): Unit = { + windowRunner.process(message) + } + + override def receiveUnManagedMessage: Receive = { + case Triggering => + windowRunner.trigger(Instant.now) + taskContext.scheduleOnce( + new FiniteDuration(windowStepMs, TimeUnit.MILLISECONDS))(self ! Triggering) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala new file mode 100644 index 0000000..e35f085 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.gearpump.streaming.dsl.task

import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Constants._
import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
import org.apache.gearpump.streaming.task.{Task, TaskContext}

/**
 * Applies an optional single-input function to every message and outputs the results.
 *
 * Each result is emitted with the timestamp of the message that produced it. When no
 * operator is configured, the payload is forwarded unchanged (same timestamp).
 *
 * @param operator function applied to each payload, or None for pass-through
 */
class TransformTask[IN, OUT](
    operator: Option[SingleInputFunction[IN, OUT]], taskContext: TaskContext,
    userConf: UserConfig) extends Task(taskContext, userConf) {

  /** Builds the task from the operator stored in the user config, if any. */
  def this(taskContext: TaskContext, userConf: UserConfig) = {
    this(userConf.getValue[SingleInputFunction[IN, OUT]](
      GEARPUMP_STREAMING_OPERATOR)(taskContext.system), taskContext, userConf)
  }

  override def onNext(msg: Message): Unit = {
    val timestamp = msg.timestamp
    operator.fold[Unit](taskContext.output(new Message(msg.msg, timestamp))) { fn =>
      fn.process(msg.msg.asInstanceOf[IN]).foreach { result =>
        taskContext.output(new Message(result, timestamp))
      }
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala new file mode 100644 index 0000000..a4524a8 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.gearpump.streaming.dsl.window.api

/**
 * What a window runner does with per-group computation state after a window fires.
 *
 * Extending `Product with Serializable` keeps inferred types such as
 * `if (c) Accumulating else Discarding` as plain `AccumulationMode`.
 */
sealed trait AccumulationMode extends Product with Serializable

/** State is kept after firing (DefaultWindowRunner skips `clearState()` in this mode). */
case object Accumulating extends AccumulationMode

/** State is cleared after firing (DefaultWindowRunner calls `clearState()` in this mode). */
case object Discarding extends AccumulationMode
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/GroupByFn.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/GroupByFn.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/GroupByFn.scala new file mode 100644 index 0000000..30e68ba --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/GroupByFn.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.gearpump.streaming.dsl.window.api

import akka.actor.ActorSystem
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Processor
import org.apache.gearpump.streaming.task.Task

/**
 * Splits messages into groups according to their payload and timestamp.
 *
 * The default implementation is
 * [[org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow]].
 *
 * @tparam T     payload type (NOTE(review): not referenced by any member of this trait)
 * @tparam GROUP key type returned by `groupBy`
 */
trait GroupByFn[T, GROUP] {

  /**
   * Returns the group `message` belongs to. Consumers:
   *  - GroupByPartitioner, to shuffle messages between tasks;
   *  - WindowRunner, to bucket messages for time-based aggregation.
   */
  def groupBy(message: Message): GROUP

  /**
   * Creates, at planning time, the Processor that runs this grouping; the concrete task
   * type depends on the window trigger.
   */
  def getProcessor(
      parallelism: Int,
      description: String,
      userConfig: UserConfig)(implicit system: ActorSystem): Processor[_ <: Task]
}


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala new file mode 100644 index 0000000..9865e18 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.gearpump.streaming.dsl.window.api

/**
 * Decides when a window's results are fired. GroupAlsoByWindow.getProcessor maps each
 * trigger to the task type that runs the window.
 *
 * Extending `Product with Serializable` keeps inferred types of expressions mixing the
 * case objects as plain `Trigger`.
 */
sealed trait Trigger extends Product with Serializable

/** Runs the window in EventTimeTriggerTask (presumably fires as event time advances — confirm). */
case object EventTimeTrigger extends Trigger

/** Runs the window in ProcessingTimeTriggerTask, which fires on a wall-clock schedule. */
case object ProcessingTimeTrigger extends Trigger

/** Runs the window in CountTriggerTask; used by CountWindow. */
case object CountTrigger extends Trigger

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Window.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Window.scala new file mode 100644 index 0000000..4b94879 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Window.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.gearpump.streaming.dsl.window.api

import java.time.Duration

/**
 * Describes how a stream is windowed.
 *
 * @param windowFn         assigns each message timestamp to the buckets it belongs to
 * @param trigger          decides when windows fire; defaults to [[EventTimeTrigger]]
 * @param accumulationMode whether per-group state survives a firing ([[Accumulating]]) or
 *                         is cleared ([[Discarding]], the default)
 */
case class Window(
    windowFn: WindowFn,
    trigger: Trigger = EventTimeTrigger,
    accumulationMode: AccumulationMode = Discarding) {

  /**
   * Returns a copy that fires with `trigger`.
   *
   * The window function and accumulation mode are preserved; previously the mode was
   * silently reset to Discarding.
   */
  def triggering(trigger: Trigger): Window = copy(trigger = trigger)

  /** Returns a copy whose state is kept across firings. */
  def accumulating: Window = copy(accumulationMode = Accumulating)

  /** Returns a copy whose state is cleared after each firing. */
  def discarding: Window = copy(accumulationMode = Discarding)
}

object CountWindow {

  /**
   * Defines a count-based window.
   * @param size number of messages per window
   * @return a Window definition fired by [[CountTrigger]]
   */
  def apply(size: Int): Window = {
    Window(CountWindowFn(size), CountTrigger)
  }
}

object FixedWindow {

  /**
   * Defines a FixedWindow (a sliding window whose step equals its size).
   * @param size window size
   * @return a Window definition
   */
  def apply(size: Duration): Window = {
    Window(SlidingWindowFn(size, size))
  }
}

object SlidingWindow {

  /**
   * Defines a SlidingWindow.
   * @param size window size
   * @param step window step to slide forward
   * @return a Window definition
   */
  def apply(size: Duration, step: Duration): Window = {
    Window(SlidingWindowFn(size, step))
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFn.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFn.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFn.scala new file mode 100644 index 0000000..0768730 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFn.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.gearpump.streaming.dsl.window.api

import java.time.{Duration, Instant}

import org.apache.gearpump.TimeStamp
import org.apache.gearpump.streaming.dsl.window.impl.Bucket

import scala.collection.mutable.ArrayBuffer

/** Maps a message timestamp to the window buckets it falls into. */
sealed trait WindowFn {
  def apply(timestamp: Instant): List[Bucket]
}

/**
 * Windows of length `size` whose start times are multiples of `step` milliseconds since
 * the epoch. With `step == size` this is a fixed (tumbling) window.
 *
 * @param size window length; must be at least one millisecond
 * @param step distance between consecutive window starts; must be at least one millisecond
 */
case class SlidingWindowFn(size: Duration, step: Duration)
  extends WindowFn {

  // Sub-millisecond or non-positive values would divide by zero or yield empty windows.
  require(size.toMillis > 0, s"window size must be at least 1 ms, got $size")
  require(step.toMillis > 0, s"window step must be at least 1 ms, got $step")

  def this(size: Duration) = {
    this(size, size)
  }

  /**
   * Returns every bucket `[start, start + size)` that contains `timestamp`, latest start
   * first. With `step > size` a timestamp falling in a gap belongs to no bucket.
   */
  override def apply(timestamp: Instant): List[Bucket] = {
    val sizeMillis = size.toMillis
    val stepMillis = step.toMillis
    val timeMillis = timestamp.toEpochMilli
    val windows = ArrayBuffer.empty[Bucket]
    // Walk back from the latest start <= timestamp; a bucket contains the timestamp as
    // long as its exclusive end is still after it.
    var start = lastStartFor(timeMillis, stepMillis)
    while (start + sizeMillis > timeMillis) {
      windows += Bucket.ofEpochMilli(start, start + sizeMillis)
      start -= stepMillis
    }
    windows.toList
  }

  /** Largest multiple of `windowStep` that is <= `timestamp`, including negative timestamps. */
  private def lastStartFor(timestamp: TimeStamp, windowStep: Long): TimeStamp = {
    timestamp - Math.floorMod(timestamp, windowStep)
  }
}

/**
 * Window function for count-based windows: ignores the timestamp and always returns the
 * single bucket [0 ms, `size` ms).
 * NOTE(review): the bucket appears to be a placeholder, with counting presumably handled by
 * CountTriggerTask — confirm.
 */
case class CountWindowFn(size: Int) extends WindowFn {

  override def apply(timestamp: Instant): List[Bucket] = {
    List(Bucket.ofEpochMilli(0, size))
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala new file mode 100644 index 0000000..e978983 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.gearpump.streaming.dsl.window.impl

import org.apache.gearpump.Message
import org.apache.gearpump.streaming.dsl.window.api.Trigger

/**
 * Runs a reduce function over windowed input: messages are fed one at a time and
 * results are requested when a [[Trigger]] fires.
 */
trait ReduceFnRunner {

  /** Feeds a single message to the runner. */
  def process(message: Message): Unit

  /** Notifies the runner that `trigger` has fired. */
  def onTrigger(trigger: Trigger): Unit
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala new file mode 100644 index 0000000..53cf5d0 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package org.apache.gearpump.streaming.dsl.window.impl

import java.time.Instant

import akka.actor.ActorSystem
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Constants._
import org.apache.gearpump.streaming.Processor
import org.apache.gearpump.{Message, TimeStamp}
import org.apache.gearpump.streaming.dsl.window.api._
import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, EventTimeTriggerTask, ProcessingTimeTriggerTask}
import org.apache.gearpump.streaming.task.Task

object Bucket {

  /** Builds a [[Bucket]] from epoch-millisecond bounds. */
  def ofEpochMilli(startTime: TimeStamp, endTime: TimeStamp): Bucket =
    Bucket(Instant.ofEpochMilli(startTime), Instant.ofEpochMilli(endTime))
}

/**
 * A window unit spanning `[startTime, endTime)`: startTime is included, endTime excluded.
 * Buckets are ordered by startTime, then by endTime.
 */
case class Bucket(startTime: Instant, endTime: Instant) extends Comparable[Bucket] {
  override def compareTo(o: Bucket): Int = {
    val byStart = startTime.compareTo(o.startTime)
    if (byStart == 0) endTime.compareTo(o.endTime) else byStart
  }
}

/**
 * Groups messages by `groupByFn` applied to the payload, together with the buckets that
 * `window.windowFn` assigns to the message timestamp.
 *
 * NOTE(review): GroupByFn documents that GroupByPartitioner shuffles on `groupBy`. If it
 * hashes this whole (group, buckets) tuple, messages of one group whose bucket lists differ
 * (overlapping sliding windows) may be routed to different tasks — confirm.
 */
case class GroupAlsoByWindow[T, GROUP](groupByFn: T => GROUP, window: Window)
  extends GroupByFn[T, (GROUP, List[Bucket])] {

  override def groupBy(message: Message): (GROUP, List[Bucket]) = {
    val payload = message.msg.asInstanceOf[T]
    val eventTime = Instant.ofEpochMilli(message.timestamp)
    (groupByFn(payload), window.windowFn(eventTime))
  }

  /** Stores this function in the config and picks the task type from the window trigger. */
  override def getProcessor(parallelism: Int, description: String,
      userConfig: UserConfig)(implicit system: ActorSystem): Processor[_ <: Task] = {
    val config = userConfig.withValue(GEARPUMP_STREAMING_GROUPBY_FUNCTION, this)
    window.trigger match {
      case EventTimeTrigger =>
        Processor[EventTimeTriggerTask[T, GROUP]](parallelism, description, config)
      case ProcessingTimeTrigger =>
        Processor[ProcessingTimeTriggerTask[T, GROUP]](parallelism, description, config)
      case CountTrigger =>
        Processor[CountTriggerTask[T, GROUP]](parallelism, description, config)
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala new file mode 100644 index 0000000..9af5e61 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.gearpump.streaming.dsl.window.impl

import java.time.Instant

import akka.actor.ActorSystem
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.gs.collections.api.block.procedure.{Procedure, Procedure2}
import org.apache.gearpump.gs.collections.impl.list.mutable.FastList
import org.apache.gearpump.gs.collections.impl.map.mutable.UnifiedMap
import org.apache.gearpump.gs.collections.impl.map.sorted.mutable.TreeSortedMap
import org.apache.gearpump.streaming.Constants._
import org.apache.gearpump.streaming.dsl.plan.functions.{EmitFunction, SingleInputFunction}
import org.apache.gearpump.streaming.dsl.window.api.Discarding
import org.apache.gearpump.streaming.task.TaskContext
import org.apache.gearpump.util.LogUtil
import org.slf4j.Logger

/** Buffers windowed messages and fires window results on demand. */
trait WindowRunner {

  /** Accepts one message into the runner's window buffers. */
  def process(message: Message): Unit

  /** Fires windows given the current trigger time `time`. */
  def trigger(time: Instant): Unit

}

object DefaultWindowRunner {

  private val LOG: Logger = LogUtil.getLogger(classOf[DefaultWindowRunner[_, _, _]])

  /**
   * A (bucket, group) pair, ordered by bucket, then by group hashCode.
   *
   * NOTE: GROUP has no ordering, so two unequal groups with the same hashCode still compare
   * as -1 in both directions. This is not a total order, and DefaultWindowRunner does not use
   * it as a sorted-map key.
   */
  case class WindowGroup[GROUP](bucket: Bucket, group: GROUP)
    extends Comparable[WindowGroup[GROUP]] {
    override def compareTo(o: WindowGroup[GROUP]): Int = {
      val byBucket = bucket.compareTo(o.bucket)
      if (byBucket != 0) {
        byBucket
      } else if (group == o.group) {
        0
      } else {
        val byHash = Integer.compare(group.##, o.group.##)
        if (byHash != 0) byHash else -1
      }
    }
  }
}

/**
 * Default [[WindowRunner]]: buffers inputs per (bucket, group) and, on trigger, replays each
 * ready window's inputs through that group's reduce function and emits the results.
 *
 * NOTE(review): `groupFns` is never pruned, so it grows with the number of distinct groups.
 * In Accumulating mode a group's state is shared by every window of that group rather than
 * kept per window — confirm both are intended.
 */
class DefaultWindowRunner[IN, GROUP, OUT](
    taskContext: TaskContext, userConfig: UserConfig,
    groupBy: GroupAlsoByWindow[IN, GROUP])(implicit system: ActorSystem)
  extends WindowRunner {

  // Buckets in ascending (startTime, endTime) order, each holding group -> buffered inputs.
  // Groups are keyed by equals/hashCode, so GROUP needs no ordering. The previous single
  // TreeSortedMap keyed by WindowGroup relied on an inconsistent comparator and could store
  // duplicate (bucket, group) entries.
  private val windowGroups = new TreeSortedMap[Bucket, UnifiedMap[GROUP, FastList[IN]]]
  // One reduce function per group, read from the user config the first time a group is seen.
  private val groupFns = new UnifiedMap[GROUP, SingleInputFunction[IN, OUT]]

  override def process(message: Message): Unit = {
    val (group, buckets) = groupBy.groupBy(message)
    val input = message.msg.asInstanceOf[IN]
    buckets.foreach { bucket =>
      bufferFor(bucket, group).add(input)
    }
    // Deserialize the operator only for a new group, not for every message.
    if (!groupFns.containsKey(group)) {
      groupFns.put(group,
        userConfig.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get)
    }
  }

  /** Fires, earliest first, every buffered bucket whose end time is not after `time`. */
  override def trigger(time: Instant): Unit = {
    @annotation.tailrec
    def fireReadyBuckets(): Unit = {
      if (windowGroups.notEmpty()) {
        val bucket = windowGroups.firstKey
        if (!time.isBefore(bucket.endTime)) {
          val groups = windowGroups.remove(bucket)
          groups.forEachKeyValue(new Procedure2[GROUP, FastList[IN]] {
            override def value(group: GROUP, inputs: FastList[IN]): Unit = {
              fire(group, inputs, time)
            }
          })
          fireReadyBuckets()
        }
      }
    }

    fireReadyBuckets()
  }

  /** Returns the input buffer for (bucket, group), creating missing entries on demand. */
  private def bufferFor(bucket: Bucket, group: GROUP): FastList[IN] = {
    val groups = Option(windowGroups.get(bucket)).getOrElse {
      val created = new UnifiedMap[GROUP, FastList[IN]]()
      windowGroups.put(bucket, created)
      created
    }
    Option(groups.get(group)).getOrElse {
      val created = new FastList[IN](1)
      groups.put(group, created)
      created
    }
  }

  /**
   * Replays `inputs` through the group's reduce function, emits its results stamped with
   * `time`, and clears the function's state in Discarding mode.
   */
  private def fire(group: GROUP, inputs: FastList[IN], time: Instant): Unit = {
    val reduceFn = groupFns.get(group)
      .andThen[Unit](new EmitFunction[OUT](emitResult(_, time)))
    inputs.forEach(new Procedure[IN] {
      override def value(t: IN): Unit = {
        reduceFn.process(t)
      }
    })
    reduceFn.finish()
    if (groupBy.window.accumulationMode == Discarding) {
      reduceFn.clearState()
    }
  }

  private def emitResult(result: OUT, time: Instant): Unit = {
    taskContext.output(Message(result, time.toEpochMilli))
  }
}
