Repository: spark
Updated Branches:
  refs/heads/master a8c6d0f64 -> 7aac755ba


[SPARK-21410][CORE] Create less partitions for RangePartitioner if RDD.count() 
is less than `partitions`

## What changes were proposed in this pull request?

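Fix a bug in RangePartitioner:
In RangePartitioner(partitions: Int, rdd: RDD[]), 
RangePartitioner.numPartitions is wrong (larger than needed) if the number of 
elements in the RDD (rdd.count()) is less than the requested number of 
partitions (`partitions` in the constructor). This change caps the value passed 
to RangePartitioner.determineBounds at `math.min(partitions, candidates.size)`.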
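
The effect can be sketched in plain Scala without Spark. This is an 
illustration only, not Spark's code: `RangeBoundsSketch` and its simplified 
`determineBounds` are hypothetical stand-ins that assume bounds are picked by 
walking the sorted, weighted candidates, and that `numPartitions` equals the 
number of bounds plus one. Keys are fixed to `Int` for brevity.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified model of range-bound selection (not Spark's implementation).
object RangeBoundsSketch {
  // Walk the sorted candidates and emit a bound each time the cumulative
  // weight reaches the next multiple of (total weight / partitions).
  def determineBounds(candidates: Seq[(Int, Float)], partitions: Int): Array[Int] = {
    val ordered = candidates.sortBy(_._1)
    val step = ordered.map(_._2.toDouble).sum / partitions
    val bounds = ArrayBuffer.empty[Int]
    var cumWeight = 0.0
    var target = step
    var i = 0
    // At most (partitions - 1) bounds are ever needed.
    while (i < ordered.size && bounds.size < partitions - 1) {
      val (key, weight) = ordered(i)
      cumWeight += weight
      // Skip keys that would duplicate the previous bound.
      if (cumWeight >= target && bounds.lastOption.forall(_ < key)) {
        bounds += key
        target += step
      }
      i += 1
    }
    bounds.toArray
  }

  // Assumption: the partition count is the number of bounds plus one.
  def numPartitions(candidates: Seq[(Int, Float)], partitions: Int): Int =
    determineBounds(candidates, partitions).length + 1
}
```

Under these assumptions, three candidates of equal weight with `partitions = 22` 
make every candidate a bound, so the sketch reports 4 partitions for 3 elements. 
Passing `math.min(22, 3)` instead stops after two bounds and reports 3.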

## How was this patch tested?

Tested as described in 
[SPARK-21410](https://issues.apache.org/jira/browse/SPARK-21410), plus a new 
unit test in PartitioningSuite.


Author: Zhang A Peng <zhan...@cn.ibm.com>

Closes #18631 from apapi/fixRangePartitioner.numPartitions.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7aac755b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7aac755b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7aac755b

Branch: refs/heads/master
Commit: 7aac755ba05be3689ae25f4b9183b32fa3408b89
Parents: a8c6d0f
Author: Zhang A Peng <zhan...@cn.ibm.com>
Authored: Tue Jul 18 09:57:53 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Tue Jul 18 09:57:53 2017 +0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/Partitioner.scala       | 2 +-
 core/src/test/scala/org/apache/spark/PartitioningSuite.scala | 6 ++++++
 2 files changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7aac755b/core/src/main/scala/org/apache/spark/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index f83f527..1484f29 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -153,7 +153,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
           val weight = (1.0 / fraction).toFloat
           candidates ++= reSampled.map(x => (x, weight))
         }
-        RangePartitioner.determineBounds(candidates, partitions)
+        RangePartitioner.determineBounds(candidates, math.min(partitions, candidates.size))
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/7aac755b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index 34c0178..dfe4c25 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -253,6 +253,12 @@ class PartitioningSuite extends SparkFunSuite with SharedSparkContext with Priva
 
     // Add other tests here for classes that should be able to handle empty partitions correctly
   }
+
+  test("Number of elements in RDD is less than number of partitions") {
+    val rdd = sc.parallelize(1 to 3).map(x => (x, x))
+    val partitioner = new RangePartitioner(22, rdd)
+    assert(partitioner.numPartitions === 3)
+  }
 }
 
 

