Repository: spark
Updated Branches:
  refs/heads/master b2ce17b4c -> 96cb60bc3


[SPARK-22465][FOLLOWUP] Update the number of partitions of default partitioner when defaultParallelism is set

## What changes were proposed in this pull request?

#20002 proposed a way to safely check the default partitioner; however, if 
`spark.default.parallelism` is set, the defaultParallelism could still be 
smaller than the proper number of partitions for the upstream RDDs. This PR 
extends that approach to also cover the case where `spark.default.parallelism` 
is set.

The conditions under which this PR helps are:
- The max partitioner is not eligible, since it has at least an order of 
magnitude fewer partitions than the largest upstream RDD, and
- the user has explicitly set `spark.default.parallelism`, and
- the value of `spark.default.parallelism` is lower than the max partitioner's 
number of partitions.
- Since the max partitioner was discarded for being at least an order of 
magnitude smaller, the default parallelism is even worse, despite being 
user-specified.

In all other cases, the change is a no-op (a behavioral sketch follows below).
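
As a behavioral sketch, mirroring one of the new `PairRDDFunctionsSuite` test 
cases (the SparkContext `sc` and the partition counts are illustrative):

```scala
// Assumes an existing SparkContext `sc` inside a Spark application;
// set the default parallelism well below the partition counts involved.
sc.conf.set("spark.default.parallelism", "4")

val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 1000)
val rdd2 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
  .partitionBy(new HashPartitioner(10))

// Previously the cogroup fell back to HashPartitioner(4), because the existing
// 10-partition partitioner is more than an order of magnitude smaller than 1000.
// With this change the existing partitioner is kept, since 10 > defaultParallelism.
assert(rdd1.cogroup(rdd2).getNumPartitions == 10)
```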

## How was this patch tested?

Add corresponding test cases in `PairRDDFunctionsSuite` and `PartitioningSuite`.

Author: Xingbo Jiang <xingbo.ji...@databricks.com>

Closes #20091 from jiangxb1987/partitioner.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/96cb60bc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/96cb60bc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/96cb60bc

Branch: refs/heads/master
Commit: 96cb60bc33936c1aaf728a1738781073891480ff
Parents: b2ce17b
Author: Xingbo Jiang <xingbo.ji...@databricks.com>
Authored: Tue Jan 23 04:08:32 2018 -0800
Committer: Mridul Muralidharan <mri...@gmail.com>
Committed: Tue Jan 23 04:08:32 2018 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/Partitioner.scala    | 51 +++++++++++---------
 .../org/apache/spark/PartitioningSuite.scala    | 44 ++++++++++++++---
 .../spark/rdd/PairRDDFunctionsSuite.scala       | 45 ++++++++++++++++-
 3 files changed, 108 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/96cb60bc/core/src/main/scala/org/apache/spark/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 437bbaa..c940cb2 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -43,17 +43,19 @@ object Partitioner {
   /**
    * Choose a partitioner to use for a cogroup-like operation between a number of RDDs.
    *
-   * If any of the RDDs already has a partitioner, and the number of partitions of the
-   * partitioner is either greater than or is less than and within a single order of
-   * magnitude of the max number of upstream partitions, choose that one.
+   * If spark.default.parallelism is set, we'll use the value of SparkContext defaultParallelism
+   * as the default partitions number, otherwise we'll use the max number of upstream partitions.
    *
-   * Otherwise, we use a default HashPartitioner. For the number of partitions, if
-   * spark.default.parallelism is set, then we'll use the value from SparkContext
-   * defaultParallelism, otherwise we'll use the max number of upstream partitions.
+   * When available, we choose the partitioner from rdds with maximum number of partitions. If this
+   * partitioner is eligible (number of partitions within an order of maximum number of partitions
+   * in rdds), or has partition number higher than default partitions number - we use this
+   * partitioner.
    *
-   * Unless spark.default.parallelism is set, the number of partitions will be the
-   * same as the number of partitions in the largest upstream RDD, as this should
-   * be least likely to cause out-of-memory errors.
+   * Otherwise, we'll use a new HashPartitioner with the default partitions number.
+   *
+   * Unless spark.default.parallelism is set, the number of partitions will be the same as the
+   * number of partitions in the largest upstream RDD, as this should be least likely to cause
+   * out-of-memory errors.
    *
    * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
    */
@@ -67,31 +69,32 @@ object Partitioner {
       None
     }
 
-    if (isEligiblePartitioner(hasMaxPartitioner, rdds)) {
+    val defaultNumPartitions = if (rdd.context.conf.contains("spark.default.parallelism")) {
+      rdd.context.defaultParallelism
+    } else {
+      rdds.map(_.partitions.length).max
+    }
+
+    // If the existing max partitioner is an eligible one, or its partitions number is larger
+    // than the default number of partitions, use the existing partitioner.
+    if (hasMaxPartitioner.nonEmpty && (isEligiblePartitioner(hasMaxPartitioner.get, rdds) ||
+        defaultNumPartitions < hasMaxPartitioner.get.getNumPartitions)) {
       hasMaxPartitioner.get.partitioner.get
     } else {
-      if (rdd.context.conf.contains("spark.default.parallelism")) {
-        new HashPartitioner(rdd.context.defaultParallelism)
-      } else {
-        new HashPartitioner(rdds.map(_.partitions.length).max)
-      }
+      new HashPartitioner(defaultNumPartitions)
     }
   }
 
   /**
-   * Returns true if the number of partitions of the RDD is either greater
-   * than or is less than and within a single order of magnitude of the
-   * max number of upstream partitions;
-   * otherwise, returns false
+   * Returns true if the number of partitions of the RDD is either greater than or is less than and
+   * within a single order of magnitude of the max number of upstream partitions, otherwise returns
+   * false.
    */
   private def isEligiblePartitioner(
-     hasMaxPartitioner: Option[RDD[_]],
+     hasMaxPartitioner: RDD[_],
      rdds: Seq[RDD[_]]): Boolean = {
-    if (hasMaxPartitioner.isEmpty) {
-      return false
-    }
     val maxPartitions = rdds.map(_.partitions.length).max
-    log10(maxPartitions) - log10(hasMaxPartitioner.get.getNumPartitions) < 1
+    log10(maxPartitions) - log10(hasMaxPartitioner.getNumPartitions) < 1
   }
 }
 

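For intuition, here is a standalone sketch of the selection rule implemented 
above (the helper name `pickNumPartitions` and its plain `Int`/`Option` 
signature are illustrative only, not the actual private API):

```scala
import scala.math.log10

// Keep the existing max partitioner if it is within one order of magnitude of the
// largest upstream partition count, or if it already has more partitions than the
// default; otherwise fall back to the default number of partitions.
def pickNumPartitions(
    maxUpstreamPartitions: Int,
    existingPartitionerPartitions: Option[Int],
    defaultParallelism: Option[Int]): Int = {
  val defaultNumPartitions = defaultParallelism.getOrElse(maxUpstreamPartitions)
  existingPartitionerPartitions match {
    case Some(n) if log10(maxUpstreamPartitions) - log10(n) < 1 || n > defaultNumPartitions => n
    case _ => defaultNumPartitions
  }
}

pickNumPartitions(150, Some(100), Some(4))  // 100: within one order of magnitude
pickNumPartitions(1000, Some(10), Some(4))  // 10: not eligible, but larger than the default of 4
pickNumPartitions(20, None, Some(4))        // 4: no existing partitioner, use defaultParallelism
```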
http://git-wip-us.apache.org/repos/asf/spark/blob/96cb60bc/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index 155ca17..9206b5d 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -262,14 +262,11 @@ class PartitioningSuite extends SparkFunSuite with SharedSparkContext with Priva
 
   test("defaultPartitioner") {
     val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 150)
-    val rdd2 = sc
-      .parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
+    val rdd2 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
       .partitionBy(new HashPartitioner(10))
-    val rdd3 = sc
-      .parallelize(Array((1, 6), (7, 8), (3, 10), (5, 12), (13, 14)))
+    val rdd3 = sc.parallelize(Array((1, 6), (7, 8), (3, 10), (5, 12), (13, 14)))
       .partitionBy(new HashPartitioner(100))
-    val rdd4 = sc
-      .parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
+    val rdd4 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
       .partitionBy(new HashPartitioner(9))
     val rdd5 = sc.parallelize((1 to 10).map(x => (x, x)), 11)
 
@@ -284,7 +281,42 @@ class PartitioningSuite extends SparkFunSuite with SharedSparkContext with Priva
     assert(partitioner3.numPartitions == rdd3.getNumPartitions)
     assert(partitioner4.numPartitions == rdd3.getNumPartitions)
     assert(partitioner5.numPartitions == rdd4.getNumPartitions)
+  }
 
+  test("defaultPartitioner when defaultParallelism is set") {
+    assert(!sc.conf.contains("spark.default.parallelism"))
+    try {
+      sc.conf.set("spark.default.parallelism", "4")
+
+      val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 150)
+      val rdd2 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
+        .partitionBy(new HashPartitioner(10))
+      val rdd3 = sc.parallelize(Array((1, 6), (7, 8), (3, 10), (5, 12), (13, 14)))
+        .partitionBy(new HashPartitioner(100))
+      val rdd4 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
+        .partitionBy(new HashPartitioner(9))
+      val rdd5 = sc.parallelize((1 to 10).map(x => (x, x)), 11)
+      val rdd6 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
+        .partitionBy(new HashPartitioner(3))
+
+      val partitioner1 = Partitioner.defaultPartitioner(rdd1, rdd2)
+      val partitioner2 = Partitioner.defaultPartitioner(rdd2, rdd3)
+      val partitioner3 = Partitioner.defaultPartitioner(rdd3, rdd1)
+      val partitioner4 = Partitioner.defaultPartitioner(rdd1, rdd2, rdd3)
+      val partitioner5 = Partitioner.defaultPartitioner(rdd4, rdd5)
+      val partitioner6 = Partitioner.defaultPartitioner(rdd5, rdd5)
+      val partitioner7 = Partitioner.defaultPartitioner(rdd1, rdd6)
+
+      assert(partitioner1.numPartitions == rdd2.getNumPartitions)
+      assert(partitioner2.numPartitions == rdd3.getNumPartitions)
+      assert(partitioner3.numPartitions == rdd3.getNumPartitions)
+      assert(partitioner4.numPartitions == rdd3.getNumPartitions)
+      assert(partitioner5.numPartitions == rdd4.getNumPartitions)
+      assert(partitioner6.numPartitions == sc.defaultParallelism)
+      assert(partitioner7.numPartitions == sc.defaultParallelism)
+    } finally {
+      sc.conf.remove("spark.default.parallelism")
+    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/96cb60bc/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index a39e046..47af5c3 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -322,8 +322,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }
 
   // See SPARK-22465
-  test("cogroup between multiple RDD" +
-    " with number of partitions similar in order of magnitude") {
+  test("cogroup between multiple RDD with number of partitions similar in order of magnitude") {
     val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 20)
     val rdd2 = sc
       .parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
@@ -332,6 +331,48 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
     assert(joined.getNumPartitions == rdd2.getNumPartitions)
   }
 
+  test("cogroup between multiple RDD when defaultParallelism is set without proper partitioner") {
+    assert(!sc.conf.contains("spark.default.parallelism"))
+    try {
+      sc.conf.set("spark.default.parallelism", "4")
+      val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 20)
+      val rdd2 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)), 10)
+      val joined = rdd1.cogroup(rdd2)
+      assert(joined.getNumPartitions == sc.defaultParallelism)
+    } finally {
+      sc.conf.remove("spark.default.parallelism")
+    }
+  }
+
+  test("cogroup between multiple RDD when defaultParallelism is set with proper partitioner") {
+    assert(!sc.conf.contains("spark.default.parallelism"))
+    try {
+      sc.conf.set("spark.default.parallelism", "4")
+      val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 20)
+      val rdd2 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+        .partitionBy(new HashPartitioner(10))
+      val joined = rdd1.cogroup(rdd2)
+      assert(joined.getNumPartitions == rdd2.getNumPartitions)
+    } finally {
+      sc.conf.remove("spark.default.parallelism")
+    }
+  }
+
+  test("cogroup between multiple RDD when defaultParallelism is set; with huge number of " +
+    "partitions in upstream RDDs") {
+    assert(!sc.conf.contains("spark.default.parallelism"))
+    try {
+      sc.conf.set("spark.default.parallelism", "4")
+      val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 1000)
+      val rdd2 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+        .partitionBy(new HashPartitioner(10))
+      val joined = rdd1.cogroup(rdd2)
+      assert(joined.getNumPartitions == rdd2.getNumPartitions)
+    } finally {
+      sc.conf.remove("spark.default.parallelism")
+    }
+  }
+
   test("rightOuterJoin") {
     val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
     val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))

