This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a027db1dc03 [SPARK-40276][CORE] Reduce the result size of RDD.takeOrdered
a027db1dc03 is described below

commit a027db1dc0379d823cbd638181bb25095f4e6577
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Thu Sep 1 09:19:11 2022 +0800

    [SPARK-40276][CORE] Reduce the result size of RDD.takeOrdered
    
    ### What changes were proposed in this pull request?
    Use an `Array` instead of a `BoundedPriorityQueue` to store the intermediate results.
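
    For illustration, here is a minimal standalone sketch of the idea, runnable in plain Scala (the names `topKPerPartition` and `mergeTopK` are hypothetical; the actual patch reuses `collectionUtils.takeOrdered` per partition and merges arrays with the new `collectionUtils.mergeOrdered`):

    ```scala
    import scala.reflect.ClassTag

    // Each partition contributes a sorted array of at most `num` elements,
    // which serializes leaner than a BoundedPriorityQueue.
    def topKPerPartition[T: ClassTag](iter: Iterator[T], num: Int)(
        implicit ord: Ordering[T]): Array[T] =
      iter.toArray.sorted(ord).take(num)

    // Merge two sorted arrays, keeping only the `num` smallest elements,
    // mirroring the reduce step the patch performs on plain arrays.
    def mergeTopK[T: ClassTag](a: Array[T], b: Array[T], num: Int)(
        implicit ord: Ordering[T]): Array[T] = {
      val size = math.min(num, a.length + b.length)
      val out = Array.ofDim[T](size)
      var i = 0; var j = 0; var k = 0
      while (k < size) {
        if (j >= b.length || (i < a.length && ord.lteq(a(i), b(j)))) {
          out(k) = a(i); i += 1
        } else {
          out(k) = b(j); j += 1
        }
        k += 1
      }
      out
    }

    val p1 = topKPerPartition(Iterator(9, 1, 5, 3), 3)  // Array(1, 3, 5)
    val p2 = topKPerPartition(Iterator(4, 2, 8, 0), 3)  // Array(0, 2, 4)
    mergeTopK(p1, p2, 3)                                // Array(0, 1, 2)
    ```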
    
    ### Why are the changes needed?
    1. We encountered a case where `RDD.takeOrdered` fails with `Total size of serialized results of xxx tasks (... MiB) is bigger than spark.driver.`
    
    2. Performance improvement:
    
    `bin/spark-shell --driver-memory=4G`
    
    ```scala
    Seq(10, 100, 1000, 10000, 50000, 100000).foreach { n =>
      val start = System.currentTimeMillis
      Seq.range(0, 10).foreach(_ => sc.range(0, 100000000, 1, 1000).top(n))
      val duration = System.currentTimeMillis - start
      println(s"n=$n, duration=$duration")
    }
    ```
    
    duration (ms) | n=10 | n=100 | n=1,000 | n=10,000 | n=50,000 | n=100,000
    -- | -- | -- | -- | -- | -- | --
    master | 2,552 | 2,197 | 2,543 | 10,003 | 58,552 | org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 763 tasks (1024.6 MiB) is bigger than spark.driver
    this PR | 2,556 | 2,138 | 2,196 | 7,371 | 33,903 | 66,895
    this PR + treeReduce | 9,160 | 9,748 | 9,728 | 11,441 | 17,216 | 24,728
    
    It is strange that `this PR + treeReduce` turns out to be the slowest when `n` is small, so this PR still uses `reduce`.
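
    A sketch of the `treeReduce` variant benchmarked above, assuming it only swaps the final `reduce` for `RDD.treeReduce` (it reuses the hypothetical `topKPerPartition` and `mergeTopK` from the sketch above):

    ```scala
    // Hypothetical "this PR + treeReduce" variant: the same per-partition
    // sorted arrays, merged through a tree aggregation of default depth 2 on
    // executors instead of a single reduce on the driver.
    val num = 3
    val partitionTopK = sc.range(0, 100, 1, 8)
      .mapPartitions(it => Iterator.single(topKPerPartition(it, num)))
    partitionTopK.treeReduce((a, b) => mergeTopK(a, b, num))  // Array(0, 1, 2)
    ```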
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added a unit test.
    
    Closes #37728 from zhengruifeng/core_take_ordered.
    
    Authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 32 ++++++++++++----------
 .../org/apache/spark/util/collection/Utils.scala   | 19 ++++++++++++-
 .../test/scala/org/apache/spark/rdd/RDDSuite.scala |  5 ++++
 .../resources/tpcds-query-results/v1_4/q77.sql.out |  2 +-
 .../tpcds-query-results/v2_7/q77a.sql.out          |  2 +-
 5 files changed, 42 insertions(+), 18 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 461510b2526..d12804fc12b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -46,7 +46,7 @@ import org.apache.spark.partial.GroupedCountEvaluator
 import org.apache.spark.partial.PartialResult
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.storage.{RDDBlockId, StorageLevel}
-import org.apache.spark.util.{BoundedPriorityQueue, Utils}
+import org.apache.spark.util.Utils
 import org.apache.spark.util.collection.{ExternalAppendOnlyMap, OpenHashMap,
   Utils => collectionUtils}
 import org.apache.spark.util.random.{BernoulliCellSampler, BernoulliSampler, PoissonSampler,
@@ -1523,22 +1523,24 @@ abstract class RDD[T: ClassTag](
    * @return an array of top elements
    */
   def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
-    if (num == 0) {
+    if (num == 0 || this.getNumPartitions == 0) {
       Array.empty
     } else {
-      val mapRDDs = mapPartitions { items =>
-        // Priority keeps the largest elements, so let's reverse the ordering.
-        val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
-        queue ++= collectionUtils.takeOrdered(items, num)(ord)
-        Iterator.single(queue)
-      }
-      if (mapRDDs.partitions.length == 0) {
-        Array.empty
-      } else {
-        mapRDDs.reduce { (queue1, queue2) =>
-          queue1 ++= queue2
-          queue1
-        }.toArray.sorted(ord)
+      this.mapPartitionsWithIndex { case (pid, iter) =>
+        if (iter.nonEmpty) {
+          // Priority keeps the largest elements, so let's reverse the ordering.
+          Iterator.single(collectionUtils.takeOrdered(iter, num)(ord).toArray)
+        } else if (pid == 0) {
+          // make sure partition 0 always returns an array to avoid reduce on empty RDD
+          Iterator.single(Array.empty[T])
+        } else {
+          Iterator.empty
+        }
+      }.reduce { (array1, array2) =>
+        val size = math.min(num, array1.length + array2.length)
+        val array = Array.ofDim[T](size)
+        collectionUtils.mergeOrdered[T](Seq(array1, array2))(ord).copyToArray(array, 0, size)
+        array
       }
     }
   }
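
For reference, this patch does not change `takeOrdered`'s semantics; a quick spark-shell example of what it computes (illustrative values):

```scala
// takeOrdered(k) returns the k smallest elements, in ascending order;
// top(k) is the same computation with the ordering reversed.
sc.parallelize(Seq(10, 4, 2, 12, 3)).takeOrdered(3)  // Array(2, 3, 4)
sc.parallelize(Seq(10, 4, 2, 12, 3)).top(3)          // Array(12, 10, 4)
```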
diff --git a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala
index 8b543f1642a..989ff691417 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala
@@ -19,7 +19,7 @@ package org.apache.spark.util.collection
 
 import scala.collection.JavaConverters._
 
-import com.google.common.collect.{Ordering => GuavaOrdering}
+import com.google.common.collect.{Iterators => GuavaIterators, Ordering => GuavaOrdering}
 
 /**
  * Utility functions for collections.
@@ -37,6 +37,23 @@ private[spark] object Utils {
     ordering.leastOf(input.asJava, num).iterator.asScala
   }
 
+  /**
+   * Returns an iterator over the merged contents of all given input iterators,
+   * traversing every element of the input iterators.
+   * Equivalent entries will not be de-duplicated.
+   *
+   * Callers must ensure that all the input iterators are already sorted by
+   * the same ordering `ord`, otherwise the result is likely to be incorrect.
+   */
+  def mergeOrdered[T](inputs: Iterable[TraversableOnce[T]])(
+    implicit ord: Ordering[T]): Iterator[T] = {
+    val ordering = new GuavaOrdering[T] {
+      override def compare(l: T, r: T): Int = ord.compare(l, r)
+    }
+    GuavaIterators.mergeSorted(
+      inputs.map(_.toIterator.asJava).asJava, ordering).asScala
+  }
+
   /**
   * Only returns `Some` iff ALL elements in `input` are defined. In this case, it is
    * equivalent to `Some(input.flatten)`.
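
A quick sketch of what the new `mergeOrdered` helper does (illustrative values; it adapts Guava's `Iterators.mergeSorted`, which lazily merges already-sorted iterators, to Scala):

```scala
import scala.collection.JavaConverters._
import com.google.common.collect.{Iterators => GuavaIterators, Ordering => GuavaOrdering}

// Inputs must already be sorted by the same ordering; duplicates are kept.
val inputs = Seq(Seq(1, 3, 5), Seq(0, 2, 4))
val guavaOrd = new GuavaOrdering[Int] {
  override def compare(l: Int, r: Int): Int = Ordering.Int.compare(l, r)
}
GuavaIterators.mergeSorted(
  inputs.map(_.iterator.asJava).asJava, guavaOrd).asScala.toList
// => List(0, 1, 2, 3, 4, 5)
```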
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index c64573f7a0a..82d549089d5 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -697,6 +697,11 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually {
     assert(sortedLowerK.size === 0)
   }
 
+  test("SPARK-40276: takeOrdered with empty RDDs") {
+    assert(sc.emptyRDD[Int].takeOrdered(5) === Array.emptyIntArray)
+    assert(sc.range(0, 10, 1, 3).filter(_ < 0).takeOrdered(5) === Array.emptyLongArray)
+  }
+
   test("takeOrdered with custom ordering") {
     val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
     implicit val ord = implicitly[Ordering[Int]].reverse
diff --git a/sql/core/src/test/resources/tpcds-query-results/v1_4/q77.sql.out b/sql/core/src/test/resources/tpcds-query-results/v1_4/q77.sql.out
index aef644cbf74..be0b3ba36d7 100644
--- a/sql/core/src/test/resources/tpcds-query-results/v1_4/q77.sql.out
+++ b/sql/core/src/test/resources/tpcds-query-results/v1_4/q77.sql.out
@@ -4,8 +4,8 @@
 struct<channel:string,id:int,sales:decimal(27,2),returns:decimal(27,2),profit:decimal(28,2)>
 -- !query output
 NULL   NULL    238379361.39    11949589.80     -69066318.65
-catalog channel        NULL    116209.49       1989207.49      -1103184.43
 catalog channel        NULL    81893158.01     7956829.96      -13266843.17
+catalog channel        NULL    116209.49       1989207.49      -1103184.43
 catalog channel        1       26819348.55     1989207.49      -4169636.96
 catalog channel        2       27454600.50     1989207.49      -3825432.73
 catalog channel        5       27502999.47     1989207.49      -4168589.05
diff --git a/sql/core/src/test/resources/tpcds-query-results/v2_7/q77a.sql.out b/sql/core/src/test/resources/tpcds-query-results/v2_7/q77a.sql.out
index 1f822ce6788..3996a4c9f4d 100644
--- a/sql/core/src/test/resources/tpcds-query-results/v2_7/q77a.sql.out
+++ b/sql/core/src/test/resources/tpcds-query-results/v2_7/q77a.sql.out
@@ -4,8 +4,8 @@
 struct<channel:string,id:int,sales:decimal(37,2),returns:decimal(37,2),profit:decimal(38,2)>
 -- !query output
 NULL   NULL    239062306.14    9940693.53      -67351905.74
-catalog channel        NULL    81456313.49     6721169.80      -11963308.94
 catalog channel        NULL    120443.39       1680292.45      -994006.90
+catalog channel        NULL    81456313.49     6721169.80      -11963308.94
 catalog channel        1       25511213.21     1680292.45      -4013845.35
 catalog channel        2       28320909.41     1680292.45      -3815679.20
 catalog channel        4       27503747.48     1680292.45      -3139777.49

