Repository: spark
Updated Branches:
  refs/heads/master dd8f6b1ce -> f856fe483
[SPARK-21436][CORE] Take advantage of known partitioner for distinct on RDDs to avoid a shuffle

## What changes were proposed in this pull request?

Special-case the situation where the partitioner is known and the requested number of output partitions equals the current number of partitions: avoid the shuffle and instead compute distinct within each partition.

## How was this patch tested?

Added a new unit test that verifies the partitioner does not change when the partitioner is known and distinct is called with the same target number of partitions.

Closes #22010 from holdenk/SPARK-21436-take-advantage-of-known-partioner-for-distinct-on-rdds.

Authored-by: Holden Karau <hol...@pigscanfly.ca>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f856fe48
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f856fe48
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f856fe48

Branch: refs/heads/master
Commit: f856fe4839757e3a1036df3fc3dec459fa439aef
Parents: dd8f6b1
Author: Holden Karau <hol...@pigscanfly.ca>
Authored: Thu Sep 27 20:57:56 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Thu Sep 27 20:57:56 2018 +0800

----------------------------------------------------------------------
 .../src/main/scala/org/apache/spark/rdd/RDD.scala | 18 ++++++++++++++++--
 .../scala/org/apache/spark/rdd/RDDSuite.scala     | 12 ++++++++++++
 2 files changed, 28 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f856fe48/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 61ad6df..743e344 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -42,7 +42,8 @@ import org.apache.spark.partial.GroupedCountEvaluator
 import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.{RDDBlockId, StorageLevel}
 import org.apache.spark.util.{BoundedPriorityQueue, Utils}
-import org.apache.spark.util.collection.{OpenHashMap, Utils => collectionUtils}
+import org.apache.spark.util.collection.{ExternalAppendOnlyMap, OpenHashMap,
+  Utils => collectionUtils}
 import org.apache.spark.util.random.{BernoulliCellSampler, BernoulliSampler, PoissonSampler,
   SamplingUtils}
 
@@ -396,7 +397,20 @@ abstract class RDD[T: ClassTag](
    * Return a new RDD containing the distinct elements in this RDD.
    */
   def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
-    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+    def removeDuplicatesInPartition(partition: Iterator[T]): Iterator[T] = {
+      // Create an instance of external append only map which ignores values.
+      val map = new ExternalAppendOnlyMap[T, Null, Null](
+        createCombiner = value => null,
+        mergeValue = (a, b) => a,
+        mergeCombiners = (a, b) => a)
+      map.insertAll(partition.map(_ -> null))
+      map.iterator.map(_._1)
+    }
+    partitioner match {
+      case Some(p) if numPartitions == partitions.length =>
+        mapPartitions(removeDuplicatesInPartition, preservesPartitioning = true)
+      case _ => map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+    }
   }
 
   /**


http://git-wip-us.apache.org/repos/asf/spark/blob/f856fe48/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index b143a46..2227698 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -95,6 +95,18 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     assert(!deserial.toString().isEmpty())
   }
 
+  test("distinct with known partitioner preserves partitioning") {
+    val rdd = sc.parallelize(1.to(100), 10).map(x => (x % 10, x % 10)).sortByKey()
+    val initialPartitioner = rdd.partitioner
+    val distinctRdd = rdd.distinct()
+    val resultingPartitioner = distinctRdd.partitioner
+    assert(initialPartitioner === resultingPartitioner)
+    val distinctRddDifferent = rdd.distinct(5)
+    val distinctRddDifferentPartitioner = distinctRddDifferent.partitioner
+    assert(initialPartitioner != distinctRddDifferentPartitioner)
+    assert(distinctRdd.collect().sorted === distinctRddDifferent.collect().sorted)
+  }
+
   test("countApproxDistinct") {
 
     def error(est: Long, size: Long): Double = math.abs(est - size) / size.toDouble
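----------------------------------------------------------------------

The fast path is correct because a partitioner routes every copy of a given element (identical elements have identical keys, hence land in the same partition) to the same place, so deduplicating each partition independently yields a globally distinct result. Below is a simplified, in-memory analogue of removeDuplicatesInPartition, for intuition only; the committed code uses ExternalAppendOnlyMap instead so the per-partition map can spill to disk rather than hold every distinct element on the heap:

import scala.collection.mutable

def removeDuplicatesInPartitionNaive[T](partition: Iterator[T]): Iterator[T] = {
  // Keep only the first occurrence of each element in this partition;
  // mutable.HashSet.add returns false for elements already present.
  val seen = mutable.HashSet.empty[T]
  partition.filter(seen.add)
}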
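And a minimal standalone sketch of the behavior from the caller's side, mirroring the new test. The object name DistinctPartitionerDemo and the local-mode SparkContext are illustrative assumptions, not part of the patch:

import org.apache.spark.{SparkConf, SparkContext}

object DistinctPartitionerDemo {
  def main(args: Array[String]): Unit = {
    // Local SparkContext for illustration only.
    val sc = new SparkContext(
      new SparkConf().setAppName("distinct-demo").setMaster("local[2]"))

    // sortByKey installs a RangePartitioner, so the partitioner is known.
    val rdd = sc.parallelize(1 to 100, 10).map(x => (x % 10, x % 10)).sortByKey()

    // Same number of partitions: distinct() dedupes within each partition
    // and keeps the existing partitioner, avoiding a shuffle.
    val samePartitions = rdd.distinct()
    assert(samePartitions.partitioner == rdd.partitioner)

    // Different number of partitions: falls back to the reduceByKey path,
    // which shuffles and does not preserve the original partitioner.
    val fewerPartitions = rdd.distinct(5)
    assert(fewerPartitions.partitioner != rdd.partitioner)

    sc.stop()
  }
}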