Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 5d524918d -> 5cabd8ca3


[GEARPUMP-23] Do not group by windows in GroupByPartitioner

Author: manuzhang <[email protected]>

Closes #139 from manuzhang/group_by.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/5cabd8ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/5cabd8ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/5cabd8ca

Branch: refs/heads/master
Commit: 5cabd8ca3406de65de74a59c0e57147f00b9edc3
Parents: 5d52491
Author: manuzhang <[email protected]>
Authored: Tue Feb 7 20:32:25 2017 +0800
Committer: manuzhang <[email protected]>
Committed: Tue Feb 7 20:33:19 2017 +0800

----------------------------------------------------------------------
 .../dsl/partitioner/GroupByPartitioner.scala    |  7 ++-
 .../apache/gearpump/streaming/dsl/plan/OP.scala |  8 ++--
 .../gearpump/streaming/dsl/plan/Planner.scala   |  4 +-
 .../streaming/dsl/scalaapi/Stream.scala         |  6 +--
 .../streaming/dsl/window/api/GroupByFn.scala    | 47 --------------------
 .../streaming/dsl/window/impl/Window.scala      |  7 ++-
 .../partitioner/GroupByPartitionerSpec.scala    | 13 +-----
 .../gearpump/streaming/dsl/plan/OpSpec.scala    | 12 ++---
 .../streaming/dsl/plan/PlannerSpec.scala        | 19 ++------
 9 files changed, 27 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5cabd8ca/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
index efa7409..7e1214e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
@@ -20,7 +20,6 @@ package org.apache.gearpump.streaming.dsl.partitioner
 
 import org.apache.gearpump.Message
 import org.apache.gearpump.streaming.partitioner.UnicastPartitioner
-import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
 
 /**
  * Partition messages by applying group by function first.
@@ -39,10 +38,10 @@ import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
  * @param fn First apply message with groupBy function, then pick the hashCode 
of the output
  *   to do the partitioning. You must define hashCode() for output type of 
groupBy function.
  */
-class GroupByPartitioner[T, Group](fn: GroupByFn[T, Group])
-  extends UnicastPartitioner {
+class GroupByPartitioner[T, GROUP](fn: T => GROUP) extends UnicastPartitioner {
+
   override def getPartition(message: Message, partitionNum: Int, 
currentPartitionId: Int): Int = {
-    val hashCode = fn.groupBy(message).hashCode()
+    val hashCode = fn(message.msg.asInstanceOf[T]).hashCode()
     (hashCode & Integer.MAX_VALUE) % partitionNum
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5cabd8ca/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
index 56f16e1..708e0d2 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
@@ -25,7 +25,7 @@ import org.apache.gearpump.streaming.Processor.DefaultProcessor
 import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, 
FunctionRunner}
 import org.apache.gearpump.streaming.{Constants, Processor}
 import org.apache.gearpump.streaming.dsl.task.TransformTask
-import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
+import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
 import org.apache.gearpump.streaming.sink.{DataSink, DataSinkProcessor}
 import org.apache.gearpump.streaming.source.{DataSource, DataSourceTask}
 import org.apache.gearpump.streaming.task.Task
@@ -149,7 +149,7 @@ case class ChainableOp[IN, OUT](
  * This represents a Processor with window aggregation
  */
 case class GroupByOp[IN, GROUP](
-    groupByFn: GroupByFn[IN, GROUP],
+    groupBy: GroupAlsoByWindow[IN, GROUP],
     parallelism: Int = 1,
     description: String = "groupBy",
     override val userConfig: UserConfig = UserConfig.empty)
@@ -158,7 +158,7 @@ case class GroupByOp[IN, GROUP](
   override def chain(other: Op)(implicit system: ActorSystem): Op = {
     other match {
       case op: ChainableOp[_, _] =>
-        GroupByOp(groupByFn, parallelism, description,
+        GroupByOp(groupBy, parallelism, description,
           userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.fn))
       case _ =>
         throw new OpChainException(this, other)
@@ -166,7 +166,7 @@ case class GroupByOp[IN, GROUP](
   }
 
   override def getProcessor(implicit system: ActorSystem): Processor[_ <: 
Task] = {
-    groupByFn.getProcessor(parallelism, description, userConfig)
+    groupBy.getProcessor(parallelism, description, userConfig)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5cabd8ca/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
index 65f9cd2..1dd8026 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
@@ -40,8 +40,8 @@ class Planner {
       edge match {
         case Shuffle =>
           node2 match {
-            case groupBy: GroupByOp[_, _] =>
-              new GroupByPartitioner(groupBy.groupByFn)
+            case op: GroupByOp[_, _] =>
+              new GroupByPartitioner(op.groupBy.groupByFn)
             case _ => new HashPartitioner
           }
         case Direct =>

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5cabd8ca/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
index bdb245c..f71276b 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
@@ -25,7 +25,7 @@ import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
 import org.apache.gearpump.streaming.dsl.plan._
 import org.apache.gearpump.streaming.dsl.plan.functions._
 import org.apache.gearpump.streaming.dsl.window.api._
-import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, 
WindowAndGroup}
+import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
 import org.apache.gearpump.streaming.sink.DataSink
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
 import org.apache.gearpump.util.Graph
@@ -210,8 +210,8 @@ class WindowStream[T](graph: Graph[Op, OpEdge], edge: Option[OpEdge], thisNode:
 
   def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1,
       description: String = "groupBy"): Stream[T] = {
-    val groupBy: GroupByFn[T, List[WindowAndGroup[GROUP]]] = 
GroupAlsoByWindow(fn, window)
-    val groupOp = GroupByOp[T, List[WindowAndGroup[GROUP]]](groupBy, 
parallelism,
+    val groupBy = GroupAlsoByWindow(fn, window)
+    val groupOp = GroupByOp[T, GROUP](groupBy, parallelism,
       s"$winDesc.$description")
     graph.addVertex(groupOp)
     graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5cabd8ca/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/GroupByFn.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/GroupByFn.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/GroupByFn.scala
deleted file mode 100644
index 30e68ba..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/GroupByFn.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.dsl.window.api
-
-import akka.actor.ActorSystem
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.Processor
-import org.apache.gearpump.streaming.task.Task
-
-/**
- * Divides messages into groups according its payload and timestamp.
- * Check [[org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow]]
- * for default implementation.
- */
-trait GroupByFn[T, GROUP] {
-
-  /**
-   * Used by
-   *   1. GroupByPartitioner to shuffle messages
-   *   2. WindowRunner to group messages for time-based aggregation
-   */
-  def groupBy(message: Message): GROUP
-
-  /**
-   * Returns a Processor according to window trigger during planning
-   */
-  def getProcessor(parallelism: Int, description: String,
-      userConfig: UserConfig)(implicit system: ActorSystem): Processor[_ <: 
Task]
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5cabd8ca/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
index eb5d551..fe644af 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
@@ -64,10 +64,9 @@ case class WindowAndGroup[GROUP](window: Window, group: GROUP)
   }
 }
 
-case class GroupAlsoByWindow[T, GROUP](groupByFn: T => GROUP, window: 
Windows[T])
-  extends GroupByFn[T, List[WindowAndGroup[GROUP]]] {
+case class GroupAlsoByWindow[T, GROUP](groupByFn: T => GROUP, window: 
Windows[T]) {
 
-  override def groupBy(message: Message): List[WindowAndGroup[GROUP]] = {
+  def groupBy(message: Message): List[WindowAndGroup[GROUP]] = {
     val ele = message.msg.asInstanceOf[T]
     val group = groupByFn(ele)
     val windows = window.windowFn(new WindowFunction.Context[T] {
@@ -77,7 +76,7 @@ case class GroupAlsoByWindow[T, GROUP](groupByFn: T => GROUP, window: Windows[T]
     windows.map(WindowAndGroup(_, group)).toList
   }
 
-  override def getProcessor(parallelism: Int, description: String,
+  def getProcessor(parallelism: Int, description: String,
       userConfig: UserConfig)(implicit system: ActorSystem): Processor[_ <: 
Task] = {
     val config = userConfig.withValue(GEARPUMP_STREAMING_GROUPBY_FUNCTION, 
this)
     window.trigger match {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5cabd8ca/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
index fb45e35..1934d14 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
@@ -18,13 +18,9 @@
 
 package org.apache.gearpump.streaming.dsl.partitioner
 
-import java.time.Duration
-
 import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
 import org.apache.gearpump.Message
 import 
org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitionerSpec.People
-import org.apache.gearpump.streaming.dsl.window.api.{FixedWindows, GroupByFn}
-import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, 
WindowAndGroup}
 
 class GroupByPartitionerSpec extends FlatSpec with Matchers with 
BeforeAndAfterAll {
 
@@ -34,16 +30,11 @@ class GroupByPartitionerSpec extends FlatSpec with Matchers with BeforeAndAfterA
     val michelle = People("Michelle", "female")
 
     val partitionNum = 10
-    val groupByFn: GroupByFn[People, List[WindowAndGroup[String]]] =
-      GroupAlsoByWindow[People, String](_.gender,
-        FixedWindows.apply[People](Duration.ofMillis(5)))
-    val groupBy = new GroupByPartitioner[People, 
List[WindowAndGroup[String]]](groupByFn)
+
+    val groupBy = new GroupByPartitioner[People, String](_.gender)
     groupBy.getPartition(Message(mark, 1L), partitionNum) shouldBe
       groupBy.getPartition(Message(tom, 2L), partitionNum)
 
-    groupBy.getPartition(Message(mark, 1L), partitionNum) should not be
-      groupBy.getPartition(Message(tom, 6L), partitionNum)
-
     groupBy.getPartition(Message(mark, 2L), partitionNum) should not be
       groupBy.getPartition(Message(michelle, 3L), partitionNum)
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5cabd8ca/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
index 461d3da..d007e09 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
@@ -27,7 +27,7 @@ import org.apache.gearpump.streaming.Processor.DefaultProcessor
 import org.apache.gearpump.streaming.dsl.plan.OpSpec.{AnySink, AnySource, 
AnyTask}
 import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, 
FunctionRunner}
 import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
-import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
+import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
 import org.apache.gearpump.streaming.sink.DataSink
 import org.apache.gearpump.streaming.source.DataSource
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
@@ -169,8 +169,8 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
   "GroupByOp" should {
 
     "chain ChainableOp" in {
-      val groupByFn = mock[GroupByFn[Any, Any]]
-      val groupByOp = GroupByOp[Any, Any](groupByFn)
+      val groupBy = mock[GroupAlsoByWindow[Any, Any]]
+      val groupByOp = GroupByOp[Any, Any](groupBy)
       val fn = mock[FunctionRunner[Any, Any]]
       val chainableOp = mock[ChainableOp[Any, Any]]
       when(chainableOp.fn).thenReturn(fn)
@@ -186,11 +186,11 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
     }
 
     "delegate to groupByFn on getProcessor" in {
-      val groupByFn = mock[GroupByFn[Any, Any]]
-      val groupByOp = GroupByOp[Any, Any](groupByFn)
+      val groupBy = mock[GroupAlsoByWindow[Any, Any]]
+      val groupByOp = GroupByOp[Any, Any](groupBy)
 
       groupByOp.getProcessor
-      verify(groupByFn).getProcessor(anyInt, anyString, 
any[UserConfig])(any[ActorSystem])
+      verify(groupBy).getProcessor(anyInt, anyString, 
any[UserConfig])(any[ActorSystem])
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5cabd8ca/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
index 3f23fa9..2e4bbb3 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
@@ -29,7 +29,8 @@ import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
 import org.apache.gearpump.streaming.dsl.plan.PlannerSpec._
 import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, Reducer}
 import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
-import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
+import org.apache.gearpump.streaming.dsl.window.api.CountWindows
+import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
 import org.apache.gearpump.streaming.sink.DataSink
 import org.apache.gearpump.streaming.source.DataSource
 import org.apache.gearpump.streaming.{MockUtil, Processor}
@@ -57,7 +58,8 @@ class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Moc
   "Planner" should "chain operations" in {
     val graph = Graph.empty[Op, OpEdge]
     val sourceOp = DataSourceOp(new AnySource)
-    val groupByOp = GroupByOp(new AnyGroupByFn)
+    val groupBy = GroupAlsoByWindow((any: Any) => any, 
CountWindows.apply[Any](1))
+    val groupByOp = GroupByOp(groupBy)
     val flatMapOp = ChainableOp[Any, Any](anyFlatMapper)
     val reduceOp = ChainableOp[Any, Any](anyReducer)
     val processorOp = new ProcessorOp[AnyTask]
@@ -93,7 +95,6 @@ class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Moc
 
 object PlannerSpec {
 
-  private val anyParallelism = 1
   private val anyFlatMapper = new FlatMapper[Any, Any](
     FlatMapFunction(Option(_)), "flatMap")
   private val anyReducer = new Reducer[Any](
@@ -120,16 +121,4 @@ object PlannerSpec {
 
     override def close(): Unit = {}
   }
-
-  class AnyGroupByFn extends GroupByFn[Any, Any] {
-
-    override def groupBy(message: Message): Any = message.msg
-
-    override def getProcessor(
-        parallelism: Int,
-        description: String,
-        userConfig: UserConfig)(implicit system: ActorSystem): Processor[_ <: 
Task] = {
-      Processor[AnyTask](anyParallelism, description)
-    }
-  }
 }

Reply via email to