Repository: spark
Updated Branches:
  refs/heads/master dd8f6b1ce -> f856fe483


[SPARK-21436][CORE] Take advantage of known partitioner for distinct on RDDs to avoid a shuffle

## What changes were proposed in this pull request?

Special case the situation where the partitioner is known and the requested number of 
output partitions matches the current number of partitions, so that we can avoid a 
shuffle and instead compute distinct inside each partition.
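
As a rough illustration of the intended behavior (not part of the patch), the sketch 
below assumes a running SparkContext named `sc` and made-up sample data. It shows that 
`distinct()` on an RDD with a known partitioner and the same number of partitions keeps 
that partitioner, while requesting a different partition count falls back to the 
shuffle-based path.

```scala
// Minimal sketch, assuming a running SparkContext named `sc`.
val keyed = sc.parallelize(1 to 100, 10).map(x => (x % 10, x))

// sortByKey installs a RangePartitioner, so the partitioner is known.
val sorted = keyed.sortByKey()

// Same number of partitions as the parent: distinct can deduplicate inside
// each partition and keep the existing partitioner (no new shuffle).
val deduped = sorted.distinct()
assert(deduped.partitioner == sorted.partitioner)

// A different target partition count still takes the reduceByKey path
// and ends up with a different (in fact, no longer known) partitioner.
val reshuffled = sorted.distinct(5)
assert(reshuffled.partitioner != sorted.partitioner)
```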

## How was this patch tested?

New unit test that verifies the partitioner does not change if the partitioner is 
known and distinct is called with the same target number of partitions.

Closes #22010 from 
holdenk/SPARK-21436-take-advantage-of-known-partioner-for-distinct-on-rdds.

Authored-by: Holden Karau <hol...@pigscanfly.ca>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f856fe48
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f856fe48
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f856fe48

Branch: refs/heads/master
Commit: f856fe4839757e3a1036df3fc3dec459fa439aef
Parents: dd8f6b1
Author: Holden Karau <hol...@pigscanfly.ca>
Authored: Thu Sep 27 20:57:56 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Thu Sep 27 20:57:56 2018 +0800

----------------------------------------------------------------------
 .../src/main/scala/org/apache/spark/rdd/RDD.scala | 18 ++++++++++++++++--
 .../scala/org/apache/spark/rdd/RDDSuite.scala     | 12 ++++++++++++
 2 files changed, 28 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f856fe48/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 61ad6df..743e344 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -42,7 +42,8 @@ import org.apache.spark.partial.GroupedCountEvaluator
 import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.{RDDBlockId, StorageLevel}
 import org.apache.spark.util.{BoundedPriorityQueue, Utils}
-import org.apache.spark.util.collection.{OpenHashMap, Utils => collectionUtils}
+import org.apache.spark.util.collection.{ExternalAppendOnlyMap, OpenHashMap,
+  Utils => collectionUtils}
 import org.apache.spark.util.random.{BernoulliCellSampler, BernoulliSampler, PoissonSampler,
   SamplingUtils}
 
@@ -396,7 +397,20 @@ abstract class RDD[T: ClassTag](
    * Return a new RDD containing the distinct elements in this RDD.
    */
   def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
-    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+    def removeDuplicatesInPartition(partition: Iterator[T]): Iterator[T] = {
+      // Create an instance of external append only map which ignores values.
+      val map = new ExternalAppendOnlyMap[T, Null, Null](
+        createCombiner = value => null,
+        mergeValue = (a, b) => a,
+        mergeCombiners = (a, b) => a)
+      map.insertAll(partition.map(_ -> null))
+      map.iterator.map(_._1)
+    }
+    partitioner match {
+      case Some(p) if numPartitions == partitions.length =>
+        mapPartitions(removeDuplicatesInPartition, preservesPartitioning = true)
+      case _ => map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+    }
   }
 
   /**

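For readers skimming the hunk above: the new per-partition path treats each element as 
a key with a null value and keeps only the keys. Below is a simplified, in-memory 
sketch of the same idea in plain Scala (an assumption for illustration only; the actual 
patch uses Spark's internal, spillable ExternalAppendOnlyMap rather than a plain set).

```scala
// Simplified, in-memory stand-in for removeDuplicatesInPartition (illustration only).
// The real code uses ExternalAppendOnlyMap so that a large partition can spill to
// disk instead of exhausting memory.
def removeDuplicatesInPartitionSketch[T](partition: Iterator[T]): Iterator[T] = {
  val seen = scala.collection.mutable.LinkedHashSet.empty[T]
  partition.foreach(seen += _)
  seen.iterator
}

// Run per partition with preservesPartitioning = true, so the parent's
// partitioner is carried over to the result:
//   rdd.mapPartitions(removeDuplicatesInPartitionSketch, preservesPartitioning = true)
```
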
http://git-wip-us.apache.org/repos/asf/spark/blob/f856fe48/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index b143a46..2227698 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -95,6 +95,18 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     assert(!deserial.toString().isEmpty())
   }
 
+  test("distinct with known partitioner preserves partitioning") {
+    val rdd = sc.parallelize(1.to(100), 10).map(x => (x % 10, x % 10)).sortByKey()
+    val initialPartitioner = rdd.partitioner
+    val distinctRdd = rdd.distinct()
+    val resultingPartitioner = distinctRdd.partitioner
+    assert(initialPartitioner === resultingPartitioner)
+    val distinctRddDifferent = rdd.distinct(5)
+    val distinctRddDifferentPartitioner = distinctRddDifferent.partitioner
+    assert(initialPartitioner != distinctRddDifferentPartitioner)
+    assert(distinctRdd.collect().sorted === distinctRddDifferent.collect().sorted)
+  }
+
   test("countApproxDistinct") {
 
     def error(est: Long, size: Long): Double = math.abs(est - size) / size.toDouble

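One additional way to double-check that the shuffle is really avoided (not part of the 
patch; assumes a spark-shell session with `sc`) is to inspect the dependencies and 
lineage of the result. With a known partitioner and a matching partition count, 
distinct() should hang off its parent through a narrow dependency only.

```scala
// Sketch of a lineage check in spark-shell (assumes `sc`); not part of the patch.
import org.apache.spark.ShuffleDependency

val sorted = sc.parallelize(1 to 100, 10).map(x => (x % 10, x)).sortByKey()
val deduped = sorted.distinct()

// The immediate dependency of the result should be narrow, i.e. distinct()
// itself introduces no ShuffleDependency.
val shuffles = deduped.dependencies.count(_.isInstanceOf[ShuffleDependency[_, _, _]])
assert(shuffles == 0)

// toDebugString shows the full lineage, including the single shuffle
// that sortByKey already required.
println(deduped.toDebugString)
```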
