Repository: incubator-gearpump
Updated Branches:
  refs/heads/master fc8006cea -> c1370d9bf
[GEARPUMP-315] Add GlobalWindows and implement groupBy on it

Author: manuzhang <[email protected]>

Closes #185 from manuzhang/global_window.

Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/c1370d9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/c1370d9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/c1370d9b

Branch: refs/heads/master
Commit: c1370d9bf21b62c964d107a1f24765770116a316
Parents: fc8006c
Author: manuzhang <[email protected]>
Authored: Sat May 27 15:39:00 2017 +0800
Committer: manuzhang <[email protected]>
Committed: Sat May 27 15:40:24 2017 +0800

----------------------------------------------------------------------
 .../apache/gearpump/streaming/dsl/scalaapi/Stream.scala |  2 +-
 .../streaming/dsl/window/api/WindowFunction.scala       | 11 ++++++++++-
 .../gearpump/streaming/dsl/window/api/Windows.scala     |  7 +++++++
 .../gearpump/streaming/dsl/scalaapi/StreamSpec.scala    |  4 ++--
 4 files changed, 20 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c1370d9b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
index 82d6beb..e15d4ae 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
@@ -188,7 +188,7 @@ class Stream[T](
    */
   def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1,
       description: String = "groupBy"): Stream[T] = {
-    window(CountWindows.apply(1).accumulating)
+    window(GlobalWindows())
       .groupBy[GROUP](fn, 
parallelism, description)
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c1370d9b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
index 73fef5d..7da9c85 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
@@ -19,7 +19,7 @@ package org.apache.gearpump.streaming.dsl.window.api

 import java.time.{Duration, Instant}

-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.{MIN_TIME_MILLIS, MAX_TIME_MILLIS, TimeStamp}
 import org.apache.gearpump.streaming.dsl.window.impl.Window

 import scala.collection.mutable.ArrayBuffer
@@ -44,6 +44,15 @@ abstract class NonMergingWindowFunction[T] extends WindowFunction[T] {
   override def isNonMerging: Boolean = true
 }

+case class GlobalWindowFunction[T]() extends NonMergingWindowFunction[T] {
+
+  override def apply(context: WindowFunction.Context[T]): Array[Window] = {
+    Array(Window(Instant.ofEpochMilli(MIN_TIME_MILLIS),
+      Instant.ofEpochMilli(MAX_TIME_MILLIS)))
+  }
+
+}
+
 case class SlidingWindowFunction[T](size: Duration, step: Duration)
   extends NonMergingWindowFunction[T] {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c1370d9b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
index 5917f09..467f57c 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
@@ -52,6 +52,13 @@ object CountWindows {
   }
 }

+object GlobalWindows {
+
+  def apply[T](): Windows[T] = {
+    Windows(GlobalWindowFunction())
+  }
+}
+
 object FixedWindows {

   /**

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c1370d9b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
index fb398b8..4c7e209 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
@@ -24,7 +24,7 @@ import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.{TestUtil, UserConfig}
 import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
 import org.apache.gearpump.streaming.dsl.scalaapi.StreamSpec.Join
-import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask}
+import org.apache.gearpump.streaming.dsl.task.{EventTimeTriggerTask, TransformTask}
 import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, HashPartitioner, PartitionerDescription}
 import org.apache.gearpump.streaming.source.DataSourceTask
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
@@ -92,7 +92,7 @@ class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Mock

   private def getExpectedDagTopology: Graph[String, String] = {
     val source = classOf[DataSourceTask[_, _]].getName
-    val group = classOf[CountTriggerTask[_, _]].getName
+    val group = classOf[EventTimeTriggerTask[_, _]].getName
     val merge = 
classOf[TransformTask[_, _]].getName
     val join = classOf[Join].getName
