[ https://issues.apache.org/jira/browse/SPARK-32212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153059#comment-17153059 ]

Apache Spark commented on SPARK-32212:
--------------------------------------

User 'izchen' has created a pull request for this issue:
https://github.com/apache/spark/pull/29028

> RDD.takeOrdered can choose to merge intermediate results in executor or driver
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-32212
>                 URL: https://issues.apache.org/jira/browse/SPARK-32212
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Chen Zhang
>            Priority: Major
>
> In the issue list, I saw some discussions about exceeding the driver's 
> memory limit when using _RDD.takeOrdered_ or _SQL (ORDER BY xx LIMIT xx)_. I 
> think the implementation of _RDD.takeOrdered_ can be improved.
> In the current implementation of _RDD.takeOrdered_, the QuickSelect 
> algorithm from Guava is used in the executor process to compute the local 
> TopK results of each RDD partition. These intermediate results are packaged 
> into java.util.PriorityQueue instances and returned to the driver process, 
> where they are merged into the global TopK results, roughly as in the 
> sketch below.
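> For reference, a paraphrase of the existing driver-merge logic (based on 
> _RDD.takeOrdered_ in the Spark 3.0 source, where _collectionUtils_ is an 
> alias for _org.apache.spark.util.collection.Utils_, whose _takeOrdered_ 
> wraps Guava's QuickSelect-based _Ordering.leastOf_):
> {code:scala}
>   def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
>     if (num == 0) {
>       Array.empty
>     } else {
>       val mapRDDs = mapPartitions { items =>
>         // Local TopK per partition; the queue keeps the largest elements,
>         // so the ordering is reversed.
>         val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
>         queue ++= collectionUtils.takeOrdered(items, num)(ord)
>         Iterator.single(queue)
>       }
>       if (mapRDDs.partitions.length == 0) {
>         Array.empty
>       } else {
>         // reduce() returns every partition's queue to the driver and merges
>         // them there, which is where the memory pressure builds up.
>         mapRDDs.reduce { (queue1, queue2) =>
>           queue1 ++= queue2
>           queue1
>         }.toArray.sorted(ord)
>       }
>     }
>   }
> {code}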
> The problem with this implementation is that when the intermediate results 
> are large and there are many partitions, they can accumulate in the memory 
> of the driver process, causing excessive memory pressure: in the worst case 
> the driver buffers on the order of numPartitions * num elements (e.g., 
> 10,000 partitions with num = 1,000,000 is up to 10^10 elements).
> We can add an optional config that determines whether the intermediate 
> results (local TopK) of each partition in _RDD.takeOrdered_ are merged in 
> the driver process or in an executor process. If set to true, they are 
> merged in the driver process (via util.PriorityQueue), which gives a shorter 
> wait time for the result; but when the intermediate results are large and 
> there are many partitions, they may accumulate in driver memory and cause 
> excessive memory pressure. If set to false, they are merged in an executor 
> process (via Guava's QuickSelect), so intermediate results never accumulate 
> in driver memory, at the cost of a longer runtime (an extra shuffle to a 
> single partition).
> Something like the following, in _org.apache.spark.rdd.RDD_:
> {code:scala}
>   def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
>     if (num == 0 || partitions.length == 0) {
>       Array.empty
>     } else {
>       if (conf.getBoolean("spark.rdd.takeOrdered.mergeInDriver", true)) {
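>         // Merge the per-partition queues in the driver (original behavior).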
>         val mapRDDs = mapPartitions { items =>
>           // The priority queue keeps the largest elements, so reverse the ordering.
>           val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
>           queue ++= collectionUtils.takeOrdered(items, num)(ord)
>           Iterator.single(queue)
>         }
>         mapRDDs.reduce { (queue1, queue2) =>
>           queue1 ++= queue2
>           queue1
>         }.toArray.sorted(ord)
>       } else {
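>         // Compute the local TopK per partition, shuffle the survivors to a
>         // single partition, and take the global TopK there, off the driver.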
>         mapPartitions { items =>
>           collectionUtils.takeOrdered(items, num)(ord)
>         }.repartition(1).mapPartitions { items =>
>           collectionUtils.takeOrdered(items, num)(ord)
>         }.collect()
>       }
>     }
>   }
> {code}
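> A minimal usage sketch; the config name _spark.rdd.takeOrdered.mergeInDriver_ 
> is the one proposed above, not an existing Spark config:
> {code:scala}
> import org.apache.spark.sql.SparkSession
> 
> val spark = SparkSession.builder()
>   .appName("takeOrdered-merge-strategy")
>   // false: merge the per-partition TopK in an executor instead of the driver
>   .config("spark.rdd.takeOrdered.mergeInDriver", "false")
>   .getOrCreate()
> 
> // Many partitions, each producing a local TopK of up to `num` elements.
> val rdd = spark.sparkContext.parallelize(1 to 100000000, numSlices = 10000)
> 
> // Global TopK (the 1,000,000 smallest values); with the config set to false
> // the local results are merged in an executor rather than in driver memory.
> val smallest = rdd.takeOrdered(1000000)
> {code}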


