Repository: spark
Updated Branches:
  refs/heads/branch-1.0 618b3e6e7 -> 66fe4797a


[SPARK-1784] Add a new partitioner to allow specifying # of keys per partition

This change adds a new partitioner which allows users
to specify # of keys per partition.
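
As a rough usage sketch (not part of this commit), assuming a spark-shell session
where a SparkContext `sc` and the pair-RDD implicits are in scope, the new
partitioner could be applied like this:

import org.apache.spark.BoundaryPartitioner

val pairs = sc.parallelize(1 to 2000).map(x => (x, x))
// Request 4 partitions and allow at most 500 keys in each before the partitioner
// falls back to allocating keys round-robin, one partition at a time.
val boundaryPartitioner = new BoundaryPartitioner(4, pairs, 500)
val repartitioned = pairs.partitionBy(boundaryPartitioner)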

Author: Syed Hashmi <shas...@cloudera.com>

Closes #721 from syedhashmi/master and squashes the following commits:

4ca94cc [Syed Hashmi] [SPARK-1784] Add a new partitioner

(cherry picked from commit 92cebada09a7e5a00ab48bcb350a9462949c33eb)
Signed-off-by: Reynold Xin <r...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/66fe4797
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/66fe4797
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/66fe4797

Branch: refs/heads/branch-1.0
Commit: 66fe4797a845bb1a2728dcdb2d7371f0e90da867
Parents: 618b3e6
Author: Syed Hashmi <shas...@cloudera.com>
Authored: Tue May 13 21:24:23 2014 -0700
Committer: Reynold Xin <r...@apache.org>
Committed: Tue May 13 21:25:01 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/Partitioner.scala    | 61 ++++++++++++++++++++
 .../org/apache/spark/PartitioningSuite.scala    | 34 +++++++++++
 2 files changed, 95 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/66fe4797/core/src/main/scala/org/apache/spark/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 9155159..6274796 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -156,3 +156,64 @@ class RangePartitioner[K : Ordering : ClassTag, V](
       false
   }
 }
+
+/**
+ * A [[org.apache.spark.Partitioner]] that fills each partition up to a specified
+ * bound of keys (1000 by default). Once all partitions hold `boundary` elements,
+ * the partitioner allocates one element per partition in turn, so the smaller
+ * partitions end up at most one key behind the larger partitions.
+ */
+class BoundaryPartitioner[K : Ordering : ClassTag, V](
+                                                    partitions: Int,
+                                                    @transient rdd: RDD[_ <: Product2[K,V]],
+                                                    private val boundary: Int = 1000)
+  extends Partitioner {
+
+  // this array keeps track of keys assigned to a partition
+  // counts[0] refers to # of keys in partition 0 and so on
+  private val counts: Array[Int] = {
+    new Array[Int](numPartitions)
+  }
+
+  def numPartitions = math.abs(partitions)
+
+  /*
+   * Ideally this would be computed from the number of partitions and the total
+   * number of keys, but we do not call count() on the RDD here so as to avoid
+   * triggering an action. Users can call count() themselves and pass in whatever
+   * boundary is appropriate.
+   */
+  def keysPerPartition = boundary
+
+  var currPartition = 0
+
+  /*
+   * Assign keys to the current partition until we hit the per-partition bound,
+   * then start allocating to the next partition.
+   *
+   * NOTE: With, say, 2000 keys, 3 partitions and a boundary of 500, the first 500
+   * keys go to P1, keys 501-1000 go to P2 and keys 1001-1500 go to P3. After that,
+   * keys are handed out one partition at a time: 1501 goes to P1, 1502 goes to P2,
+   * 1503 goes to P3, and so on.
+   */
+  def getPartition(key: Any): Int = {
+    val partition = currPartition
+    counts(partition) = counts(partition) + 1
+    /*
+     * Since we fill up a partition before moving on to the next one (this helps
+     * preserve key order in certain cases), it is possible to end up with empty
+     * partitions: with 3 partitions and 500 keys per partition, an RDD with only
+     * 700 keys leaves one partition entirely empty.
+     */
+    if (counts(currPartition) >= keysPerPartition)
+      currPartition = (currPartition + 1) % numPartitions
+    partition
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case r: BoundaryPartitioner[_,_] =>
+      (r.counts.sameElements(counts) && r.boundary == boundary
+        && r.currPartition == currPartition)
+    case _ =>
+      false
+  }
+}
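
For illustration only (not part of the commit), here is a small self-contained
sketch in plain Scala, with no Spark dependency, that mirrors the allocation rule
of getPartition above so the round-robin fallback described in the NOTE can be
checked by hand:

object BoundaryAllocationSketch {
  def main(args: Array[String]): Unit = {
    val numPartitions = 3
    val boundary = 500
    val counts = new Array[Int](numPartitions)
    var currPartition = 0

    // Same rule as getPartition: fill the current partition up to `boundary`
    // keys, then advance to the next partition modulo numPartitions.
    def assign(): Int = {
      val partition = currPartition
      counts(partition) += 1
      if (counts(currPartition) >= boundary) {
        currPartition = (currPartition + 1) % numPartitions
      }
      partition
    }

    val assignments = (1 to 2000).map(key => key -> assign())
    // Keys 1-500 land in partition 0, 501-1000 in partition 1, 1001-1500 in
    // partition 2; from key 1501 onward the keys rotate one per partition.
    println(assignments.slice(1498, 1506).mkString(", "))
  }
}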

http://git-wip-us.apache.org/repos/asf/spark/blob/66fe4797/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index 7c30626..7d40395 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -66,6 +66,40 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
     assert(descendingP4 != p4)
   }
 
+  test("BoundaryPartitioner equality") {
+    // Make a simple pair RDD to construct the partitioners from. Unlike
+    // RangePartitioner, BoundaryPartitioner does not sample the data, so the
+    // contents of the RDD do not affect equality here.
+    val rdd = sc.parallelize(1.to(4000)).map(x => (x, x))
+
+    val p2 = new BoundaryPartitioner(2, rdd, 1000)
+    val p4 = new BoundaryPartitioner(4, rdd, 1000)
+    val anotherP4 = new BoundaryPartitioner(4, rdd)
+
+    assert(p2 === p2)
+    assert(p4 === p4)
+    assert(p2 != p4)
+    assert(p4 != p2)
+    assert(p4 === anotherP4)
+    assert(anotherP4 === p4)
+  }
+
+  test("BoundaryPartitioner getPartition") {
+    val rdd = sc.parallelize(1.to(2000)).map(x => (x, x))
+    val partitioner = new BoundaryPartitioner(4, rdd, 500)
+    1.to(2000).foreach { element =>
+      val partition = partitioner.getPartition(element)
+      if (element <= 500) {
+        assert(partition === 0)
+      } else if (element > 500 && element <= 1000) {
+        assert(partition === 1)
+      } else if (element > 1000 && element <= 1500) {
+        assert(partition === 2)
+      } else if (element > 1500 && element <= 2000) {
+        assert(partition === 3)
+      }
+    }
+  }
+
   test("RangePartitioner getPartition") {
     val rdd = sc.parallelize(1.to(2000)).map(x => (x, x))
     // We have different behaviour of getPartition for partitions with less than 1000 and more than
