Repository: incubator-gearpump Updated Branches: refs/heads/master 5cabd8ca3 -> 1d15265dc
[GEARPUMP-23] Add SessionWindows Author: manuzhang <[email protected]> Closes #140 from manuzhang/sessions. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/1d15265d Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/1d15265d Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/1d15265d Branch: refs/heads/master Commit: 1d15265dc3fe124f6b231c14567bacf0915c91af Parents: 5cabd8c Author: manuzhang <[email protected]> Authored: Tue Feb 7 21:21:42 2017 +0800 Committer: manuzhang <[email protected]> Committed: Tue Feb 7 21:21:52 2017 +0800 ---------------------------------------------------------------------- .../dsl/window/api/WindowFunction.scala | 22 ++++++++++-- .../streaming/dsl/window/api/Windows.scala | 27 ++++++++++++--- .../streaming/dsl/window/impl/Window.scala | 24 +++++++++++-- .../dsl/window/impl/WindowRunner.scala | 36 ++++++++++++++++---- .../dsl/task/EventTimeTriggerTaskSpec.scala | 4 +-- .../task/ProcessingTimeTriggerTaskSpec.scala | 4 +-- 6 files changed, 97 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1d15265d/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala index 9ef171d..73fef5d 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala @@ -33,11 +33,19 @@ object WindowFunction { } trait WindowFunction[T] { + def apply(context: WindowFunction.Context[T]): Array[Window] + + def isNonMerging: Boolean +} + +abstract class NonMergingWindowFunction[T] extends WindowFunction[T] { + + override def isNonMerging: Boolean = true } case class SlidingWindowFunction[T](size: Duration, step: Duration) - extends WindowFunction[T] { + extends NonMergingWindowFunction[T] { def this(size: Duration) = { this(size, size) @@ -64,9 +72,19 @@ case class SlidingWindowFunction[T](size: Duration, step: Duration) } } -case class CountWindowFunction[T](size: Int) extends WindowFunction[T] { +case class CountWindowFunction[T](size: Int) extends NonMergingWindowFunction[T] { override def apply(context: WindowFunction.Context[T]): Array[Window] = { Array(Window.ofEpochMilli(0, size)) } } + +case class SessionWindowFunction[T](gap: Duration) extends WindowFunction[T] { + + override def apply(context: WindowFunction.Context[T]): Array[Window] = { + Array(Window(context.timestamp, context.timestamp.plus(gap))) + } + + override def isNonMerging: Boolean = false + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1d15265d/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala index c636a55..5917f09 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala @@ -21,9 +21,11 @@ import java.time.Duration /** * - * @param windowFn - * @param trigger - * @param accumulationMode + * Defines how to apply window functions. + * + * @param windowFn how to divide windows + * @param trigger when to trigger window result + * @param accumulationMode whether to accumulate results across windows */ case class Windows[T]( windowFn: WindowFunction[T], @@ -54,6 +56,7 @@ object FixedWindows { /** * Defines a FixedWindow. + * * @param size window size * @return a Window definition */ @@ -62,10 +65,11 @@ object FixedWindows { } } -object SlidingWindow { +object SlidingWindows { /** - * Defines a SlidingWindow + * Defines a SlidingWindow. + * * @param size window size * @param step window step to slide forward * @return a Window definition @@ -75,3 +79,16 @@ object SlidingWindow { } } +object SessionWindows { + + /** + * Defines a SessionWindow. + * + * @param gap session gap + * @return a Window definition + */ + def apply[T](gap: Duration): Windows[T] = { + Windows(SessionWindowFunction(gap)) + } +} + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1d15265d/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala index fe644af..3e9d8fb 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala @@ -39,6 +39,22 @@ object Window { */ case class Window(startTime: Instant, endTime: Instant) extends Comparable[Window] { + /** + * Returns whether this window intersects the given window. + */ + def intersects(other: Window): Boolean = { + startTime.isBefore(other.endTime) && endTime.isAfter(other.startTime) + } + + /** + * Returns the minimal window that includes both this window and + * the given window. + */ + def span(other: Window): Window = { + Window(Instant.ofEpochMilli(Math.min(startTime.toEpochMilli, other.startTime.toEpochMilli)), + Instant.ofEpochMilli(Math.max(endTime.toEpochMilli, other.endTime.toEpochMilli))) + } + override def compareTo(o: Window): Int = { val ret = startTime.compareTo(o.startTime) if (ret != 0) { @@ -52,14 +68,16 @@ case class Window(startTime: Instant, endTime: Instant) extends Comparable[Windo case class WindowAndGroup[GROUP](window: Window, group: GROUP) extends Comparable[WindowAndGroup[GROUP]] { + def intersects(other: WindowAndGroup[GROUP]): Boolean = { + window.intersects(other.window) && group.equals(other.group) + } + override def compareTo(o: WindowAndGroup[GROUP]): Int = { val ret = window.compareTo(o.window) if (ret != 0) { ret - } else if (group.equals(o.group)) { - 0 } else { - -1 + group.hashCode() - o.group.hashCode() } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1d15265d/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala index 7a16100..7013645 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala @@ -20,6 +20,7 @@ package org.apache.gearpump.streaming.dsl.window.impl import java.time.Instant import akka.actor.ActorSystem +import com.gs.collections.api.block.predicate.Predicate import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import com.gs.collections.api.block.procedure.Procedure @@ -33,6 +34,7 @@ import org.apache.gearpump.streaming.task.TaskContext import org.apache.gearpump.util.LogUtil import org.slf4j.Logger + trait WindowRunner { def process(message: Message): Unit @@ -43,8 +45,6 @@ trait WindowRunner { object DefaultWindowRunner { private val LOG: Logger = LogUtil.getLogger(classOf[DefaultWindowRunner[_, _, _]]) - - case class InputsAndFn[IN, OUT](inputs: FastList[IN], fn: FunctionRunner[IN, OUT]) } class DefaultWindowRunner[IN, GROUP, OUT]( @@ -52,17 +52,24 @@ class DefaultWindowRunner[IN, GROUP, OUT]( groupBy: GroupAlsoByWindow[IN, GROUP])(implicit system: ActorSystem) extends WindowRunner { + private val windowFn = groupBy.window.windowFn private val groupedInputs = new TreeSortedMap[WindowAndGroup[GROUP], FastList[IN]] private val groupedFnRunners = new UnifiedMap[GROUP, FunctionRunner[IN, OUT]] override def process(message: Message): Unit = { + val input = message.msg.asInstanceOf[IN] val wgs = groupBy.groupBy(message) wgs.foreach { wg => - if (!groupedInputs.containsKey(wg)) { - val inputs = new FastList[IN](1) - groupedInputs.put(wg, inputs) + if (windowFn.isNonMerging) { + if (!groupedInputs.containsKey(wg)) { + val inputs = new FastList[IN](1) + groupedInputs.put(wg, inputs) + } + groupedInputs.get(wg).add(input) + } else { + merge(wg, input) } - groupedInputs.get(wg).add(message.msg.asInstanceOf[IN]) + if (!groupedFnRunners.containsKey(wg.group)) { val fn = userConfig.getValue[FunctionRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get fn.setup() @@ -70,6 +77,23 @@ class DefaultWindowRunner[IN, GROUP, OUT]( } } + def merge(wg: WindowAndGroup[GROUP], input: IN): Unit = { + val intersected = groupedInputs.keySet.select(new Predicate[WindowAndGroup[GROUP]] { + override def accept(each: WindowAndGroup[GROUP]): Boolean = { + wg.intersects(each) + } + }) + var mergedWin = wg.window + val mergedInputs = FastList.newListWith(input) + intersected.forEach(new Procedure[WindowAndGroup[GROUP]] { + override def value(each: WindowAndGroup[GROUP]): Unit = { + mergedWin = mergedWin.span(each.window) + mergedInputs.addAll(groupedInputs.remove(each)) + } + }) + groupedInputs.put(WindowAndGroup(mergedWin, wg.group), mergedInputs) + } + } override def trigger(time: Instant): Unit = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1d15265d/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala index 07b5544..9414c76 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala @@ -22,7 +22,7 @@ import java.time.{Duration, Instant} import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, SlidingWindow} +import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, SlidingWindows} import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowRunner} import org.mockito.Matchers._ import org.mockito.Mockito._ @@ -43,7 +43,7 @@ class EventTimeTriggerTaskSpec extends PropSpec with PropertyChecks forAll(windowSizeGen, windowStepGen, watermarkGen) { (windowSize: Long, windowStep: Long, watermark: Instant) => - val window = SlidingWindow.apply[Any](Duration.ofMillis(windowSize), + val window = SlidingWindows.apply[Any](Duration.ofMillis(windowSize), Duration.ofMillis(windowStep)).triggering(EventTimeTrigger) val groupBy = mock[GroupAlsoByWindow[Any, Any]] val windowRunner = mock[WindowRunner] http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1d15265d/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala index ef51ab2..cbc9e0c 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala @@ -23,7 +23,7 @@ import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.MockUtil import org.apache.gearpump.streaming.dsl.task.ProcessingTimeTriggerTask.Triggering -import org.apache.gearpump.streaming.dsl.window.api.{ProcessingTimeTrigger, SlidingWindow} +import org.apache.gearpump.streaming.dsl.window.api.{ProcessingTimeTrigger, SlidingWindows} import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowRunner} import org.mockito.Matchers._ import org.mockito.Mockito._ @@ -44,7 +44,7 @@ class ProcessingTimeTriggerTaskSpec extends PropSpec with PropertyChecks forAll(windowSizeGen, windowStepGen, startTimeGen) { (windowSize: Long, windowStep: Long, startTime: Instant) => - val window = SlidingWindow.apply[Any](Duration.ofMillis(windowSize), + val window = SlidingWindows.apply[Any](Duration.ofMillis(windowSize), Duration.ofMillis(windowStep)).triggering(ProcessingTimeTrigger) val groupBy = mock[GroupAlsoByWindow[Any, Any]] val windowRunner = mock[WindowRunner]
