Repository: incubator-gearpump Updated Branches: refs/heads/master c3d5eb63f -> fb37ce809
[GEARPUMP-225] move partitioner to module gearpump-streaming Author: huafengw <[email protected]> Closes #99 from huafengw/partitioner. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/fb37ce80 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/fb37ce80 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/fb37ce80 Branch: refs/heads/master Commit: fb37ce809b2917a12cc64d4337418cbb72265fda Parents: c3d5eb6 Author: huafengw <[email protected]> Authored: Mon Oct 24 11:31:41 2016 +0800 Committer: manuzhang <[email protected]> Committed: Mon Oct 24 11:31:41 2016 +0800 ---------------------------------------------------------------------- .../partitioner/BroadcastPartitioner.scala | 36 ------ .../partitioner/CoLocationPartitioner.scala | 31 ------ .../gearpump/partitioner/HashPartitioner.scala | 32 ------ .../gearpump/partitioner/Partitioner.scala | 109 ------------------- .../ShuffleGroupingPartitioner.scala | 45 -------- .../partitioner/ShufflePartitioner.scala | 44 -------- .../org/apache/gearpump/util/Constants.scala | 10 -- .../gearpump/partitioner/PartitionerSpec.scala | 81 -------------- .../pagerank/PageRankApplication.scala | 2 +- .../streaming/examples/complexdag/Dag.scala | 2 +- .../examples/fsio/SequenceFileIO.scala | 2 +- .../examples/kafka/KafkaReadWrite.scala | 2 +- .../kafka/wordcount/KafkaWordCount.scala | 2 +- .../gearpump/streaming/examples/sol/SOL.scala | 2 +- .../examples/state/MessageCountApp.scala | 2 +- .../examples/state/WindowAverageApp.scala | 2 +- .../examples/wordcountjava/WordCount.java | 5 +- .../examples/wordcount/WordCount.scala | 2 +- .../storm/partitioner/StormPartitioner.scala | 2 +- .../experiments/storm/util/GraphBuilder.scala | 2 +- .../partitioner/StormPartitionerSpec.scala | 2 +- .../checklist/DynamicDagSpec.scala | 2 +- .../gearpump/services/MasterService.scala | 5 +- 
.../gearpump/streaming/javaapi/Graph.java | 2 +- .../apache/gearpump/streaming/Constants.scala | 10 ++ .../org/apache/gearpump/streaming/DAG.scala | 2 +- .../gearpump/streaming/StreamApplication.scala | 2 +- .../streaming/appmaster/AppMaster.scala | 2 +- .../streaming/appmaster/DagManager.scala | 2 +- .../streaming/appmaster/JarScheduler.scala | 2 +- .../dsl/partitioner/GroupByPartitioner.scala | 2 +- .../gearpump/streaming/dsl/plan/Planner.scala | 2 +- .../partitioner/BroadcastPartitioner.scala | 36 ++++++ .../partitioner/CoLocationPartitioner.scala | 31 ++++++ .../streaming/partitioner/HashPartitioner.scala | 32 ++++++ .../streaming/partitioner/Partitioner.scala | 108 ++++++++++++++++++ .../ShuffleGroupingPartitioner.scala | 45 ++++++++ .../partitioner/ShufflePartitioner.scala | 44 ++++++++ .../gearpump/streaming/task/Subscriber.scala | 2 +- .../gearpump/streaming/task/Subscription.scala | 2 +- .../org/apache/gearpump/streaming/DAGSpec.scala | 2 +- .../streaming/appmaster/AppMasterSpec.scala | 2 +- .../streaming/appmaster/ClockServiceSpec.scala | 2 +- .../streaming/appmaster/DagManagerSpec.scala | 2 +- .../streaming/appmaster/JarSchedulerSpec.scala | 2 +- .../streaming/appmaster/TaskManagerSpec.scala | 2 +- .../streaming/appmaster/TaskSchedulerSpec.scala | 2 +- .../gearpump/streaming/dsl/StreamAppSpec.scala | 2 +- .../gearpump/streaming/dsl/StreamSpec.scala | 2 +- .../streaming/dsl/plan/PlannerSpec.scala | 2 +- .../streaming/partitioner/PartitionerSpec.scala | 80 ++++++++++++++ .../streaming/task/SubscriberSpec.scala | 2 +- .../streaming/task/SubscriptionSpec.scala | 2 +- .../gearpump/streaming/task/TaskActorSpec.scala | 2 +- 54 files changed, 427 insertions(+), 429 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/core/src/main/scala/org/apache/gearpump/partitioner/BroadcastPartitioner.scala 
---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/BroadcastPartitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/BroadcastPartitioner.scala deleted file mode 100644 index 99cbcb6..0000000 --- a/core/src/main/scala/org/apache/gearpump/partitioner/BroadcastPartitioner.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gearpump.partitioner - -import org.apache.gearpump.Message - -/** Used by storm module to broadcast message to all downstream tasks */ -class BroadcastPartitioner extends MulticastPartitioner { - private var lastPartitionNum = -1 - private var partitions = Array.empty[Int] - - override def getPartitions( - msg: Message, partitionNum: Int, currentPartitionId: Int): Array[Int] = { - if (partitionNum != lastPartitionNum) { - partitions = (0 until partitionNum).toArray - lastPartitionNum = partitionNum - } - partitions - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/core/src/main/scala/org/apache/gearpump/partitioner/CoLocationPartitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/CoLocationPartitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/CoLocationPartitioner.scala deleted file mode 100644 index 5a3eec4..0000000 --- a/core/src/main/scala/org/apache/gearpump/partitioner/CoLocationPartitioner.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gearpump.partitioner - -import org.apache.gearpump.Message - -/** - * Will have the same parallelism with last processor - * And each task in current processor will co-locate with task of last processor - */ -class CoLocationPartitioner extends UnicastPartitioner { - override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = { - currentPartitionId - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/core/src/main/scala/org/apache/gearpump/partitioner/HashPartitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/HashPartitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/HashPartitioner.scala deleted file mode 100644 index ee684a9..0000000 --- a/core/src/main/scala/org/apache/gearpump/partitioner/HashPartitioner.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gearpump.partitioner - -import org.apache.gearpump.Message - -/** - * Only make sense when the message has implemented the hashCode() - * Otherwise, it will use Object.hashCode(), which will not return - * same hash code after serialization and deserialization. - */ -class HashPartitioner extends UnicastPartitioner { - override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = { - (msg.msg.hashCode() & Integer.MAX_VALUE) % partitionNum - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/core/src/main/scala/org/apache/gearpump/partitioner/Partitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/Partitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/Partitioner.scala deleted file mode 100644 index d68fa65..0000000 --- a/core/src/main/scala/org/apache/gearpump/partitioner/Partitioner.scala +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gearpump.partitioner - -import scala.reflect.ClassTag - -import org.apache.commons.lang.SerializationUtils - -import org.apache.gearpump.Message - -/** - * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), partitioner decide how ONE task - * of upstream processor A send to several tasks of downstream processor B. - */ -sealed trait Partitioner extends Serializable - -/** - * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), UnicastPartitioner does - * ONE-task {@literal ->} ONE-task mapping. - */ -trait UnicastPartitioner extends Partitioner { - - /** - * Gets the SINGLE downstream processor task index to send message to. - * - * @param msg Message you want to send - * @param partitionNum How many tasks does the downstream processor have. - * @param upstreamTaskIndex Upstream task's task index who trigger the getPartition() call. - * - * @return ONE task index of downstream processor. - */ - def getPartition(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): Int - - def getPartition(msg: Message, partitionNum: Int): Int = { - getPartition(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID) - } -} - -trait MulticastPartitioner extends Partitioner { - - /** - * Gets a list of downstream processor task indexes to send message to. - * - * @param upstreamTaskIndex Current sender task's task index. - * - */ - def getPartitions(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): Array[Int] - - def getPartitions(msg: Message, partitionNum: Int): Array[Int] = { - getPartitions(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID) - } -} - -sealed trait PartitionerFactory { - - def name: String - - def partitioner: Partitioner -} - -/** Stores the Partitioner in an object. 
To use it, user need to deserialize the object */ -class PartitionerObject(private[this] val _partitioner: Partitioner) - extends PartitionerFactory with Serializable { - - override def name: String = partitioner.getClass.getName - - override def partitioner: Partitioner = { - SerializationUtils.clone(_partitioner).asInstanceOf[Partitioner] - } -} - -/** Store the partitioner in class Name, the user need to instantiate a new class */ -class PartitionerByClassName(partitionerClass: String) - extends PartitionerFactory with Serializable { - - override def name: String = partitionerClass - override def partitioner: Partitioner = { - Class.forName(partitionerClass).newInstance().asInstanceOf[Partitioner] - } -} - -/** - * @param partitionerFactory How we construct a Partitioner. - */ -case class PartitionerDescription(partitionerFactory: PartitionerFactory) - -object Partitioner { - val UNKNOWN_PARTITION_ID = -1 - - def apply[T <: Partitioner](implicit clazz: ClassTag[T]): PartitionerDescription = { - PartitionerDescription(new PartitionerByClassName(clazz.runtimeClass.getName)) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/core/src/main/scala/org/apache/gearpump/partitioner/ShuffleGroupingPartitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/ShuffleGroupingPartitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/ShuffleGroupingPartitioner.scala deleted file mode 100644 index 55ef614..0000000 --- a/core/src/main/scala/org/apache/gearpump/partitioner/ShuffleGroupingPartitioner.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.partitioner - -import scala.util.Random - -import org.apache.gearpump.Message - -/** - * The idea of ShuffleGroupingPartitioner is derived from Storm. - * Messages are randomly distributed across the downstream's tasks in a way such that - * each task is guaranteed to get an equal number of messages. - */ -class ShuffleGroupingPartitioner extends UnicastPartitioner { - private val random = new Random - private var index = -1 - private var partitions = List.empty[Int] - override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = { - index += 1 - if (partitions.isEmpty) { - partitions = 0.until(partitionNum).toList - partitions = random.shuffle(partitions) - } else if (index >= partitionNum) { - index = 0 - partitions = random.shuffle(partitions) - } - partitions(index) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/core/src/main/scala/org/apache/gearpump/partitioner/ShufflePartitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/ShufflePartitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/ShufflePartitioner.scala deleted file mode 100644 index 5c66d66..0000000 --- a/core/src/main/scala/org/apache/gearpump/partitioner/ShufflePartitioner.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * 
Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.partitioner - -import java.util.Random - -import org.apache.gearpump.Message - -/** - * Round Robin partition the data to downstream processor tasks. - */ -class ShufflePartitioner extends UnicastPartitioner { - private var seed = 0 - private var count = 0 - - override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = { - - if (seed == 0) { - seed = newSeed() - } - - val result = ((count + seed) & Integer.MAX_VALUE) % partitionNum - count = count + 1 - result - } - - private def newSeed(): Int = new Random().nextInt() -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/core/src/main/scala/org/apache/gearpump/util/Constants.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/util/Constants.scala b/core/src/main/scala/org/apache/gearpump/util/Constants.scala index dba5a1f..c98726e 100644 --- a/core/src/main/scala/org/apache/gearpump/util/Constants.scala +++ b/core/src/main/scala/org/apache/gearpump/util/Constants.scala @@ -20,8 +20,6 @@ package org.apache.gearpump.util import java.util.concurrent.TimeUnit -import 
org.apache.gearpump.partitioner._ - object Constants { val MASTER_WATCHER = "masterwatcher" val SINGLETON_MANAGER = "singleton" @@ -140,14 +138,6 @@ object Constants { val GEARPUMP_SERVICE_SUPERVISOR_PATH = "gearpump.services.supervisor-actor-path" val GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE = "gearpump.services.config-render-option-concise" - // The partitioners provided by Gearpump - val BUILTIN_PARTITIONERS = Array( - classOf[BroadcastPartitioner], - classOf[CoLocationPartitioner], - classOf[HashPartitioner], - classOf[ShuffleGroupingPartitioner], - classOf[ShufflePartitioner]) - // Security related val GEARPUMP_KEYTAB_FILE = "gearpump.keytab.file" val GEARPUMP_KERBEROS_PRINCIPAL = "gearpump.kerberos.principal" http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala b/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala deleted file mode 100644 index fcf819b..0000000 --- a/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.partitioner - -import org.scalatest.{FlatSpec, Matchers} - -import org.apache.gearpump.Message - -class PartitionerSpec extends FlatSpec with Matchers { - val NUM = 10 - - "HashPartitioner" should "hash same key to same slots" in { - val partitioner = new HashPartitioner - - val data = new Array[Byte](1000) - (new java.util.Random()).nextBytes(data) - val msg = Message(data) - - val partition = partitioner.getPartition(msg, NUM) - assert(partition >= 0 && partition < NUM, "Partition Id should be >= 0") - - assert(partition == partitioner.getPartition(msg, NUM), "multiple run should return" + - "consistent result") - } - - "ShufflePartitioner" should "hash same key randomly" in { - val partitioner = new ShufflePartitioner - - val data = new Array[Byte](1000) - (new java.util.Random()).nextBytes(data) - val msg = Message(data) - - val partition = partitioner.getPartition(msg, NUM) - assert(partition >= 0 && partition < NUM, "Partition Id should be >= 0") - - assert(partition != partitioner.getPartition(msg, NUM), "multiple run should return" + - "consistent result") - } - - "BroadcastPartitioner" should "return all partitions" in { - val partitioner = new BroadcastPartitioner - - val data = new Array[Byte](1000) - (new java.util.Random()).nextBytes(data) - val msg = Message(data) - val partitions = partitioner.getPartitions(msg, NUM) - - partitions should contain theSameElementsAs 0.until(NUM) - } - - - "ShuffleGroupingPartitioner" should "hash same key randomly" in { - val partitioner = new ShuffleGroupingPartitioner - - val data = new Array[Byte](1000) - (new java.util.Random()).nextBytes(data) - val msg = Message(data) - - val partition = partitioner.getPartition(msg, NUM) - assert(partition >= 0 && partition < NUM, "Partition Id should be >= 0") - - assert(partition != partitioner.getPartition(msg, NUM), "multiple run 
should return" + - "consistent result") - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankApplication.scala ---------------------------------------------------------------------- diff --git a/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankApplication.scala b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankApplication.scala index 023ee35..c7bfb43 100644 --- a/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankApplication.scala +++ b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankApplication.scala @@ -21,7 +21,7 @@ import akka.actor.ActorSystem import org.apache.gearpump.cluster.{Application, ApplicationMaster, UserConfig} import org.apache.gearpump.experiments.pagerank.PageRankApplication.NodeWithTaskId -import org.apache.gearpump.partitioner.HashPartitioner +import org.apache.gearpump.streaming.partitioner.HashPartitioner import org.apache.gearpump.streaming.appmaster.AppMaster import org.apache.gearpump.streaming.{Processor, StreamApplication} import org.apache.gearpump.util.Graph http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Dag.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Dag.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Dag.scala index 3b6ceb8..165df62 100644 --- a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Dag.scala +++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Dag.scala @@ -23,7 +23,7 @@ import 
org.slf4j.Logger import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.cluster.client.ClientContext import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} -import org.apache.gearpump.partitioner.HashPartitioner +import org.apache.gearpump.streaming.partitioner.HashPartitioner import org.apache.gearpump.streaming.task.TaskContext import org.apache.gearpump.streaming.{Processor, StreamApplication} import org.apache.gearpump.util.Graph.{Node => GraphNode} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SequenceFileIO.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SequenceFileIO.scala b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SequenceFileIO.scala index 5c75904..3a80549 100644 --- a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SequenceFileIO.scala +++ b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SequenceFileIO.scala @@ -23,7 +23,7 @@ import org.slf4j.Logger import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.cluster.client.ClientContext import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} -import org.apache.gearpump.partitioner.ShufflePartitioner +import org.apache.gearpump.streaming.partitioner.ShufflePartitioner import org.apache.gearpump.streaming.examples.fsio.HadoopConfig._ import org.apache.gearpump.streaming.{Processor, StreamApplication} import org.apache.gearpump.util.Graph._ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaReadWrite.scala ---------------------------------------------------------------------- 
diff --git a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaReadWrite.scala b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaReadWrite.scala index cfeef5b..4b48e7d 100644 --- a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaReadWrite.scala +++ b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaReadWrite.scala @@ -27,7 +27,7 @@ import org.slf4j.Logger import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.cluster.client.ClientContext import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} -import org.apache.gearpump.partitioner.ShufflePartitioner +import org.apache.gearpump.streaming.partitioner.ShufflePartitioner import org.apache.gearpump.streaming.StreamApplication import org.apache.gearpump.streaming.kafka._ import org.apache.gearpump.streaming.sink.DataSinkProcessor http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala index aa9842f..80f0ff7 100644 --- a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala +++ b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala @@ -28,7 +28,7 @@ import org.slf4j.Logger import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.cluster.client.ClientContext import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, 
ParseResult} -import org.apache.gearpump.partitioner.HashPartitioner +import org.apache.gearpump.streaming.partitioner.HashPartitioner import org.apache.gearpump.streaming.kafka._ import org.apache.gearpump.streaming.sink.DataSinkProcessor import org.apache.gearpump.streaming.source.DataSourceProcessor http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOL.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOL.scala b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOL.scala index fb80ad3..01aa95e 100644 --- a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOL.scala +++ b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOL.scala @@ -23,7 +23,7 @@ import org.slf4j.Logger import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.cluster.client.ClientContext import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} -import org.apache.gearpump.partitioner.ShufflePartitioner +import org.apache.gearpump.streaming.partitioner.ShufflePartitioner import org.apache.gearpump.streaming.{Processor, StreamApplication} import org.apache.gearpump.util.Graph._ import org.apache.gearpump.util.{AkkaApp, Graph, LogUtil} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala index 9bd2bc5..59289a5 100644 --- 
a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala +++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala @@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.cluster.client.ClientContext import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} -import org.apache.gearpump.partitioner.HashPartitioner +import org.apache.gearpump.streaming.partitioner.HashPartitioner import org.apache.gearpump.streaming.examples.state.processor.CountProcessor import org.apache.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory import org.apache.gearpump.streaming.hadoop.lib.rotation.FileSizeRotation http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/WindowAverageApp.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/WindowAverageApp.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/WindowAverageApp.scala index 629deb7..50235bc 100644 --- a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/WindowAverageApp.scala +++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/WindowAverageApp.scala @@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.cluster.client.ClientContext import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} -import org.apache.gearpump.partitioner.HashPartitioner +import org.apache.gearpump.streaming.partitioner.HashPartitioner import 
org.apache.gearpump.streaming.examples.state.processor.{NumberGeneratorProcessor, WindowAverageProcessor} import org.apache.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory import org.apache.gearpump.streaming.state.impl.{PersistentStateConfig, WindowConfig} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java index 6b5bba0..5e3d472 100644 --- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java +++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java @@ -23,12 +23,11 @@ import org.apache.gearpump.cluster.ClusterConfig; import org.apache.gearpump.cluster.UserConfig; import org.apache.gearpump.cluster.client.ClientContext; import org.apache.gearpump.cluster.embedded.EmbeddedCluster; -import org.apache.gearpump.partitioner.HashPartitioner; -import org.apache.gearpump.partitioner.Partitioner; +import org.apache.gearpump.streaming.partitioner.HashPartitioner; +import org.apache.gearpump.streaming.partitioner.Partitioner; import org.apache.gearpump.streaming.javaapi.Graph; import org.apache.gearpump.streaming.javaapi.Processor; import org.apache.gearpump.streaming.javaapi.StreamApplication; -import org.apache.gearpump.util.Constants; /** Java version of WordCount with Processor Graph API */ public class WordCount { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala 
---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala index 9580e63..0e3d840 100644 --- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala +++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala @@ -23,7 +23,7 @@ import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.cluster.client.ClientContext import org.apache.gearpump.cluster.embedded.EmbeddedCluster import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} -import org.apache.gearpump.partitioner.HashPartitioner +import org.apache.gearpump.streaming.partitioner.HashPartitioner import org.apache.gearpump.streaming.source.DataSourceProcessor import org.apache.gearpump.streaming.{Processor, StreamApplication} import org.apache.gearpump.util.Graph.Node http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala index aaa0a99..4969314 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala @@ -20,7 +20,7 @@ package org.apache.gearpump.experiments.storm.partitioner import org.apache.gearpump.Message import 
org.apache.gearpump.experiments.storm.topology.GearpumpTuple -import org.apache.gearpump.partitioner.{MulticastPartitioner, Partitioner} +import org.apache.gearpump.streaming.partitioner.{MulticastPartitioner, Partitioner} /** * Partitioner bound to a target Storm component http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/GraphBuilder.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/GraphBuilder.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/GraphBuilder.scala index 777acab..e3f1339 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/GraphBuilder.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/GraphBuilder.scala @@ -20,7 +20,7 @@ package org.apache.gearpump.experiments.storm.util import org.apache.gearpump.experiments.storm.partitioner.StormPartitioner import org.apache.gearpump.experiments.storm.topology.GearpumpStormTopology -import org.apache.gearpump.partitioner.Partitioner +import org.apache.gearpump.streaming.partitioner.Partitioner import org.apache.gearpump.streaming.Processor import org.apache.gearpump.streaming.task.Task import org.apache.gearpump.util.Graph http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala index 5513423..5fc631b 100644 --- 
a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala @@ -27,7 +27,7 @@ import org.scalatest.{Matchers, PropSpec} import org.apache.gearpump.Message import org.apache.gearpump.experiments.storm.topology.GearpumpTuple -import org.apache.gearpump.partitioner.Partitioner +import org.apache.gearpump.streaming.partitioner.Partitioner class StormPartitionerSpec extends PropSpec with PropertyChecks with Matchers { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala index 89b8ef7..a1d1162 100644 --- a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala +++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala @@ -33,7 +33,7 @@ class DynamicDagSpec extends TestSpecBase { val partitioners = restClient.queryBuiltInPartitioners() partitioners.length should be > 0 partitioners.foreach(clazz => - clazz should startWith("org.apache.gearpump.partitioner.") + clazz should startWith("org.apache.gearpump.streaming.partitioner.") ) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala 
index bf7092e..b08e0fd 100644 --- a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala @@ -41,7 +41,7 @@ import org.apache.gearpump.cluster.client.ClientContext import org.apache.gearpump.cluster.worker.WorkerSummary import org.apache.gearpump.cluster.{ClusterConfig, UserConfig} import org.apache.gearpump.jarstore.{JarStoreClient, FileDirective, JarStoreServer} -import org.apache.gearpump.partitioner.{PartitionerByClassName, PartitionerDescription} +import org.apache.gearpump.streaming.partitioner.{PartitionerByClassName, PartitionerDescription} import org.apache.gearpump.services.MasterService.{BuiltinPartitioners, SubmitApplicationRequest} // NOTE: This cannot be removed!!! import org.apache.gearpump.services.util.UpickleUtil._ @@ -192,7 +192,8 @@ class MasterService(val master: ActorRef, } ~ path("partitioners") { get { - complete(write(BuiltinPartitioners(Constants.BUILTIN_PARTITIONERS.map(_.getName)))) + complete(write(BuiltinPartitioners(org.apache.gearpump.streaming.Constants + .BUILTIN_PARTITIONERS.map(_.getName)))) } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Graph.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Graph.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Graph.java index 8f85aa3..aaf6db8 100644 --- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Graph.java +++ b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Graph.java @@ -18,7 +18,7 @@ package org.apache.gearpump.streaming.javaapi; -import org.apache.gearpump.partitioner.Partitioner; +import org.apache.gearpump.streaming.partitioner.Partitioner; import org.apache.gearpump.streaming.Processor; import org.apache.gearpump.streaming.task.Task; 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala index f99a436..d7582b0 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala @@ -17,6 +17,8 @@ */ package org.apache.gearpump.streaming +import org.apache.gearpump.streaming.partitioner._ + object Constants { val GEARPUMP_STREAMING_OPERATOR = "gearpump.streaming.dsl.operator" @@ -36,4 +38,12 @@ object Constants { val GEARPUMP_STREAMING_EXECUTOR_RESTART_TIME_WINDOW = "gearpump.streaming.executor-restart-time-window" + + // The partitioners provided by Gearpump + val BUILTIN_PARTITIONERS = Array( + classOf[BroadcastPartitioner], + classOf[CoLocationPartitioner], + classOf[HashPartitioner], + classOf[ShuffleGroupingPartitioner], + classOf[ShufflePartitioner]) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala index 4a94ad3..8ad74f8 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala @@ -18,7 +18,7 @@ package org.apache.gearpump.streaming -import org.apache.gearpump.partitioner.PartitionerDescription +import org.apache.gearpump.streaming.partitioner.PartitionerDescription import org.apache.gearpump.streaming.task.TaskId import org.apache.gearpump.util.Graph 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala index a6588a1..ca8d89e 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala @@ -25,7 +25,7 @@ import akka.actor.ActorSystem import org.apache.gearpump.TimeStamp import org.apache.gearpump.cluster._ -import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner, PartitionerDescription, PartitionerObject} +import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner, PartitionerDescription, PartitionerObject} import org.apache.gearpump.streaming.appmaster.AppMaster import org.apache.gearpump.streaming.task.Task import org.apache.gearpump.util.{Graph, LogUtil, ReferenceEqual} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala index 31e1151..1341464 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala @@ -30,7 +30,7 @@ import org.apache.gearpump.cluster._ import org.apache.gearpump.cluster.worker.WorkerId import org.apache.gearpump.metrics.Metrics.ReportMetrics import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService} -import 
org.apache.gearpump.partitioner.PartitionerDescription +import org.apache.gearpump.streaming.partitioner.PartitionerDescription import org.apache.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, RegisterExecutor, RegisterTask, UnRegisterTask} import org.apache.gearpump.streaming._ import org.apache.gearpump.streaming.appmaster.AppMaster._ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala index 2736f5e..6154946 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala @@ -21,7 +21,7 @@ package org.apache.gearpump.streaming.appmaster import akka.actor.{ExtendedActorSystem, Actor, ActorRef, Stash} import akka.serialization.JavaSerializer import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.partitioner.PartitionerDescription +import org.apache.gearpump.streaming.partitioner.PartitionerDescription import org.apache.gearpump.streaming._ import org.apache.gearpump.streaming.appmaster.DagManager._ import org.apache.gearpump.streaming.storage.AppDataStore http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala index 6de5306..e023cdf 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala +++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala @@ -24,7 +24,7 @@ import org.apache.gearpump.TimeStamp import org.apache.gearpump.cluster.AppJar import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest} import org.apache.gearpump.cluster.worker.WorkerId -import org.apache.gearpump.partitioner.PartitionerDescription +import org.apache.gearpump.streaming.partitioner.PartitionerDescription import org.apache.gearpump.streaming.appmaster.JarScheduler._ import org.apache.gearpump.streaming.task.TaskId import org.apache.gearpump.streaming.{DAG, ProcessorDescription} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala index 2ec881b..efa7409 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala @@ -19,7 +19,7 @@ package org.apache.gearpump.streaming.dsl.partitioner import org.apache.gearpump.Message -import org.apache.gearpump.partitioner.UnicastPartitioner +import org.apache.gearpump.streaming.partitioner.UnicastPartitioner import org.apache.gearpump.streaming.dsl.window.api.GroupByFn /** http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala index 
16d5c06..65f9cd2 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala @@ -20,7 +20,7 @@ package org.apache.gearpump.streaming.dsl.plan import akka.actor.ActorSystem -import org.apache.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner, Partitioner} +import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, HashPartitioner, Partitioner} import org.apache.gearpump.streaming.Processor import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner import org.apache.gearpump.streaming.task.Task http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/BroadcastPartitioner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/BroadcastPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/BroadcastPartitioner.scala new file mode 100644 index 0000000..9b63e04 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/BroadcastPartitioner.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.partitioner + +import org.apache.gearpump.Message + +/** Used by storm module to broadcast message to all downstream tasks */ +class BroadcastPartitioner extends MulticastPartitioner { + private var lastPartitionNum = -1 + private var partitions = Array.empty[Int] + + override def getPartitions( + msg: Message, partitionNum: Int, currentPartitionId: Int): Array[Int] = { + if (partitionNum != lastPartitionNum) { + partitions = (0 until partitionNum).toArray + lastPartitionNum = partitionNum + } + partitions + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/CoLocationPartitioner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/CoLocationPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/CoLocationPartitioner.scala new file mode 100644 index 0000000..4cb1bad --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/CoLocationPartitioner.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.partitioner + +import org.apache.gearpump.Message + +/** + * Will have the same parallelism with last processor + * And each task in current processor will co-locate with task of last processor + */ +class CoLocationPartitioner extends UnicastPartitioner { + override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = { + currentPartitionId + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/HashPartitioner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/HashPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/HashPartitioner.scala new file mode 100644 index 0000000..6137705 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/HashPartitioner.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.partitioner + +import org.apache.gearpump.Message + +/** + * Only make sense when the message has implemented the hashCode() + * Otherwise, it will use Object.hashCode(), which will not return + * same hash code after serialization and deserialization. + */ +class HashPartitioner extends UnicastPartitioner { + override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = { + (msg.msg.hashCode() & Integer.MAX_VALUE) % partitionNum + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/Partitioner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/Partitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/Partitioner.scala new file mode 100644 index 0000000..f685cc9 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/Partitioner.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.partitioner + +import org.apache.commons.lang.SerializationUtils +import org.apache.gearpump.Message + +import scala.reflect.ClassTag + +/** + * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), partitioner decide how ONE task + * of upstream processor A send to several tasks of downstream processor B. + */ +sealed trait Partitioner extends Serializable + +/** + * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), UnicastPartitioner does + * ONE-task {@literal ->} ONE-task mapping. + */ +trait UnicastPartitioner extends Partitioner { + + /** + * Gets the SINGLE downstream processor task index to send message to. + * + * @param msg Message you want to send + * @param partitionNum How many tasks does the downstream processor have. + * @param upstreamTaskIndex Upstream task's task index who trigger the getPartition() call. + * + * @return ONE task index of downstream processor. + */ + def getPartition(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): Int + + def getPartition(msg: Message, partitionNum: Int): Int = { + getPartition(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID) + } +} + +trait MulticastPartitioner extends Partitioner { + + /** + * Gets a list of downstream processor task indexes to send message to. + * + * @param upstreamTaskIndex Current sender task's task index. + * + */ + def getPartitions(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): Array[Int] + + def getPartitions(msg: Message, partitionNum: Int): Array[Int] = { + getPartitions(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID) + } +} + +sealed trait PartitionerFactory { + + def name: String + + def partitioner: Partitioner +} + +/** Stores the Partitioner in an object. 
To use it, user need to deserialize the object */ +class PartitionerObject(private[this] val _partitioner: Partitioner) + extends PartitionerFactory with Serializable { + + override def name: String = partitioner.getClass.getName + + override def partitioner: Partitioner = { + SerializationUtils.clone(_partitioner).asInstanceOf[Partitioner] + } +} + +/** Store the partitioner in class Name, the user need to instantiate a new class */ +class PartitionerByClassName(partitionerClass: String) + extends PartitionerFactory with Serializable { + + override def name: String = partitionerClass + override def partitioner: Partitioner = { + Class.forName(partitionerClass).newInstance().asInstanceOf[Partitioner] + } +} + +/** + * @param partitionerFactory How we construct a Partitioner. + */ +case class PartitionerDescription(partitionerFactory: PartitionerFactory) + +object Partitioner { + val UNKNOWN_PARTITION_ID = -1 + + def apply[T <: Partitioner](implicit clazz: ClassTag[T]): PartitionerDescription = { + PartitionerDescription(new PartitionerByClassName(clazz.runtimeClass.getName)) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShuffleGroupingPartitioner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShuffleGroupingPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShuffleGroupingPartitioner.scala new file mode 100644 index 0000000..1b223e0 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShuffleGroupingPartitioner.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.partitioner + +import org.apache.gearpump.Message + +import scala.util.Random + +/** + * The idea of ShuffleGroupingPartitioner is derived from Storm. + * Messages are randomly distributed across the downstream's tasks in a way such that + * each task is guaranteed to get an equal number of messages. + */ +class ShuffleGroupingPartitioner extends UnicastPartitioner { + private val random = new Random + private var index = -1 + private var partitions = List.empty[Int] + override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = { + index += 1 + if (partitions.isEmpty) { + partitions = 0.until(partitionNum).toList + partitions = random.shuffle(partitions) + } else if (index >= partitionNum) { + index = 0 + partitions = random.shuffle(partitions) + } + partitions(index) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShufflePartitioner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShufflePartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShufflePartitioner.scala new file mode 100644 index 0000000..39d5e3b --- /dev/null +++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShufflePartitioner.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.partitioner + +import java.util.Random + +import org.apache.gearpump.Message +
+/**
+ * Round Robin partition the data to downstream processor tasks.
+ *
+ * Each instance starts its rotation at a random offset, drawn on the first call to
+ * [[getPartition]] rather than at construction time. It then advances by one partition per
+ * message.
+ */
+class ShufflePartitioner extends UnicastPartitioner {
+  // Random starting offset of the rotation; meaningful only once `seeded` is true.
+  private var seed = 0
+  // Initialization is tracked explicitly. A sentinel such as `seed == 0` would re-seed
+  // whenever the random generator legitimately returned 0.
+  private var seeded = false
+  // Number of messages partitioned so far by this instance.
+  private var count = 0
+
+  override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
+    if (!seeded) {
+      seed = newSeed()
+      seeded = true
+    }
+
+    // `count + seed` may overflow. Masking off the sign bit keeps the dividend non-negative,
+    // so the result stays within [0, partitionNum) for a positive partitionNum.
+    val result = ((count + seed) & Integer.MAX_VALUE) % partitionNum
+    count = count + 1
+    result
+  }
+
+  private def newSeed(): Int = new Random().nextInt()
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscriber.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscriber.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscriber.scala index 692d7f9..5c99980 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscriber.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscriber.scala @@ -18,7 +18,7 @@ package org.apache.gearpump.streaming.task -import org.apache.gearpump.partitioner.PartitionerDescription +import org.apache.gearpump.streaming.partitioner.PartitionerDescription import org.apache.gearpump.streaming.{DAG, LifeTime} /** http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala index d9fbc82..16f9e93 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala @@ -21,7 +21,7 @@ package org.apache.gearpump.streaming.task import org.slf4j.Logger import org.apache.gearpump.google.common.primitives.Shorts -import 
org.apache.gearpump.partitioner.{MulticastPartitioner, Partitioner, UnicastPartitioner} +import org.apache.gearpump.streaming.partitioner.{MulticastPartitioner, Partitioner, UnicastPartitioner} import org.apache.gearpump.streaming.AppMasterToExecutor.MsgLostException import org.apache.gearpump.streaming.LifeTime import org.apache.gearpump.streaming.task.Subscription._ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala index 5f4faee..ccda8f0 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala @@ -18,7 +18,7 @@ package org.apache.gearpump.streaming -import org.apache.gearpump.partitioner.PartitionerDescription +import org.apache.gearpump.streaming.partitioner.PartitionerDescription import org.apache.gearpump.streaming.task.TaskId import org.apache.gearpump.util.Graph import org.apache.gearpump.util.Graph.Node http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala index e461ae8..29dfc57 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala @@ -34,7 +34,7 @@ import org.apache.gearpump.cluster.master.MasterProxy import org.apache.gearpump.cluster.scheduler.{Resource, 
ResourceAllocation, ResourceRequest} import org.apache.gearpump.cluster.worker.WorkerId import org.apache.gearpump.jarstore.FilePath -import org.apache.gearpump.partitioner.HashPartitioner +import org.apache.gearpump.streaming.partitioner.HashPartitioner import org.apache.gearpump.streaming.AppMasterToExecutor.StopTask import org.apache.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, UnRegisterTask} import org.apache.gearpump.streaming.appmaster.AppMaster.{TaskActorRef, LookupTaskActorRef} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala index d42fe6f..46175a4 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala @@ -20,7 +20,7 @@ package org.apache.gearpump.streaming.appmaster import akka.actor.{ActorSystem, Props} import akka.testkit.{ImplicitSender, TestKit, TestProbe} import org.apache.gearpump.cluster.{TestUtil, UserConfig} -import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner, PartitionerDescription} +import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner, PartitionerDescription} import org.apache.gearpump.streaming.appmaster.ClockService.{ChangeToNewDAG, ChangeToNewDAGSuccess, HealthChecker, ProcessorClock} import org.apache.gearpump.streaming.appmaster.ClockServiceSpec.Store import org.apache.gearpump.streaming.storage.AppDataStore http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala 
---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala index be3b3b7..adde927 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala @@ -22,7 +22,7 @@ package org.apache.gearpump.streaming.appmaster import akka.actor.{ActorSystem, Props} import akka.testkit.TestProbe import org.apache.gearpump.cluster.{TestUtil, UserConfig} -import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner} +import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner} import org.apache.gearpump.streaming.appmaster.DagManager.{DAGOperationFailed, DAGOperationSuccess, GetLatestDAG, GetTaskLaunchData, LatestDAG, NewDAGDeployed, ReplaceProcessor, TaskLaunchData, WatchChange} import org.apache.gearpump.streaming.task.{Subscriber, TaskActor} import org.apache.gearpump.streaming._ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/JarSchedulerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/JarSchedulerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/JarSchedulerSpec.scala index 5f6dd04..def9d44 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/JarSchedulerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/JarSchedulerSpec.scala @@ -22,7 +22,7 @@ import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest} import org.apache.gearpump.cluster.worker.WorkerId import org.apache.gearpump.cluster.{AppJar, TestUtil} import 
org.apache.gearpump.jarstore.FilePath -import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner} +import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner} import org.apache.gearpump.streaming.appmaster.TaskSchedulerSpec.{TestTask1, TestTask2} import org.apache.gearpump.streaming.task.TaskId import org.apache.gearpump.streaming.{DAG, ProcessorDescription, _} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala index 54ecde1..bcf96e4 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala @@ -25,7 +25,7 @@ import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest} import org.apache.gearpump.cluster.worker.WorkerId import org.apache.gearpump.cluster.{AppJar, TestUtil, UserConfig} import org.apache.gearpump.jarstore.FilePath -import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner, PartitionerDescription} +import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner, PartitionerDescription} import org.apache.gearpump.streaming.AppMasterToExecutor.{LaunchTasks, StartAllTasks, StartDynamicDag, TaskLocationsReady, TaskLocationsReceived, TaskRegistered} import org.apache.gearpump.streaming.ExecutorToAppMaster.RegisterTask import org.apache.gearpump.streaming.appmaster.AppMaster.AllocateResourceTimeOut http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala 
---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala index 864aa93..1bfde94 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala @@ -21,7 +21,7 @@ import com.typesafe.config.ConfigFactory import org.apache.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest} import org.apache.gearpump.cluster.worker.WorkerId import org.apache.gearpump.cluster.{TestUtil, UserConfig} -import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner} +import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner} import org.apache.gearpump.streaming.appmaster.TaskLocator.Localities import org.apache.gearpump.streaming.appmaster.TaskSchedulerSpec.{TestTask1, TestTask2} import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskId} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala index e0407ec..db4db93 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala @@ -21,7 +21,7 @@ package org.apache.gearpump.streaming.dsl import akka.actor.ActorSystem import org.apache.gearpump.cluster.TestUtil import org.apache.gearpump.cluster.client.ClientContext -import org.apache.gearpump.partitioner.PartitionerDescription +import 
org.apache.gearpump.streaming.partitioner.PartitionerDescription import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication} import org.apache.gearpump.streaming.source.DataSourceTask import org.apache.gearpump.util.Graph http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala index fdc721b..8def61e 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala @@ -22,7 +22,7 @@ import akka.actor._ import org.apache.gearpump.Message import org.apache.gearpump.cluster.client.ClientContext import org.apache.gearpump.cluster.{TestUtil, UserConfig} -import org.apache.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner, PartitionerDescription} +import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, HashPartitioner, PartitionerDescription} import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication} import org.apache.gearpump.streaming.dsl.StreamSpec.Join import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala index f8666ba..1610f0e 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala +++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala @@ -23,7 +23,7 @@ import java.time.Instant import akka.actor.ActorSystem import org.apache.gearpump.Message import org.apache.gearpump.cluster.{TestUtil, UserConfig} -import org.apache.gearpump.partitioner.CoLocationPartitioner +import org.apache.gearpump.streaming.partitioner.CoLocationPartitioner import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner import org.apache.gearpump.streaming.dsl.plan.PlannerSpec._ import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapFunction, ReduceFunction} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/test/scala/org/apache/gearpump/streaming/partitioner/PartitionerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/partitioner/PartitionerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/partitioner/PartitionerSpec.scala new file mode 100644 index 0000000..277a31c --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/partitioner/PartitionerSpec.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.partitioner + +import org.apache.gearpump.Message +import org.scalatest.{FlatSpec, Matchers} +
+/** Unit tests for the partitioners in this package. */
+class PartitionerSpec extends FlatSpec with Matchers {
+  val NUM = 10
+
+  /** Builds a message whose payload is 1000 random bytes. */
+  private def randomMessage(): Message = {
+    val data = new Array[Byte](1000)
+    new java.util.Random().nextBytes(data)
+    Message(data)
+  }
+
+  "HashPartitioner" should "hash same key to same slots" in {
+    val partitioner = new HashPartitioner
+    val msg = randomMessage()
+
+    val partition = partitioner.getPartition(msg, NUM)
+    assert(partition >= 0 && partition < NUM, s"Partition Id should be in [0, $NUM)")
+
+    assert(partition == partitioner.getPartition(msg, NUM),
+      "multiple runs should return consistent result")
+  }
+
+  "ShufflePartitioner" should "hash same key randomly" in {
+    val partitioner = new ShufflePartitioner
+    val msg = randomMessage()
+
+    val partition = partitioner.getPartition(msg, NUM)
+    assert(partition >= 0 && partition < NUM, s"Partition Id should be in [0, $NUM)")
+
+    // Round robin: two consecutive calls target different partitions when NUM > 1.
+    assert(partition != partitioner.getPartition(msg, NUM),
+      "consecutive runs should return different partitions")
+  }
+
+  "BroadcastPartitioner" should "return all partitions" in {
+    val partitioner = new BroadcastPartitioner
+    val partitions = partitioner.getPartitions(randomMessage(), NUM)
+
+    partitions should contain theSameElementsAs 0.until(NUM)
+  }
+
+  "ShuffleGroupingPartitioner" should "hash same key randomly" in {
+    val partitioner = new ShuffleGroupingPartitioner
+    val msg = randomMessage()
+
+    val partition = partitioner.getPartition(msg, NUM)
+    assert(partition >= 0 && partition < NUM, s"Partition Id should be in [0, $NUM)")
+
+    assert(partition != partitioner.getPartition(msg, NUM),
+      "consecutive runs should return different partitions")
+  }
+
+  it should "return every partition exactly once per round" in {
+    val partitioner = new ShuffleGroupingPartitioner
+    val msg = randomMessage()
+
+    val firstRound = 0.until(NUM).map(_ => partitioner.getPartition(msg, NUM))
+    firstRound should contain theSameElementsAs 0.until(NUM)
+
+    val secondRound = 0.until(NUM).map(_ => partitioner.getPartition(msg, NUM))
+    secondRound should contain theSameElementsAs 0.until(NUM)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriberSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriberSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriberSpec.scala index cfe47eb..bd3ddb8 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriberSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriberSpec.scala @@ -19,7 +19,7 @@ package org.apache.gearpump.streaming.task import org.scalatest.{FlatSpec, Matchers} -import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner} +import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner} import org.apache.gearpump.streaming.task.SubscriberSpec.TestTask import org.apache.gearpump.streaming.{DAG, ProcessorDescription} import org.apache.gearpump.util.Graph http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala index 258a5ff..d128ace 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala @@ -26,7 +26,7 @@ import org.scalatest.{FlatSpec, Matchers} import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner} +import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner} import 
org.apache.gearpump.streaming.task.SubscriptionSpec.NextTask import org.apache.gearpump.streaming.{LifeTime, ProcessorDescription} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb37ce80/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala index 48901d2..8deee78 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala @@ -25,7 +25,7 @@ import org.scalatest.{BeforeAndAfterEach, Matchers, WordSpec} import org.apache.gearpump.Message import org.apache.gearpump.cluster.{MasterHarness, TestUtil, UserConfig} -import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner} +import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner} import org.apache.gearpump.serializer.{FastKryoSerializer, SerializationFramework} import org.apache.gearpump.streaming.AppMasterToExecutor.{ChangeTask, MsgLostException, StartTask, TaskChanged, TaskRegistered} import org.apache.gearpump.streaming.task.TaskActorSpec.TestTask
