Repository: incubator-gearpump
Updated Branches:
  refs/heads/master c3d5eb63f -> fb37ce809


[GEARPUMP-225] move partitioner to module gearpump-streaming

Author: huafengw <[email protected]>

Closes #99 from huafengw/partitioner.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/fb37ce80
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/fb37ce80
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/fb37ce80

Branch: refs/heads/master
Commit: fb37ce809b2917a12cc64d4337418cbb72265fda
Parents: c3d5eb6
Author: huafengw <[email protected]>
Authored: Mon Oct 24 11:31:41 2016 +0800
Committer: manuzhang <[email protected]>
Committed: Mon Oct 24 11:31:41 2016 +0800

----------------------------------------------------------------------
 .../partitioner/BroadcastPartitioner.scala      |  36 ------
 .../partitioner/CoLocationPartitioner.scala     |  31 ------
 .../gearpump/partitioner/HashPartitioner.scala  |  32 ------
 .../gearpump/partitioner/Partitioner.scala      | 109 -------------------
 .../ShuffleGroupingPartitioner.scala            |  45 --------
 .../partitioner/ShufflePartitioner.scala        |  44 --------
 .../org/apache/gearpump/util/Constants.scala    |  10 --
 .../gearpump/partitioner/PartitionerSpec.scala  |  81 --------------
 .../pagerank/PageRankApplication.scala          |   2 +-
 .../streaming/examples/complexdag/Dag.scala     |   2 +-
 .../examples/fsio/SequenceFileIO.scala          |   2 +-
 .../examples/kafka/KafkaReadWrite.scala         |   2 +-
 .../kafka/wordcount/KafkaWordCount.scala        |   2 +-
 .../gearpump/streaming/examples/sol/SOL.scala   |   2 +-
 .../examples/state/MessageCountApp.scala        |   2 +-
 .../examples/state/WindowAverageApp.scala       |   2 +-
 .../examples/wordcountjava/WordCount.java       |   5 +-
 .../examples/wordcount/WordCount.scala          |   2 +-
 .../storm/partitioner/StormPartitioner.scala    |   2 +-
 .../experiments/storm/util/GraphBuilder.scala   |   2 +-
 .../partitioner/StormPartitionerSpec.scala      |   2 +-
 .../checklist/DynamicDagSpec.scala              |   2 +-
 .../gearpump/services/MasterService.scala       |   5 +-
 .../gearpump/streaming/javaapi/Graph.java       |   2 +-
 .../apache/gearpump/streaming/Constants.scala   |  10 ++
 .../org/apache/gearpump/streaming/DAG.scala     |   2 +-
 .../gearpump/streaming/StreamApplication.scala  |   2 +-
 .../streaming/appmaster/AppMaster.scala         |   2 +-
 .../streaming/appmaster/DagManager.scala        |   2 +-
 .../streaming/appmaster/JarScheduler.scala      |   2 +-
 .../dsl/partitioner/GroupByPartitioner.scala    |   2 +-
 .../gearpump/streaming/dsl/plan/Planner.scala   |   2 +-
 .../partitioner/BroadcastPartitioner.scala      |  36 ++++++
 .../partitioner/CoLocationPartitioner.scala     |  31 ++++++
 .../streaming/partitioner/HashPartitioner.scala |  32 ++++++
 .../streaming/partitioner/Partitioner.scala     | 108 ++++++++++++++++++
 .../ShuffleGroupingPartitioner.scala            |  45 ++++++++
 .../partitioner/ShufflePartitioner.scala        |  44 ++++++++
 .../gearpump/streaming/task/Subscriber.scala    |   2 +-
 .../gearpump/streaming/task/Subscription.scala  |   2 +-
 .../org/apache/gearpump/streaming/DAGSpec.scala |   2 +-
 .../streaming/appmaster/AppMasterSpec.scala     |   2 +-
 .../streaming/appmaster/ClockServiceSpec.scala  |   2 +-
 .../streaming/appmaster/DagManagerSpec.scala    |   2 +-
 .../streaming/appmaster/JarSchedulerSpec.scala  |   2 +-
 .../streaming/appmaster/TaskManagerSpec.scala   |   2 +-
 .../streaming/appmaster/TaskSchedulerSpec.scala |   2 +-
 .../gearpump/streaming/dsl/StreamAppSpec.scala  |   2 +-
 .../gearpump/streaming/dsl/StreamSpec.scala     |   2 +-
 .../streaming/dsl/plan/PlannerSpec.scala        |   2 +-
 .../streaming/partitioner/PartitionerSpec.scala |  80 ++++++++++++++
 .../streaming/task/SubscriberSpec.scala         |   2 +-
 .../streaming/task/SubscriptionSpec.scala       |   2 +-
 .../gearpump/streaming/task/TaskActorSpec.scala |   2 +-
 54 files changed, 427 insertions(+), 429 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/core/src/main/scala/org/apache/gearpump/partitioner/BroadcastPartitioner.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/partitioner/BroadcastPartitioner.scala
 
b/core/src/main/scala/org/apache/gearpump/partitioner/BroadcastPartitioner.scala
deleted file mode 100644
index 99cbcb6..0000000
--- 
a/core/src/main/scala/org/apache/gearpump/partitioner/BroadcastPartitioner.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.partitioner
-
-import org.apache.gearpump.Message
-
-/** Used by storm module to broadcast message to all downstream tasks  */
-class BroadcastPartitioner extends MulticastPartitioner {
-  private var lastPartitionNum = -1
-  private var partitions = Array.empty[Int]
-
-  override def getPartitions(
-      msg: Message, partitionNum: Int, currentPartitionId: Int): Array[Int] = {
-    if (partitionNum != lastPartitionNum) {
-      partitions = (0 until partitionNum).toArray
-      lastPartitionNum = partitionNum
-    }
-    partitions
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/core/src/main/scala/org/apache/gearpump/partitioner/CoLocationPartitioner.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/partitioner/CoLocationPartitioner.scala
 
b/core/src/main/scala/org/apache/gearpump/partitioner/CoLocationPartitioner.scala
deleted file mode 100644
index 5a3eec4..0000000
--- 
a/core/src/main/scala/org/apache/gearpump/partitioner/CoLocationPartitioner.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.partitioner
-
-import org.apache.gearpump.Message
-
-/**
- * Will have the same parallelism with last processor
- * And each task in current processor will co-locate with task of last 
processor
- */
-class CoLocationPartitioner extends UnicastPartitioner {
-  override def getPartition(msg: Message, partitionNum: Int, 
currentPartitionId: Int): Int = {
-    currentPartitionId
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/core/src/main/scala/org/apache/gearpump/partitioner/HashPartitioner.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/partitioner/HashPartitioner.scala 
b/core/src/main/scala/org/apache/gearpump/partitioner/HashPartitioner.scala
deleted file mode 100644
index ee684a9..0000000
--- a/core/src/main/scala/org/apache/gearpump/partitioner/HashPartitioner.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.partitioner
-
-import org.apache.gearpump.Message
-
-/**
- * Only make sense when the message has implemented the hashCode()
- * Otherwise, it will use Object.hashCode(), which will not return
- * same hash code after serialization and deserialization.
- */
-class HashPartitioner extends UnicastPartitioner {
-  override def getPartition(msg: Message, partitionNum: Int, 
currentPartitionId: Int): Int = {
-    (msg.msg.hashCode() & Integer.MAX_VALUE) % partitionNum
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/core/src/main/scala/org/apache/gearpump/partitioner/Partitioner.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/partitioner/Partitioner.scala 
b/core/src/main/scala/org/apache/gearpump/partitioner/Partitioner.scala
deleted file mode 100644
index d68fa65..0000000
--- a/core/src/main/scala/org/apache/gearpump/partitioner/Partitioner.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.partitioner
-
-import scala.reflect.ClassTag
-
-import org.apache.commons.lang.SerializationUtils
-
-import org.apache.gearpump.Message
-
-/**
- * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), partitioner 
decide how ONE task
- * of upstream processor A send to several tasks of downstream processor B.
- */
-sealed trait Partitioner extends Serializable
-
-/**
- * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), 
UnicastPartitioner does
- * ONE-task {@literal ->} ONE-task mapping.
- */
-trait UnicastPartitioner extends Partitioner {
-
-  /**
-   * Gets the SINGLE downstream processor task index to send message to.
-   *
-   * @param msg Message you want to send
-   * @param partitionNum How many tasks does the downstream processor have.
-   * @param upstreamTaskIndex Upstream task's task index who trigger the 
getPartition() call.
-   *
-   * @return ONE task index of downstream processor.
-   */
-  def getPartition(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): 
Int
-
-  def getPartition(msg: Message, partitionNum: Int): Int = {
-    getPartition(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID)
-  }
-}
-
-trait MulticastPartitioner extends Partitioner {
-
-  /**
-   * Gets a list of downstream processor task indexes to send message to.
-   *
-   * @param upstreamTaskIndex Current sender task's task index.
-   *
-   */
-  def getPartitions(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): 
Array[Int]
-
-  def getPartitions(msg: Message, partitionNum: Int): Array[Int] = {
-    getPartitions(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID)
-  }
-}
-
-sealed trait PartitionerFactory {
-
-  def name: String
-
-  def partitioner: Partitioner
-}
-
-/** Stores the Partitioner in an object. To use it, user need to deserialize 
the object */
-class PartitionerObject(private[this] val _partitioner: Partitioner)
-  extends PartitionerFactory with Serializable {
-
-  override def name: String = partitioner.getClass.getName
-
-  override def partitioner: Partitioner = {
-    SerializationUtils.clone(_partitioner).asInstanceOf[Partitioner]
-  }
-}
-
-/** Store the partitioner in class Name, the user need to instantiate a new 
class */
-class PartitionerByClassName(partitionerClass: String)
-  extends PartitionerFactory with Serializable {
-
-  override def name: String = partitionerClass
-  override def partitioner: Partitioner = {
-    Class.forName(partitionerClass).newInstance().asInstanceOf[Partitioner]
-  }
-}
-
-/**
- * @param partitionerFactory How we construct a Partitioner.
- */
-case class PartitionerDescription(partitionerFactory: PartitionerFactory)
-
-object Partitioner {
-  val UNKNOWN_PARTITION_ID = -1
-
-  def apply[T <: Partitioner](implicit clazz: ClassTag[T]): 
PartitionerDescription = {
-    PartitionerDescription(new 
PartitionerByClassName(clazz.runtimeClass.getName))
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/core/src/main/scala/org/apache/gearpump/partitioner/ShuffleGroupingPartitioner.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/partitioner/ShuffleGroupingPartitioner.scala
 
b/core/src/main/scala/org/apache/gearpump/partitioner/ShuffleGroupingPartitioner.scala
deleted file mode 100644
index 55ef614..0000000
--- 
a/core/src/main/scala/org/apache/gearpump/partitioner/ShuffleGroupingPartitioner.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.partitioner
-
-import scala.util.Random
-
-import org.apache.gearpump.Message
-
-/**
- * The idea of ShuffleGroupingPartitioner is derived from Storm.
- * Messages are randomly distributed across the downstream's tasks in a way 
such that
- * each task is guaranteed to get an equal number of messages.
- */
-class ShuffleGroupingPartitioner extends UnicastPartitioner {
-  private val random = new Random
-  private var index = -1
-  private var partitions = List.empty[Int]
-  override def getPartition(msg: Message, partitionNum: Int, 
currentPartitionId: Int): Int = {
-    index += 1
-    if (partitions.isEmpty) {
-      partitions = 0.until(partitionNum).toList
-      partitions = random.shuffle(partitions)
-    } else if (index >= partitionNum) {
-      index = 0
-      partitions = random.shuffle(partitions)
-    }
-    partitions(index)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/core/src/main/scala/org/apache/gearpump/partitioner/ShufflePartitioner.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/partitioner/ShufflePartitioner.scala 
b/core/src/main/scala/org/apache/gearpump/partitioner/ShufflePartitioner.scala
deleted file mode 100644
index 5c66d66..0000000
--- 
a/core/src/main/scala/org/apache/gearpump/partitioner/ShufflePartitioner.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.partitioner
-
-import java.util.Random
-
-import org.apache.gearpump.Message
-
-/**
- * Round Robin partition the data to downstream processor tasks.
- */
-class ShufflePartitioner extends UnicastPartitioner {
-  private var seed = 0
-  private var count = 0
-
-  override def getPartition(msg: Message, partitionNum: Int, 
currentPartitionId: Int): Int = {
-
-    if (seed == 0) {
-      seed = newSeed()
-    }
-
-    val result = ((count + seed) & Integer.MAX_VALUE) % partitionNum
-    count = count + 1
-    result
-  }
-
-  private def newSeed(): Int = new Random().nextInt()
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/core/src/main/scala/org/apache/gearpump/util/Constants.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/Constants.scala 
b/core/src/main/scala/org/apache/gearpump/util/Constants.scala
index dba5a1f..c98726e 100644
--- a/core/src/main/scala/org/apache/gearpump/util/Constants.scala
+++ b/core/src/main/scala/org/apache/gearpump/util/Constants.scala
@@ -20,8 +20,6 @@ package org.apache.gearpump.util
 
 import java.util.concurrent.TimeUnit
 
-import org.apache.gearpump.partitioner._
-
 object Constants {
   val MASTER_WATCHER = "masterwatcher"
   val SINGLETON_MANAGER = "singleton"
@@ -140,14 +138,6 @@ object Constants {
   val GEARPUMP_SERVICE_SUPERVISOR_PATH = 
"gearpump.services.supervisor-actor-path"
   val GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE = 
"gearpump.services.config-render-option-concise"
 
-  // The partitioners provided by Gearpump
-  val BUILTIN_PARTITIONERS = Array(
-    classOf[BroadcastPartitioner],
-    classOf[CoLocationPartitioner],
-    classOf[HashPartitioner],
-    classOf[ShuffleGroupingPartitioner],
-    classOf[ShufflePartitioner])
-
   // Security related
   val GEARPUMP_KEYTAB_FILE = "gearpump.keytab.file"
   val GEARPUMP_KERBEROS_PRINCIPAL = "gearpump.kerberos.principal"

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala 
b/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala
deleted file mode 100644
index fcf819b..0000000
--- a/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.partitioner
-
-import org.scalatest.{FlatSpec, Matchers}
-
-import org.apache.gearpump.Message
-
-class PartitionerSpec extends FlatSpec with Matchers {
-  val NUM = 10
-
-  "HashPartitioner" should "hash same key to same slots" in {
-    val partitioner = new HashPartitioner
-
-    val data = new Array[Byte](1000)
-    (new java.util.Random()).nextBytes(data)
-    val msg = Message(data)
-
-    val partition = partitioner.getPartition(msg, NUM)
-    assert(partition >= 0 && partition < NUM, "Partition Id should be >= 0")
-
-    assert(partition == partitioner.getPartition(msg, NUM), "multiple run 
should return" +
-      "consistent result")
-  }
-
-  "ShufflePartitioner" should "hash same key randomly" in {
-    val partitioner = new ShufflePartitioner
-
-    val data = new Array[Byte](1000)
-    (new java.util.Random()).nextBytes(data)
-    val msg = Message(data)
-
-    val partition = partitioner.getPartition(msg, NUM)
-    assert(partition >= 0 && partition < NUM, "Partition Id should be >= 0")
-
-    assert(partition != partitioner.getPartition(msg, NUM), "multiple run 
should return" +
-      "consistent result")
-  }
-
-  "BroadcastPartitioner" should "return all partitions" in {
-    val partitioner = new BroadcastPartitioner
-
-    val data = new Array[Byte](1000)
-    (new java.util.Random()).nextBytes(data)
-    val msg = Message(data)
-    val partitions = partitioner.getPartitions(msg, NUM)
-
-    partitions should contain theSameElementsAs 0.until(NUM)
-  }
-
-
-  "ShuffleGroupingPartitioner" should "hash same key randomly" in {
-    val partitioner = new ShuffleGroupingPartitioner
-
-    val data = new Array[Byte](1000)
-    (new java.util.Random()).nextBytes(data)
-    val msg = Message(data)
-
-    val partition = partitioner.getPartition(msg, NUM)
-    assert(partition >= 0 && partition < NUM, "Partition Id should be >= 0")
-
-    assert(partition != partitioner.getPartition(msg, NUM), "multiple run 
should return" +
-      "consistent result")
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankApplication.scala
----------------------------------------------------------------------
diff --git 
a/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankApplication.scala
 
b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankApplication.scala
index 023ee35..c7bfb43 100644
--- 
a/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankApplication.scala
+++ 
b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankApplication.scala
@@ -21,7 +21,7 @@ import akka.actor.ActorSystem
 
 import org.apache.gearpump.cluster.{Application, ApplicationMaster, UserConfig}
 import 
org.apache.gearpump.experiments.pagerank.PageRankApplication.NodeWithTaskId
-import org.apache.gearpump.partitioner.HashPartitioner
+import org.apache.gearpump.streaming.partitioner.HashPartitioner
 import org.apache.gearpump.streaming.appmaster.AppMaster
 import org.apache.gearpump.streaming.{Processor, StreamApplication}
 import org.apache.gearpump.util.Graph

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Dag.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Dag.scala
 
b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Dag.scala
index 3b6ceb8..165df62 100644
--- 
a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Dag.scala
+++ 
b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Dag.scala
@@ -23,7 +23,7 @@ import org.slf4j.Logger
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, 
ParseResult}
-import org.apache.gearpump.partitioner.HashPartitioner
+import org.apache.gearpump.streaming.partitioner.HashPartitioner
 import org.apache.gearpump.streaming.task.TaskContext
 import org.apache.gearpump.streaming.{Processor, StreamApplication}
 import org.apache.gearpump.util.Graph.{Node => GraphNode}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SequenceFileIO.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SequenceFileIO.scala
 
b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SequenceFileIO.scala
index 5c75904..3a80549 100644
--- 
a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SequenceFileIO.scala
+++ 
b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SequenceFileIO.scala
@@ -23,7 +23,7 @@ import org.slf4j.Logger
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, 
ParseResult}
-import org.apache.gearpump.partitioner.ShufflePartitioner
+import org.apache.gearpump.streaming.partitioner.ShufflePartitioner
 import org.apache.gearpump.streaming.examples.fsio.HadoopConfig._
 import org.apache.gearpump.streaming.{Processor, StreamApplication}
 import org.apache.gearpump.util.Graph._

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaReadWrite.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaReadWrite.scala
 
b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaReadWrite.scala
index cfeef5b..4b48e7d 100644
--- 
a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaReadWrite.scala
+++ 
b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaReadWrite.scala
@@ -27,7 +27,7 @@ import org.slf4j.Logger
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, 
ParseResult}
-import org.apache.gearpump.partitioner.ShufflePartitioner
+import org.apache.gearpump.streaming.partitioner.ShufflePartitioner
 import org.apache.gearpump.streaming.StreamApplication
 import org.apache.gearpump.streaming.kafka._
 import org.apache.gearpump.streaming.sink.DataSinkProcessor

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala
 
b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala
index aa9842f..80f0ff7 100644
--- 
a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala
+++ 
b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala
@@ -28,7 +28,7 @@ import org.slf4j.Logger
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, 
ParseResult}
-import org.apache.gearpump.partitioner.HashPartitioner
+import org.apache.gearpump.streaming.partitioner.HashPartitioner
 import org.apache.gearpump.streaming.kafka._
 import org.apache.gearpump.streaming.sink.DataSinkProcessor
 import org.apache.gearpump.streaming.source.DataSourceProcessor

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOL.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOL.scala
 
b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOL.scala
index fb80ad3..01aa95e 100644
--- 
a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOL.scala
+++ 
b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOL.scala
@@ -23,7 +23,7 @@ import org.slf4j.Logger
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, 
ParseResult}
-import org.apache.gearpump.partitioner.ShufflePartitioner
+import org.apache.gearpump.streaming.partitioner.ShufflePartitioner
 import org.apache.gearpump.streaming.{Processor, StreamApplication}
 import org.apache.gearpump.util.Graph._
 import org.apache.gearpump.util.{AkkaApp, Graph, LogUtil}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala
 
b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala
index 9bd2bc5..59289a5 100644
--- 
a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala
+++ 
b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, 
ParseResult}
-import org.apache.gearpump.partitioner.HashPartitioner
+import org.apache.gearpump.streaming.partitioner.HashPartitioner
 import org.apache.gearpump.streaming.examples.state.processor.CountProcessor
 import org.apache.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory
 import org.apache.gearpump.streaming.hadoop.lib.rotation.FileSizeRotation

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/WindowAverageApp.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/WindowAverageApp.scala
 
b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/WindowAverageApp.scala
index 629deb7..50235bc 100644
--- 
a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/WindowAverageApp.scala
+++ 
b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/WindowAverageApp.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, 
ParseResult}
-import org.apache.gearpump.partitioner.HashPartitioner
+import org.apache.gearpump.streaming.partitioner.HashPartitioner
 import 
org.apache.gearpump.streaming.examples.state.processor.{NumberGeneratorProcessor,
 WindowAverageProcessor}
 import org.apache.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory
 import org.apache.gearpump.streaming.state.impl.{PersistentStateConfig, 
WindowConfig}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java
 
b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java
index 6b5bba0..5e3d472 100644
--- 
a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java
+++ 
b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java
@@ -23,12 +23,11 @@ import org.apache.gearpump.cluster.ClusterConfig;
 import org.apache.gearpump.cluster.UserConfig;
 import org.apache.gearpump.cluster.client.ClientContext;
 import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
-import org.apache.gearpump.partitioner.HashPartitioner;
-import org.apache.gearpump.partitioner.Partitioner;
+import org.apache.gearpump.streaming.partitioner.HashPartitioner;
+import org.apache.gearpump.streaming.partitioner.Partitioner;
 import org.apache.gearpump.streaming.javaapi.Graph;
 import org.apache.gearpump.streaming.javaapi.Processor;
 import org.apache.gearpump.streaming.javaapi.StreamApplication;
-import org.apache.gearpump.util.Constants;
 
 /** Java version of WordCount with Processor Graph API */
 public class WordCount {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
 
b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
index 9580e63..0e3d840 100644
--- 
a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
+++ 
b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
@@ -23,7 +23,7 @@ import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.embedded.EmbeddedCluster
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, 
ParseResult}
-import org.apache.gearpump.partitioner.HashPartitioner
+import org.apache.gearpump.streaming.partitioner.HashPartitioner
 import org.apache.gearpump.streaming.source.DataSourceProcessor
 import org.apache.gearpump.streaming.{Processor, StreamApplication}
 import org.apache.gearpump.util.Graph.Node

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala
 
b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala
index aaa0a99..4969314 100644
--- 
a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala
+++ 
b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala
@@ -20,7 +20,7 @@ package org.apache.gearpump.experiments.storm.partitioner
 
 import org.apache.gearpump.Message
 import org.apache.gearpump.experiments.storm.topology.GearpumpTuple
-import org.apache.gearpump.partitioner.{MulticastPartitioner, Partitioner}
+import org.apache.gearpump.streaming.partitioner.{MulticastPartitioner, 
Partitioner}
 
 /**
  * Partitioner bound to a target Storm component

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/GraphBuilder.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/GraphBuilder.scala
 
b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/GraphBuilder.scala
index 777acab..e3f1339 100644
--- 
a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/GraphBuilder.scala
+++ 
b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/GraphBuilder.scala
@@ -20,7 +20,7 @@ package org.apache.gearpump.experiments.storm.util
 
 import org.apache.gearpump.experiments.storm.partitioner.StormPartitioner
 import org.apache.gearpump.experiments.storm.topology.GearpumpStormTopology
-import org.apache.gearpump.partitioner.Partitioner
+import org.apache.gearpump.streaming.partitioner.Partitioner
 import org.apache.gearpump.streaming.Processor
 import org.apache.gearpump.streaming.task.Task
 import org.apache.gearpump.util.Graph

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala
 
b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala
index 5513423..5fc631b 100644
--- 
a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala
+++ 
b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala
@@ -27,7 +27,7 @@ import org.scalatest.{Matchers, PropSpec}
 
 import org.apache.gearpump.Message
 import org.apache.gearpump.experiments.storm.topology.GearpumpTuple
-import org.apache.gearpump.partitioner.Partitioner
+import org.apache.gearpump.streaming.partitioner.Partitioner
 
 class StormPartitionerSpec extends PropSpec with PropertyChecks with Matchers {
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala
 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala
index 89b8ef7..a1d1162 100644
--- 
a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala
+++ 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala
@@ -33,7 +33,7 @@ class DynamicDagSpec extends TestSpecBase {
       val partitioners = restClient.queryBuiltInPartitioners()
       partitioners.length should be > 0
       partitioners.foreach(clazz =>
-        clazz should startWith("org.apache.gearpump.partitioner.")
+        clazz should startWith("org.apache.gearpump.streaming.partitioner.")
       )
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
----------------------------------------------------------------------
diff --git 
a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala 
b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
index bf7092e..b08e0fd 100644
--- 
a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
+++ 
b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
@@ -41,7 +41,7 @@ import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.worker.WorkerSummary
 import org.apache.gearpump.cluster.{ClusterConfig, UserConfig}
 import org.apache.gearpump.jarstore.{JarStoreClient, FileDirective, 
JarStoreServer}
-import org.apache.gearpump.partitioner.{PartitionerByClassName, 
PartitionerDescription}
+import org.apache.gearpump.streaming.partitioner.{PartitionerByClassName, 
PartitionerDescription}
 import org.apache.gearpump.services.MasterService.{BuiltinPartitioners, 
SubmitApplicationRequest}
 // NOTE: This cannot be removed!!!
 import org.apache.gearpump.services.util.UpickleUtil._
@@ -192,7 +192,8 @@ class MasterService(val master: ActorRef,
     } ~
     path("partitioners") {
       get {
-        
complete(write(BuiltinPartitioners(Constants.BUILTIN_PARTITIONERS.map(_.getName))))
+        
complete(write(BuiltinPartitioners(org.apache.gearpump.streaming.Constants
+          .BUILTIN_PARTITIONERS.map(_.getName))))
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Graph.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Graph.java 
b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Graph.java
index 8f85aa3..aaf6db8 100644
--- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Graph.java
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Graph.java
@@ -18,7 +18,7 @@
 
 package org.apache.gearpump.streaming.javaapi;
 
-import org.apache.gearpump.partitioner.Partitioner;
+import org.apache.gearpump.streaming.partitioner.Partitioner;
 import org.apache.gearpump.streaming.Processor;
 import org.apache.gearpump.streaming.task.Task;
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
index f99a436..d7582b0 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
@@ -17,6 +17,8 @@
  */
 package org.apache.gearpump.streaming
 
+import org.apache.gearpump.streaming.partitioner._
+
 object Constants {
 
   val GEARPUMP_STREAMING_OPERATOR = "gearpump.streaming.dsl.operator"
@@ -36,4 +38,12 @@ object Constants {
 
   val GEARPUMP_STREAMING_EXECUTOR_RESTART_TIME_WINDOW =
     "gearpump.streaming.executor-restart-time-window"
+
+  // The partitioners provided by Gearpump
+  val BUILTIN_PARTITIONERS = Array(
+    classOf[BroadcastPartitioner],
+    classOf[CoLocationPartitioner],
+    classOf[HashPartitioner],
+    classOf[ShuffleGroupingPartitioner],
+    classOf[ShufflePartitioner])
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala
index 4a94ad3..8ad74f8 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala
@@ -18,7 +18,7 @@
 
 package org.apache.gearpump.streaming
 
-import org.apache.gearpump.partitioner.PartitionerDescription
+import org.apache.gearpump.streaming.partitioner.PartitionerDescription
 import org.apache.gearpump.streaming.task.TaskId
 import org.apache.gearpump.util.Graph
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
index a6588a1..ca8d89e 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
@@ -25,7 +25,7 @@ import akka.actor.ActorSystem
 
 import org.apache.gearpump.TimeStamp
 import org.apache.gearpump.cluster._
-import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner, 
PartitionerDescription, PartitionerObject}
+import org.apache.gearpump.streaming.partitioner.{HashPartitioner, 
Partitioner, PartitionerDescription, PartitionerObject}
 import org.apache.gearpump.streaming.appmaster.AppMaster
 import org.apache.gearpump.streaming.task.Task
 import org.apache.gearpump.util.{Graph, LogUtil, ReferenceEqual}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
index 31e1151..1341464 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
@@ -30,7 +30,7 @@ import org.apache.gearpump.cluster._
 import org.apache.gearpump.cluster.worker.WorkerId
 import org.apache.gearpump.metrics.Metrics.ReportMetrics
 import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, 
MetricsReporterService}
-import org.apache.gearpump.partitioner.PartitionerDescription
+import org.apache.gearpump.streaming.partitioner.PartitionerDescription
 import org.apache.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, 
RegisterExecutor, RegisterTask, UnRegisterTask}
 import org.apache.gearpump.streaming._
 import org.apache.gearpump.streaming.appmaster.AppMaster._

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala
index 2736f5e..6154946 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala
@@ -21,7 +21,7 @@ package org.apache.gearpump.streaming.appmaster
 import akka.actor.{ExtendedActorSystem, Actor, ActorRef, Stash}
 import akka.serialization.JavaSerializer
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.partitioner.PartitionerDescription
+import org.apache.gearpump.streaming.partitioner.PartitionerDescription
 import org.apache.gearpump.streaming._
 import org.apache.gearpump.streaming.appmaster.DagManager._
 import org.apache.gearpump.streaming.storage.AppDataStore

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala
index 6de5306..e023cdf 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala
@@ -24,7 +24,7 @@ import org.apache.gearpump.TimeStamp
 import org.apache.gearpump.cluster.AppJar
 import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest}
 import org.apache.gearpump.cluster.worker.WorkerId
-import org.apache.gearpump.partitioner.PartitionerDescription
+import org.apache.gearpump.streaming.partitioner.PartitionerDescription
 import org.apache.gearpump.streaming.appmaster.JarScheduler._
 import org.apache.gearpump.streaming.task.TaskId
 import org.apache.gearpump.streaming.{DAG, ProcessorDescription}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
index 2ec881b..efa7409 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
@@ -19,7 +19,7 @@
 package org.apache.gearpump.streaming.dsl.partitioner
 
 import org.apache.gearpump.Message
-import org.apache.gearpump.partitioner.UnicastPartitioner
+import org.apache.gearpump.streaming.partitioner.UnicastPartitioner
 import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
index 16d5c06..65f9cd2 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
@@ -20,7 +20,7 @@ package org.apache.gearpump.streaming.dsl.plan
 
 import akka.actor.ActorSystem
 
-import org.apache.gearpump.partitioner.{CoLocationPartitioner, 
HashPartitioner, Partitioner}
+import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, 
HashPartitioner, Partitioner}
 import org.apache.gearpump.streaming.Processor
 import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
 import org.apache.gearpump.streaming.task.Task

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/BroadcastPartitioner.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/BroadcastPartitioner.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/BroadcastPartitioner.scala
new file mode 100644
index 0000000..9b63e04
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/BroadcastPartitioner.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.partitioner
+
+import org.apache.gearpump.Message
+
+/** Used by storm module to broadcast message to all downstream tasks  */
+class BroadcastPartitioner extends MulticastPartitioner {
+  private var lastPartitionNum = -1
+  private var partitions = Array.empty[Int]
+
+  override def getPartitions(
+      msg: Message, partitionNum: Int, currentPartitionId: Int): Array[Int] = {
+    if (partitionNum != lastPartitionNum) {
+      partitions = (0 until partitionNum).toArray
+      lastPartitionNum = partitionNum
+    }
+    partitions
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/CoLocationPartitioner.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/CoLocationPartitioner.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/CoLocationPartitioner.scala
new file mode 100644
index 0000000..4cb1bad
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/CoLocationPartitioner.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.partitioner
+
+import org.apache.gearpump.Message
+
+/**
+ * Will have the same parallelism with last processor
+ * And each task in current processor will co-locate with task of last 
processor
+ */
+class CoLocationPartitioner extends UnicastPartitioner {
+  override def getPartition(msg: Message, partitionNum: Int, 
currentPartitionId: Int): Int = {
+    currentPartitionId
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/HashPartitioner.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/HashPartitioner.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/HashPartitioner.scala
new file mode 100644
index 0000000..6137705
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/HashPartitioner.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.partitioner
+
+import org.apache.gearpump.Message
+
+/**
+ * Only make sense when the message has implemented the hashCode()
+ * Otherwise, it will use Object.hashCode(), which will not return
+ * same hash code after serialization and deserialization.
+ */
+class HashPartitioner extends UnicastPartitioner {
+  override def getPartition(msg: Message, partitionNum: Int, 
currentPartitionId: Int): Int = {
+    (msg.msg.hashCode() & Integer.MAX_VALUE) % partitionNum
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/Partitioner.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/Partitioner.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/Partitioner.scala
new file mode 100644
index 0000000..f685cc9
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/Partitioner.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.partitioner
+
+import org.apache.commons.lang.SerializationUtils
+import org.apache.gearpump.Message
+
+import scala.reflect.ClassTag
+
+/**
+ * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), partitioner 
decide how ONE task
+ * of upstream processor A send to several tasks of downstream processor B.
+ */
+sealed trait Partitioner extends Serializable
+
+/**
+ * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), 
UnicastPartitioner does
+ * ONE-task {@literal ->} ONE-task mapping.
+ */
+trait UnicastPartitioner extends Partitioner {
+
+  /**
+   * Gets the SINGLE downstream processor task index to send message to.
+   *
+   * @param msg Message you want to send
+   * @param partitionNum How many tasks does the downstream processor have.
+   * @param upstreamTaskIndex Upstream task's task index who trigger the 
getPartition() call.
+   *
+   * @return ONE task index of downstream processor.
+   */
+  def getPartition(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): 
Int
+
+  def getPartition(msg: Message, partitionNum: Int): Int = {
+    getPartition(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID)
+  }
+}
+
+trait MulticastPartitioner extends Partitioner {
+
+  /**
+   * Gets a list of downstream processor task indexes to send message to.
+   *
+   * @param upstreamTaskIndex Current sender task's task index.
+   *
+   */
+  def getPartitions(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): 
Array[Int]
+
+  def getPartitions(msg: Message, partitionNum: Int): Array[Int] = {
+    getPartitions(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID)
+  }
+}
+
+sealed trait PartitionerFactory {
+
+  def name: String
+
+  def partitioner: Partitioner
+}
+
+/** Stores the Partitioner in an object. To use it, user need to deserialize 
the object */
+class PartitionerObject(private[this] val _partitioner: Partitioner)
+  extends PartitionerFactory with Serializable {
+
+  override def name: String = partitioner.getClass.getName
+
+  override def partitioner: Partitioner = {
+    SerializationUtils.clone(_partitioner).asInstanceOf[Partitioner]
+  }
+}
+
+/** Store the partitioner in class Name, the user need to instantiate a new 
class */
+class PartitionerByClassName(partitionerClass: String)
+  extends PartitionerFactory with Serializable {
+
+  override def name: String = partitionerClass
+  override def partitioner: Partitioner = {
+    Class.forName(partitionerClass).newInstance().asInstanceOf[Partitioner]
+  }
+}
+
+/**
+ * @param partitionerFactory How we construct a Partitioner.
+ */
+case class PartitionerDescription(partitionerFactory: PartitionerFactory)
+
+object Partitioner {
+  val UNKNOWN_PARTITION_ID = -1
+
+  def apply[T <: Partitioner](implicit clazz: ClassTag[T]): 
PartitionerDescription = {
+    PartitionerDescription(new 
PartitionerByClassName(clazz.runtimeClass.getName))
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShuffleGroupingPartitioner.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShuffleGroupingPartitioner.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShuffleGroupingPartitioner.scala
new file mode 100644
index 0000000..1b223e0
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShuffleGroupingPartitioner.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.partitioner
+
+import org.apache.gearpump.Message
+
+import scala.util.Random
+
+/**
+ * The idea of ShuffleGroupingPartitioner is derived from Storm.
+ * Messages are randomly distributed across the downstream's tasks in a way 
such that
+ * each task is guaranteed to get an equal number of messages.
+ */
+class ShuffleGroupingPartitioner extends UnicastPartitioner {
+  private val random = new Random
+  private var index = -1
+  private var partitions = List.empty[Int]
+  override def getPartition(msg: Message, partitionNum: Int, 
currentPartitionId: Int): Int = {
+    index += 1
+    if (partitions.isEmpty) {
+      partitions = 0.until(partitionNum).toList
+      partitions = random.shuffle(partitions)
+    } else if (index >= partitionNum) {
+      index = 0
+      partitions = random.shuffle(partitions)
+    }
+    partitions(index)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShufflePartitioner.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShufflePartitioner.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShufflePartitioner.scala
new file mode 100644
index 0000000..39d5e3b
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShufflePartitioner.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.partitioner
+
+import java.util.Random
+
+import org.apache.gearpump.Message
+
+/**
+ * Round Robin partition the data to downstream processor tasks.
+ */
+class ShufflePartitioner extends UnicastPartitioner {
+  private var seed = 0
+  private var count = 0
+
+  override def getPartition(msg: Message, partitionNum: Int, 
currentPartitionId: Int): Int = {
+
+    if (seed == 0) {
+      seed = newSeed()
+    }
+
+    val result = ((count + seed) & Integer.MAX_VALUE) % partitionNum
+    count = count + 1
+    result
+  }
+
+  private def newSeed(): Int = new Random().nextInt()
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscriber.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscriber.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscriber.scala
index 692d7f9..5c99980 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscriber.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscriber.scala
@@ -18,7 +18,7 @@
 
 package org.apache.gearpump.streaming.task
 
-import org.apache.gearpump.partitioner.PartitionerDescription
+import org.apache.gearpump.streaming.partitioner.PartitionerDescription
 import org.apache.gearpump.streaming.{DAG, LifeTime}
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
index d9fbc82..16f9e93 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
@@ -21,7 +21,7 @@ package org.apache.gearpump.streaming.task
 import org.slf4j.Logger
 
 import org.apache.gearpump.google.common.primitives.Shorts
-import org.apache.gearpump.partitioner.{MulticastPartitioner, Partitioner, 
UnicastPartitioner}
+import org.apache.gearpump.streaming.partitioner.{MulticastPartitioner, 
Partitioner, UnicastPartitioner}
 import org.apache.gearpump.streaming.AppMasterToExecutor.MsgLostException
 import org.apache.gearpump.streaming.LifeTime
 import org.apache.gearpump.streaming.task.Subscription._

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala 
b/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala
index 5f4faee..ccda8f0 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala
@@ -18,7 +18,7 @@
 
 package org.apache.gearpump.streaming
 
-import org.apache.gearpump.partitioner.PartitionerDescription
+import org.apache.gearpump.streaming.partitioner.PartitionerDescription
 import org.apache.gearpump.streaming.task.TaskId
 import org.apache.gearpump.util.Graph
 import org.apache.gearpump.util.Graph.Node

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
index e461ae8..29dfc57 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
@@ -34,7 +34,7 @@ import org.apache.gearpump.cluster.master.MasterProxy
 import org.apache.gearpump.cluster.scheduler.{Resource, ResourceAllocation, 
ResourceRequest}
 import org.apache.gearpump.cluster.worker.WorkerId
 import org.apache.gearpump.jarstore.FilePath
-import org.apache.gearpump.partitioner.HashPartitioner
+import org.apache.gearpump.streaming.partitioner.HashPartitioner
 import org.apache.gearpump.streaming.AppMasterToExecutor.StopTask
 import org.apache.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, 
UnRegisterTask}
 import org.apache.gearpump.streaming.appmaster.AppMaster.{TaskActorRef, 
LookupTaskActorRef}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
index d42fe6f..46175a4 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
@@ -20,7 +20,7 @@ package org.apache.gearpump.streaming.appmaster
 import akka.actor.{ActorSystem, Props}
 import akka.testkit.{ImplicitSender, TestKit, TestProbe}
 import org.apache.gearpump.cluster.{TestUtil, UserConfig}
-import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner, 
PartitionerDescription}
+import org.apache.gearpump.streaming.partitioner.{HashPartitioner, 
Partitioner, PartitionerDescription}
 import org.apache.gearpump.streaming.appmaster.ClockService.{ChangeToNewDAG, 
ChangeToNewDAGSuccess, HealthChecker, ProcessorClock}
 import org.apache.gearpump.streaming.appmaster.ClockServiceSpec.Store
 import org.apache.gearpump.streaming.storage.AppDataStore

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
index be3b3b7..adde927 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
@@ -22,7 +22,7 @@ package org.apache.gearpump.streaming.appmaster
 import akka.actor.{ActorSystem, Props}
 import akka.testkit.TestProbe
 import org.apache.gearpump.cluster.{TestUtil, UserConfig}
-import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner}
+import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner}
 import org.apache.gearpump.streaming.appmaster.DagManager.{DAGOperationFailed, 
DAGOperationSuccess, GetLatestDAG, GetTaskLaunchData, LatestDAG, 
NewDAGDeployed, ReplaceProcessor, TaskLaunchData, WatchChange}
 import org.apache.gearpump.streaming.task.{Subscriber, TaskActor}
 import org.apache.gearpump.streaming._

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/JarSchedulerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/JarSchedulerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/JarSchedulerSpec.scala
index 5f6dd04..def9d44 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/JarSchedulerSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/JarSchedulerSpec.scala
@@ -22,7 +22,7 @@ import org.apache.gearpump.cluster.scheduler.{Resource, 
ResourceRequest}
 import org.apache.gearpump.cluster.worker.WorkerId
 import org.apache.gearpump.cluster.{AppJar, TestUtil}
 import org.apache.gearpump.jarstore.FilePath
-import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner}
+import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner}
 import org.apache.gearpump.streaming.appmaster.TaskSchedulerSpec.{TestTask1, 
TestTask2}
 import org.apache.gearpump.streaming.task.TaskId
 import org.apache.gearpump.streaming.{DAG, ProcessorDescription, _}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
index 54ecde1..bcf96e4 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
@@ -25,7 +25,7 @@ import org.apache.gearpump.cluster.scheduler.{Resource, 
ResourceRequest}
 import org.apache.gearpump.cluster.worker.WorkerId
 import org.apache.gearpump.cluster.{AppJar, TestUtil, UserConfig}
 import org.apache.gearpump.jarstore.FilePath
-import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner, 
PartitionerDescription}
+import org.apache.gearpump.streaming.partitioner.{HashPartitioner, 
Partitioner, PartitionerDescription}
 import org.apache.gearpump.streaming.AppMasterToExecutor.{LaunchTasks, 
StartAllTasks, StartDynamicDag, TaskLocationsReady, TaskLocationsReceived, 
TaskRegistered}
 import org.apache.gearpump.streaming.ExecutorToAppMaster.RegisterTask
 import 
org.apache.gearpump.streaming.appmaster.AppMaster.AllocateResourceTimeOut

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
index 864aa93..1bfde94 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
@@ -21,7 +21,7 @@ import com.typesafe.config.ConfigFactory
 import org.apache.gearpump.cluster.scheduler.{Relaxation, Resource, 
ResourceRequest}
 import org.apache.gearpump.cluster.worker.WorkerId
 import org.apache.gearpump.cluster.{TestUtil, UserConfig}
-import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner}
+import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner}
 import org.apache.gearpump.streaming.appmaster.TaskLocator.Localities
 import org.apache.gearpump.streaming.appmaster.TaskSchedulerSpec.{TestTask1, 
TestTask2}
 import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskId}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
index e0407ec..db4db93 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
@@ -21,7 +21,7 @@ package org.apache.gearpump.streaming.dsl
 import akka.actor.ActorSystem
 import org.apache.gearpump.cluster.TestUtil
 import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.partitioner.PartitionerDescription
+import org.apache.gearpump.streaming.partitioner.PartitionerDescription
 import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication}
 import org.apache.gearpump.streaming.source.DataSourceTask
 import org.apache.gearpump.util.Graph

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
index fdc721b..8def61e 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
@@ -22,7 +22,7 @@ import akka.actor._
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.{TestUtil, UserConfig}
-import org.apache.gearpump.partitioner.{CoLocationPartitioner, 
HashPartitioner, PartitionerDescription}
+import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, 
HashPartitioner, PartitionerDescription}
 import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication}
 import org.apache.gearpump.streaming.dsl.StreamSpec.Join
 import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
index f8666ba..1610f0e 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
@@ -23,7 +23,7 @@ import java.time.Instant
 import akka.actor.ActorSystem
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.{TestUtil, UserConfig}
-import org.apache.gearpump.partitioner.CoLocationPartitioner
+import org.apache.gearpump.streaming.partitioner.CoLocationPartitioner
 import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
 import org.apache.gearpump.streaming.dsl.plan.PlannerSpec._
 import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapFunction, 
ReduceFunction}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/test/scala/org/apache/gearpump/streaming/partitioner/PartitionerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/partitioner/PartitionerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/partitioner/PartitionerSpec.scala
new file mode 100644
index 0000000..277a31c
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/partitioner/PartitionerSpec.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.partitioner
+
+import org.apache.gearpump.Message
+import org.scalatest.{FlatSpec, Matchers}
+
+class PartitionerSpec extends FlatSpec with Matchers {
+  val NUM = 10
+
+  "HashPartitioner" should "hash same key to same slots" in {
+    val partitioner = new HashPartitioner
+
+    val data = new Array[Byte](1000)
+    (new java.util.Random()).nextBytes(data)
+    val msg = Message(data)
+
+    val partition = partitioner.getPartition(msg, NUM)
+    assert(partition >= 0 && partition < NUM, "Partition Id should be >= 0")
+
+    assert(partition == partitioner.getPartition(msg, NUM), "multiple run 
should return" +
+      "consistent result")
+  }
+
+  "ShufflePartitioner" should "hash same key randomly" in {
+    val partitioner = new ShufflePartitioner
+
+    val data = new Array[Byte](1000)
+    (new java.util.Random()).nextBytes(data)
+    val msg = Message(data)
+
+    val partition = partitioner.getPartition(msg, NUM)
+    assert(partition >= 0 && partition < NUM, "Partition Id should be >= 0")
+
+    assert(partition != partitioner.getPartition(msg, NUM), "multiple run 
should return" +
+      "consistent result")
+  }
+
+  "BroadcastPartitioner" should "return all partitions" in {
+    val partitioner = new BroadcastPartitioner
+
+    val data = new Array[Byte](1000)
+    (new java.util.Random()).nextBytes(data)
+    val msg = Message(data)
+    val partitions = partitioner.getPartitions(msg, NUM)
+
+    partitions should contain theSameElementsAs 0.until(NUM)
+  }
+
+
+  "ShuffleGroupingPartitioner" should "hash same key randomly" in {
+    val partitioner = new ShuffleGroupingPartitioner
+
+    val data = new Array[Byte](1000)
+    (new java.util.Random()).nextBytes(data)
+    val msg = Message(data)
+
+    val partition = partitioner.getPartition(msg, NUM)
+    assert(partition >= 0 && partition < NUM, "Partition Id should be >= 0")
+
+    assert(partition != partitioner.getPartition(msg, NUM), "multiple run 
should return" +
+      "consistent result")
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriberSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriberSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriberSpec.scala
index cfe47eb..bd3ddb8 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriberSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriberSpec.scala
@@ -19,7 +19,7 @@ package org.apache.gearpump.streaming.task
 
 import org.scalatest.{FlatSpec, Matchers}
 
-import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner}
+import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner}
 import org.apache.gearpump.streaming.task.SubscriberSpec.TestTask
 import org.apache.gearpump.streaming.{DAG, ProcessorDescription}
 import org.apache.gearpump.util.Graph

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
index 258a5ff..d128ace 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
@@ -26,7 +26,7 @@ import org.scalatest.{FlatSpec, Matchers}
 
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner}
+import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner}
 import org.apache.gearpump.streaming.task.SubscriptionSpec.NextTask
 import org.apache.gearpump.streaming.{LifeTime, ProcessorDescription}
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala
index 48901d2..8deee78 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala
@@ -25,7 +25,7 @@ import org.scalatest.{BeforeAndAfterEach, Matchers, WordSpec}
 
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.{MasterHarness, TestUtil, UserConfig}
-import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner}
+import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner}
 import org.apache.gearpump.serializer.{FastKryoSerializer, 
SerializationFramework}
 import org.apache.gearpump.streaming.AppMasterToExecutor.{ChangeTask, 
MsgLostException, StartTask, TaskChanged, TaskRegistered}
 import org.apache.gearpump.streaming.task.TaskActorSpec.TestTask

Reply via email to