Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 6677b6a11 -> 215531cd8


[GEARPUMP-245] Apply groupBy and fix group implementations

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [x] Make sure the commit message is formatted like:
   `[GEARPUMP-<Jira issue #>] Meaningful description of pull request`
 - [x] Make sure tests pass via `sbt clean test`.
 - [x] Make sure old documentation affected by the pull request has been 
updated and new documentation added for new functionality.

Author: manuzhang <[email protected]>

Closes #122 from manuzhang/GEARPUMP-245.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/215531cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/215531cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/215531cd

Branch: refs/heads/master
Commit: 215531cd822f5a07dd6925220f962a3b64d4aebd
Parents: 6677b6a
Author: manuzhang <[email protected]>
Authored: Wed Dec 14 12:45:48 2016 +0800
Committer: manuzhang <[email protected]>
Committed: Wed Dec 14 12:45:48 2016 +0800

----------------------------------------------------------------------
 .../streaming/dsl/javaapi/JavaStream.scala      |  2 +-
 .../dsl/window/impl/WindowRunner.scala          | 52 ++++++++++----------
 2 files changed, 27 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/215531cd/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
index f68731e..f2654ea 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
@@ -84,6 +84,6 @@ class JavaWindowStream[T](stream: WindowStream[T]) {
 
   def groupBy[GROUP](fn: GroupByFunction[T, GROUP], parallelism: Int,
       description: String): JavaStream[T] = {
-    new JavaStream[T](stream.groupBy((t: T) => fn, parallelism, description))
+    new JavaStream[T](stream.groupBy(fn.apply, parallelism, description))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/215531cd/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
index 640d090..b3ecf2d 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
@@ -26,6 +26,7 @@ import com.gs.collections.api.block.procedure.Procedure
 import com.gs.collections.impl.list.mutable.FastList
 import com.gs.collections.impl.map.mutable.UnifiedMap
 import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap
+import com.gs.collections.impl.set.mutable.UnifiedSet
 import org.apache.gearpump.streaming.Constants._
 import org.apache.gearpump.streaming.dsl.plan.functions.{EmitFunction, 
SingleInputFunction}
 import org.apache.gearpump.streaming.dsl.window.api.Discarding
@@ -46,18 +47,6 @@ object DefaultWindowRunner {
   private val LOG: Logger = LogUtil.getLogger(classOf[DefaultWindowRunner[_, 
_, _]])
 
   case class WindowGroup[GROUP](bucket: Bucket, group: GROUP)
-    extends Comparable[WindowGroup[GROUP]] {
-    override def compareTo(o: WindowGroup[GROUP]): Int = {
-      val ret = bucket.compareTo(o.bucket)
-      if (ret != 0) {
-        ret
-      } else if (group.equals(o.group)) {
-        0
-      } else {
-        -1
-      }
-    }
-  }
 }
 
 class DefaultWindowRunner[IN, GROUP, OUT](
@@ -66,7 +55,8 @@ class DefaultWindowRunner[IN, GROUP, OUT](
   extends WindowRunner {
   import org.apache.gearpump.streaming.dsl.window.impl.DefaultWindowRunner._
 
-  private val windowGroups = new TreeSortedMap[WindowGroup[GROUP], 
FastList[IN]]
+  private val windows = new TreeSortedMap[Bucket, 
UnifiedSet[WindowGroup[GROUP]]]
+  private val windowGroups = new UnifiedMap[WindowGroup[GROUP], FastList[IN]]
   private val groupFns = new UnifiedMap[GROUP, SingleInputFunction[IN, OUT]]
 
 
@@ -74,6 +64,10 @@ class DefaultWindowRunner[IN, GROUP, OUT](
     val (group, buckets) = groupBy.groupBy(message)
     buckets.foreach { bucket =>
       val wg = WindowGroup(bucket, group)
+      val wgs = windows.getOrDefault(bucket, new 
UnifiedSet[WindowGroup[GROUP]](1))
+      wgs.add(wg)
+      windows.put(bucket, wgs)
+
       val inputs = windowGroups.getOrDefault(wg, new FastList[IN](1))
       inputs.add(message.msg.asInstanceOf[IN])
       windowGroups.put(wg, inputs)
@@ -87,21 +81,27 @@ class DefaultWindowRunner[IN, GROUP, OUT](
 
     @annotation.tailrec
     def onTrigger(): Unit = {
-      if (windowGroups.notEmpty()) {
-        val first = windowGroups.firstKey
-        if (!time.isBefore(first.bucket.endTime)) {
-          val inputs = windowGroups.remove(first)
-          val reduceFn = groupFns.get(first.group)
-            .andThen[Unit](new EmitFunction[OUT](emitResult(_, time)))
-          inputs.forEach(new Procedure[IN] {
-            override def value(t: IN): Unit = {
-              reduceFn.process(t)
+      if (windows.notEmpty()) {
+        val first = windows.firstKey
+        if (!time.isBefore(first.endTime)) {
+          val wgs = windows.remove(first)
+          wgs.forEach(new Procedure[WindowGroup[GROUP]] {
+            override def value(each: WindowGroup[GROUP]): Unit = {
+              val inputs = windowGroups.remove(each)
+              val reduceFn = groupFns.get(each.group)
+                .andThen[Unit](new EmitFunction[OUT](emitResult(_, time)))
+              inputs.forEach(new Procedure[IN] {
+                override def value(t: IN): Unit = {
+                  reduceFn.process(t)
+                }
+              })
+              reduceFn.finish()
+              if (groupBy.window.accumulationMode == Discarding) {
+                reduceFn.clearState()
+              }
             }
           })
-          reduceFn.finish()
-          if (groupBy.window.accumulationMode == Discarding) {
-            reduceFn.clearState()
-          }
+
           onTrigger()
         }
       }

Reply via email to