This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new a027db1dc03 [SPARK-40276][CORE] Reduce the result size of RDD.takeOrdered
a027db1dc03 is described below

commit a027db1dc0379d823cbd638181bb25095f4e6577
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Thu Sep 1 09:19:11 2022 +0800

[SPARK-40276][CORE] Reduce the result size of RDD.takeOrdered

### What changes were proposed in this pull request?
Use `Array` instead of `BoundedPriorityQueue` to store intermediate results.

### Why are the changes needed?
1. We encountered a case where `RDD.takeOrdered` fails due to `Total size of serialized results of xxx tasks (... MiB) is bigger than spark.driver.`
2. Performance improvement, benchmarked with `bin/spark-shell --driver-memory=4G`:

```scala
Seq(10, 100, 1000, 10000, 50000, 100000).foreach { n =>
  val start = System.currentTimeMillis
  Seq.range(0, 10).foreach(_ => sc.range(0, 100000000, 1, 1000).top(n))
  val duration = System.currentTimeMillis - start
  println(s"n=$n, duration=$duration")
}
```

duration (ms) | n=10 | n=100 | n=1,000 | n=10,000 | n=50,000 | n=100,000
-- | -- | -- | -- | -- | -- | --
master | 2,552 | 2,197 | 2,543 | 10,003 | 58,552 | org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 763 tasks (1024.6 MiB) is bigger than spark.driver
this PR | 2,556 | 2,138 | 2,196 | 7,371 | 33,903 | 66,895
this PR + treeReduce | 9,160 | 9,748 | 9,728 | 11,441 | 17,216 | 24,728

It is strange that `this PR + treeReduce` turns out to be the slowest when `n` is small, so this PR still uses `reduce`.
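To make the new merge step concrete, here is a minimal, self-contained sketch of the idea (illustration only, not part of the patch; `mergeCapped` and `TakeOrderedSketch` are hypothetical names): each partition contributes a sorted array of at most `num` elements, and two such arrays are merged while keeping only the `num` smallest entries, so no intermediate result ever grows beyond `num` elements.

```scala
import scala.reflect.ClassTag

object TakeOrderedSketch {
  // Merge two sorted arrays, keeping only the `num` smallest elements.
  // This mirrors what the combiner passed to `reduce` does in the diff below.
  def mergeCapped[T](a: Array[T], b: Array[T], num: Int)
      (implicit ord: Ordering[T], ct: ClassTag[T]): Array[T] = {
    val size = math.min(num, a.length + b.length)
    val out = new Array[T](size)
    var i = 0
    var j = 0
    var k = 0
    while (k < size) {
      // Take from `a` when `b` is exhausted, or when a(i) is not larger than b(j).
      if (j >= b.length || (i < a.length && ord.lteq(a(i), b(j)))) {
        out(k) = a(i); i += 1
      } else {
        out(k) = b(j); j += 1
      }
      k += 1
    }
    out
  }

  def main(args: Array[String]): Unit = {
    println(mergeCapped(Array(1, 3, 5), Array(2, 4, 6), num = 4).mkString(", "))
    // prints: 1, 2, 3, 4
  }
}
```

The key change is that each task now sends back a plain sorted array of at most `num` elements instead of a serialized `BoundedPriorityQueue`, which is what shrinks the per-task result the driver has to collect.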
### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
added UT

Closes #37728 from zhengruifeng/core_take_ordered.

Authored-by: Ruifeng Zheng <ruife...@apache.org>
Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 core/src/main/scala/org/apache/spark/rdd/RDD.scala  | 32 ++++++++++++----------
 .../org/apache/spark/util/collection/Utils.scala    | 19 ++++++++++++-
 .../test/scala/org/apache/spark/rdd/RDDSuite.scala  |  5 ++++
 .../resources/tpcds-query-results/v1_4/q77.sql.out  |  2 +-
 .../tpcds-query-results/v2_7/q77a.sql.out           |  2 +-
 5 files changed, 42 insertions(+), 18 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 461510b2526..d12804fc12b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -46,7 +46,7 @@ import org.apache.spark.partial.GroupedCountEvaluator
 import org.apache.spark.partial.PartialResult
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.storage.{RDDBlockId, StorageLevel}
-import org.apache.spark.util.{BoundedPriorityQueue, Utils}
+import org.apache.spark.util.Utils
 import org.apache.spark.util.collection.{ExternalAppendOnlyMap, OpenHashMap, Utils => collectionUtils}
 import org.apache.spark.util.random.{BernoulliCellSampler, BernoulliSampler, PoissonSampler,
   SamplingUtils}
@@ -1523,22 +1523,24 @@ abstract class RDD[T: ClassTag](
    * @return an array of top elements
    */
   def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
-    if (num == 0) {
+    if (num == 0 || this.getNumPartitions == 0) {
       Array.empty
     } else {
-      val mapRDDs = mapPartitions { items =>
-        // Priority keeps the largest elements, so let's reverse the ordering.
-        val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
-        queue ++= collectionUtils.takeOrdered(items, num)(ord)
-        Iterator.single(queue)
-      }
-      if (mapRDDs.partitions.length == 0) {
-        Array.empty
-      } else {
-        mapRDDs.reduce { (queue1, queue2) =>
-          queue1 ++= queue2
-          queue1
-        }.toArray.sorted(ord)
+      this.mapPartitionsWithIndex { case (pid, iter) =>
+        if (iter.nonEmpty) {
+          // Priority keeps the largest elements, so let's reverse the ordering.
+          Iterator.single(collectionUtils.takeOrdered(iter, num)(ord).toArray)
+        } else if (pid == 0) {
+          // make sure partition 0 always returns an array to avoid reduce on empty RDD
+          Iterator.single(Array.empty[T])
+        } else {
+          Iterator.empty
+        }
+      }.reduce { (array1, array2) =>
+        val size = math.min(num, array1.length + array2.length)
+        val array = Array.ofDim[T](size)
+        collectionUtils.mergeOrdered[T](Seq(array1, array2))(ord).copyToArray(array, 0, size)
+        array
       }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala
index 8b543f1642a..989ff691417 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala
@@ -19,7 +19,7 @@ package org.apache.spark.util.collection
 
 import scala.collection.JavaConverters._
 
-import com.google.common.collect.{Ordering => GuavaOrdering}
+import com.google.common.collect.{Iterators => GuavaIterators, Ordering => GuavaOrdering}
 
 /**
  * Utility functions for collections.
@@ -37,6 +37,23 @@ private[spark] object Utils {
     ordering.leastOf(input.asJava, num).iterator.asScala
   }
 
+  /**
+   * Returns an iterator over the merged contents of all given input iterators,
+   * traversing every element of the input iterators. Equivalent entries will
+   * not be de-duplicated.
+   *
+   * Callers must ensure that all the input iterators are already sorted by
+   * the same ordering `ord`, otherwise the result is likely to be incorrect.
+   */
+  def mergeOrdered[T](inputs: Iterable[TraversableOnce[T]])(
+      implicit ord: Ordering[T]): Iterator[T] = {
+    val ordering = new GuavaOrdering[T] {
+      override def compare(l: T, r: T): Int = ord.compare(l, r)
+    }
+    GuavaIterators.mergeSorted(
+      inputs.map(_.toIterator.asJava).asJava, ordering).asScala
+  }
+
   /**
    * Only returns `Some` iff ALL elements in `input` are defined. In this case, it is
    * equivalent to `Some(input.flatten)`.
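For reference, the new `mergeOrdered` helper above is a thin wrapper around Guava's `Iterators.mergeSorted`, which merges already-sorted iterators lazily. A minimal standalone usage sketch (assumes only Guava on the classpath, which Spark already has; `MergeOrderedDemo` is just an illustrative name):

```scala
import scala.collection.JavaConverters._

import com.google.common.collect.{Iterators => GuavaIterators, Ordering => GuavaOrdering}

object MergeOrderedDemo {
  def main(args: Array[String]): Unit = {
    // Wrap the Scala Ordering in a Guava Ordering, exactly as the helper does.
    val ord = implicitly[Ordering[Int]]
    val guavaOrd = new GuavaOrdering[Int] {
      override def compare(l: Int, r: Int): Int = ord.compare(l, r)
    }
    // Three already-sorted inputs are merged into one sorted iterator.
    val inputs = Seq(Iterator(1, 4, 7), Iterator(2, 5, 8), Iterator(3, 6, 9))
    val merged = GuavaIterators.mergeSorted(inputs.map(_.asJava).asJava, guavaOrd).asScala
    println(merged.mkString(", "))  // prints: 1, 2, 3, 4, 5, 6, 7, 8, 9
  }
}
```

Because the merge is lazy, the `copyToArray(array, 0, size)` call in `takeOrdered` above only ever materializes the first `size` merged elements.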
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index c64573f7a0a..82d549089d5 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -697,6 +697,11 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually {
     assert(sortedLowerK.size === 0)
   }
 
+  test("SPARK-40276: takeOrdered with empty RDDs") {
+    assert(sc.emptyRDD[Int].takeOrdered(5) === Array.emptyIntArray)
+    assert(sc.range(0, 10, 1, 3).filter(_ < 0).takeOrdered(5) === Array.emptyLongArray)
+  }
+
   test("takeOrdered with custom ordering") {
     val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
     implicit val ord = implicitly[Ordering[Int]].reverse
diff --git a/sql/core/src/test/resources/tpcds-query-results/v1_4/q77.sql.out b/sql/core/src/test/resources/tpcds-query-results/v1_4/q77.sql.out
index aef644cbf74..be0b3ba36d7 100644
--- a/sql/core/src/test/resources/tpcds-query-results/v1_4/q77.sql.out
+++ b/sql/core/src/test/resources/tpcds-query-results/v1_4/q77.sql.out
@@ -4,8 +4,8 @@
 struct<channel:string,id:int,sales:decimal(27,2),returns:decimal(27,2),profit:decimal(28,2)>
 -- !query output
 NULL	NULL	238379361.39	11949589.80	-69066318.65
-catalog channel	NULL	116209.49	1989207.49	-1103184.43
 catalog channel	NULL	81893158.01	7956829.96	-13266843.17
+catalog channel	NULL	116209.49	1989207.49	-1103184.43
 catalog channel	1	26819348.55	1989207.49	-4169636.96
 catalog channel	2	27454600.50	1989207.49	-3825432.73
 catalog channel	5	27502999.47	1989207.49	-4168589.05
diff --git a/sql/core/src/test/resources/tpcds-query-results/v2_7/q77a.sql.out b/sql/core/src/test/resources/tpcds-query-results/v2_7/q77a.sql.out
index 1f822ce6788..3996a4c9f4d 100644
--- a/sql/core/src/test/resources/tpcds-query-results/v2_7/q77a.sql.out
+++ b/sql/core/src/test/resources/tpcds-query-results/v2_7/q77a.sql.out
@@ -4,8 +4,8 @@
 struct<channel:string,id:int,sales:decimal(37,2),returns:decimal(37,2),profit:decimal(38,2)>
 -- !query output
 NULL	NULL	239062306.14	9940693.53	-67351905.74
-catalog channel	NULL	81456313.49	6721169.80	-11963308.94
 catalog channel	NULL	120443.39	1680292.45	-994006.90
+catalog channel	NULL	81456313.49	6721169.80	-11963308.94
 catalog channel	1	25511213.21	1680292.45	-4013845.35
 catalog channel	2	28320909.41	1680292.45	-3815679.20
 catalog channel	4	27503747.48	1680292.45	-3139777.49

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org