Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 5cabd8ca3 -> 1d15265dc


[GEARPUMP-23] Add SessionWindows

Author: manuzhang <[email protected]>

Closes #140 from manuzhang/sessions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/1d15265d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/1d15265d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/1d15265d

Branch: refs/heads/master
Commit: 1d15265dc3fe124f6b231c14567bacf0915c91af
Parents: 5cabd8c
Author: manuzhang <[email protected]>
Authored: Tue Feb 7 21:21:42 2017 +0800
Committer: manuzhang <[email protected]>
Committed: Tue Feb 7 21:21:52 2017 +0800

----------------------------------------------------------------------
 .../dsl/window/api/WindowFunction.scala         | 22 ++++++++++--
 .../streaming/dsl/window/api/Windows.scala      | 27 ++++++++++++---
 .../streaming/dsl/window/impl/Window.scala      | 24 +++++++++++--
 .../dsl/window/impl/WindowRunner.scala          | 36 ++++++++++++++++----
 .../dsl/task/EventTimeTriggerTaskSpec.scala     |  4 +--
 .../task/ProcessingTimeTriggerTaskSpec.scala    |  4 +--
 6 files changed, 97 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1d15265d/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
index 9ef171d..73fef5d 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
@@ -33,11 +33,19 @@ object WindowFunction {
 }
 
 trait WindowFunction[T] {
+
   def apply(context: WindowFunction.Context[T]): Array[Window]
+
+  def isNonMerging: Boolean
+}
+
+abstract class NonMergingWindowFunction[T] extends WindowFunction[T] {
+
+  override def isNonMerging: Boolean = true
 }
 
 case class SlidingWindowFunction[T](size: Duration, step: Duration)
-  extends WindowFunction[T] {
+  extends NonMergingWindowFunction[T] {
 
   def this(size: Duration) = {
     this(size, size)
@@ -64,9 +72,19 @@ case class SlidingWindowFunction[T](size: Duration, step: Duration)
   }
 }
 
-case class CountWindowFunction[T](size: Int) extends WindowFunction[T] {
+case class CountWindowFunction[T](size: Int) extends NonMergingWindowFunction[T] {
 
   override def apply(context: WindowFunction.Context[T]): Array[Window] = {
     Array(Window.ofEpochMilli(0, size))
   }
 }
+
+case class SessionWindowFunction[T](gap: Duration) extends WindowFunction[T] {
+
+  override def apply(context: WindowFunction.Context[T]): Array[Window] = {
+    Array(Window(context.timestamp, context.timestamp.plus(gap)))
+  }
+
+  override def isNonMerging: Boolean = false
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1d15265d/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
index c636a55..5917f09 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
@@ -21,9 +21,11 @@ import java.time.Duration
 
 /**
  *
- * @param windowFn
- * @param trigger
- * @param accumulationMode
+ * Defines how to apply window functions.
+ *
+ * @param windowFn how to divide windows
+ * @param trigger when to trigger window result
+ * @param accumulationMode whether to accumulate results across windows
  */
 case class Windows[T](
     windowFn: WindowFunction[T],
@@ -54,6 +56,7 @@ object FixedWindows {
 
   /**
    * Defines a FixedWindow.
+   *
    * @param size window size
    * @return a Window definition
    */
@@ -62,10 +65,11 @@ object FixedWindows {
   }
 }
 
-object SlidingWindow {
+object SlidingWindows {
 
   /**
-   * Defines a SlidingWindow
+   * Defines a SlidingWindow.
+   *
    * @param size window size
    * @param step window step to slide forward
    * @return a Window definition
@@ -75,3 +79,16 @@ object SlidingWindow {
   }
 }
 
+object SessionWindows {
+
+  /**
+   * Defines a SessionWindow.
+   *
+   * @param gap session gap
+   * @return a Window definition
+   */
+  def apply[T](gap: Duration): Windows[T] = {
+    Windows(SessionWindowFunction(gap))
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1d15265d/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
index fe644af..3e9d8fb 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
@@ -39,6 +39,22 @@ object Window {
  */
 case class Window(startTime: Instant, endTime: Instant) extends Comparable[Window] {
 
+  /**
+   * Returns whether this window intersects the given window.
+   */
+  def intersects(other: Window): Boolean = {
+    startTime.isBefore(other.endTime) && endTime.isAfter(other.startTime)
+  }
+
+  /**
+   * Returns the minimal window that includes both this window and
+   * the given window.
+   */
+  def span(other: Window): Window = {
+    Window(Instant.ofEpochMilli(Math.min(startTime.toEpochMilli, other.startTime.toEpochMilli)),
+      Instant.ofEpochMilli(Math.max(endTime.toEpochMilli, other.endTime.toEpochMilli)))
+  }
+
   override def compareTo(o: Window): Int = {
     val ret = startTime.compareTo(o.startTime)
     if (ret != 0) {
@@ -52,14 +68,16 @@ case class Window(startTime: Instant, endTime: Instant) extends Comparable[Windo
 case class WindowAndGroup[GROUP](window: Window, group: GROUP)
   extends Comparable[WindowAndGroup[GROUP]] {
 
+  def intersects(other: WindowAndGroup[GROUP]): Boolean = {
+    window.intersects(other.window) && group.equals(other.group)
+  }
+
   override def compareTo(o: WindowAndGroup[GROUP]): Int = {
     val ret = window.compareTo(o.window)
     if (ret != 0) {
       ret
-    } else if (group.equals(o.group)) {
-      0
     } else {
-      -1
+      group.hashCode() - o.group.hashCode()
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1d15265d/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
index 7a16100..7013645 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
@@ -20,6 +20,7 @@ package org.apache.gearpump.streaming.dsl.window.impl
 import java.time.Instant
 
 import akka.actor.ActorSystem
+import com.gs.collections.api.block.predicate.Predicate
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import com.gs.collections.api.block.procedure.Procedure
@@ -33,6 +34,7 @@ import org.apache.gearpump.streaming.task.TaskContext
 import org.apache.gearpump.util.LogUtil
 import org.slf4j.Logger
 
+
 trait WindowRunner {
 
   def process(message: Message): Unit
@@ -43,8 +45,6 @@ trait WindowRunner {
 object DefaultWindowRunner {
 
   private val LOG: Logger = LogUtil.getLogger(classOf[DefaultWindowRunner[_, _, _]])
-
-  case class InputsAndFn[IN, OUT](inputs: FastList[IN], fn: FunctionRunner[IN, OUT])
 }
 
 class DefaultWindowRunner[IN, GROUP, OUT](
@@ -52,17 +52,24 @@ class DefaultWindowRunner[IN, GROUP, OUT](
     groupBy: GroupAlsoByWindow[IN, GROUP])(implicit system: ActorSystem)
   extends WindowRunner {
 
+  private val windowFn = groupBy.window.windowFn
   private val groupedInputs = new TreeSortedMap[WindowAndGroup[GROUP], FastList[IN]]
   private val groupedFnRunners = new UnifiedMap[GROUP, FunctionRunner[IN, OUT]]
 
   override def process(message: Message): Unit = {
+    val input = message.msg.asInstanceOf[IN]
     val wgs = groupBy.groupBy(message)
     wgs.foreach { wg =>
-      if (!groupedInputs.containsKey(wg)) {
-        val inputs = new FastList[IN](1)
-        groupedInputs.put(wg, inputs)
+      if (windowFn.isNonMerging) {
+        if (!groupedInputs.containsKey(wg)) {
+          val inputs = new FastList[IN](1)
+          groupedInputs.put(wg, inputs)
+        }
+        groupedInputs.get(wg).add(input)
+      } else {
+        merge(wg, input)
       }
-      groupedInputs.get(wg).add(message.msg.asInstanceOf[IN])
+
       if (!groupedFnRunners.containsKey(wg.group)) {
         val fn = userConfig.getValue[FunctionRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get
         fn.setup()
@@ -70,6 +77,23 @@ class DefaultWindowRunner[IN, GROUP, OUT](
       }
     }
 
+    def merge(wg: WindowAndGroup[GROUP], input: IN): Unit = {
+      val intersected = groupedInputs.keySet.select(new Predicate[WindowAndGroup[GROUP]] {
+        override def accept(each: WindowAndGroup[GROUP]): Boolean = {
+          wg.intersects(each)
+        }
+      })
+      var mergedWin = wg.window
+      val mergedInputs = FastList.newListWith(input)
+      intersected.forEach(new Procedure[WindowAndGroup[GROUP]] {
+        override def value(each: WindowAndGroup[GROUP]): Unit = {
+          mergedWin = mergedWin.span(each.window)
+          mergedInputs.addAll(groupedInputs.remove(each))
+        }
+      })
+      groupedInputs.put(WindowAndGroup(mergedWin, wg.group), mergedInputs)
+    }
+
   }
 
   override def trigger(time: Instant): Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1d15265d/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
index 07b5544..9414c76 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
@@ -22,7 +22,7 @@ import java.time.{Duration, Instant}
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, SlidingWindow}
+import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, SlidingWindows}
 import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowRunner}
 import org.mockito.Matchers._
 import org.mockito.Mockito._
@@ -43,7 +43,7 @@ class EventTimeTriggerTaskSpec extends PropSpec with PropertyChecks
     forAll(windowSizeGen, windowStepGen, watermarkGen) {
       (windowSize: Long, windowStep: Long, watermark: Instant) =>
 
-        val window = SlidingWindow.apply[Any](Duration.ofMillis(windowSize),
+        val window = SlidingWindows.apply[Any](Duration.ofMillis(windowSize),
           Duration.ofMillis(windowStep)).triggering(EventTimeTrigger)
         val groupBy = mock[GroupAlsoByWindow[Any, Any]]
         val windowRunner = mock[WindowRunner]

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1d15265d/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala
index ef51ab2..cbc9e0c 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala
@@ -23,7 +23,7 @@ import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
 import org.apache.gearpump.streaming.dsl.task.ProcessingTimeTriggerTask.Triggering
-import org.apache.gearpump.streaming.dsl.window.api.{ProcessingTimeTrigger, SlidingWindow}
+import org.apache.gearpump.streaming.dsl.window.api.{ProcessingTimeTrigger, SlidingWindows}
 import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowRunner}
 import org.mockito.Matchers._
 import org.mockito.Mockito._
@@ -44,7 +44,7 @@ class ProcessingTimeTriggerTaskSpec extends PropSpec with PropertyChecks
     forAll(windowSizeGen, windowStepGen, startTimeGen) {
       (windowSize: Long, windowStep: Long, startTime: Instant) =>
 
-        val window = SlidingWindow.apply[Any](Duration.ofMillis(windowSize),
+        val window = SlidingWindows.apply[Any](Duration.ofMillis(windowSize),
           Duration.ofMillis(windowStep)).triggering(ProcessingTimeTrigger)
         val groupBy = mock[GroupAlsoByWindow[Any, Any]]
         val windowRunner = mock[WindowRunner]

Reply via email to