[ https://issues.apache.org/jira/browse/SPARK-32212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153060#comment-17153060 ]
Apache Spark commented on SPARK-32212:
--------------------------------------

User 'izchen' has created a pull request for this issue:
https://github.com/apache/spark/pull/29028

> RDD.takeOrdered can choose to merge intermediate results in executor or driver
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-32212
>                 URL: https://issues.apache.org/jira/browse/SPARK-32212
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Chen Zhang
>            Priority: Major
>
> In the issue list I have seen several discussions about exceeding the memory limit of the driver when using _RDD.takeOrdered_ or SQL's _ORDER BY ... LIMIT_. I think the implementation of _RDD.takeOrdered_ can be improved.
> In the original implementation of _RDD.takeOrdered_, the QuickSelect algorithm from guava is used in the executor process to compute the local TopK results of each RDD partition. These intermediate results are packaged into a java.util.PriorityQueue and returned to the driver process, where they are merged to obtain the global TopK results.
> The problem with this implementation is that if the intermediate results are large and there are many partitions, the intermediate results may accumulate in the memory of the driver process, causing excessive memory pressure.
> We can use an optional config to determine whether the intermediate results (local TopK) of each partition in _RDD.takeOrdered_ are merged in the driver process or in an executor process. If set to true, the merge happens in the driver process (via util.PriorityQueue), which gives a shorter wait for the result. But if the intermediate results are large and there are many partitions, they may accumulate in the memory of the driver process, causing excessive memory pressure.
> If set to false, the merge happens in an executor process (via guava QuickSelect); intermediate results will not accumulate in driver memory, but the runtime will be longer.
> Something like this:
> _(org.apache.spark.rdd.RDD)_
> {code:scala}
> def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
>   if (num == 0 || partitions.length == 0) {
>     Array.empty
>   } else {
>     if (conf.getBoolean("spark.rdd.takeOrdered.mergeInDriver", true)) {
>       val mapRDDs = mapPartitions { items =>
>         // Priority keeps the largest elements, so let's reverse the ordering.
>         val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
>         queue ++= collectionUtils.takeOrdered(items, num)(ord)
>         Iterator.single(queue)
>       }
>       mapRDDs.reduce { (queue1, queue2) =>
>         queue1 ++= queue2
>         queue1
>       }.toArray.sorted(ord)
>     } else {
>       mapPartitions { items =>
>         collectionUtils.takeOrdered(items, num)(ord)
>       }.repartition(1).mapPartitions { items =>
>         collectionUtils.takeOrdered(items, num)(ord)
>       }.collect()
>     }
>   }
> }
> {code}


--
This message was sent by Atlassian Jira
(v8.3.4#803005)
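For illustration, the two-phase TopK scheme this proposal builds on (a bounded local TopK per partition, then a merge of the per-partition queues) can be sketched in plain Scala without Spark. `localTopK` and `mergeTopK` below are hypothetical stand-ins for Spark's `collectionUtils.takeOrdered` and `BoundedPriorityQueue` merge, not Spark API; they only show why each partition's contribution is bounded at `num` elements before anything is merged:

```scala
import scala.collection.mutable

// Keep at most `num` smallest elements of `items` in a max-heap:
// the heap's head is the largest retained element, so a smaller
// candidate evicts it. This bounds per-partition state at `num`.
def localTopK[T](items: Iterator[T], num: Int)(implicit ord: Ordering[T]): mutable.PriorityQueue[T] = {
  val queue = mutable.PriorityQueue.empty[T](ord) // max-heap under ord
  items.foreach { item =>
    if (queue.size < num) queue.enqueue(item)
    else if (ord.lt(item, queue.head)) { queue.dequeue(); queue.enqueue(item) }
  }
  queue
}

// Merge the per-partition queues pairwise (as the driver-side
// reduce does), then emit the global TopK in sorted order.
def mergeTopK[T](queues: Seq[mutable.PriorityQueue[T]], num: Int)(implicit ord: Ordering[T]): Array[T] = {
  val merged = queues.reduce { (q1, q2) =>
    q2.foreach { item =>
      if (q1.size < num) q1.enqueue(item)
      else if (ord.lt(item, q1.head)) { q1.dequeue(); q1.enqueue(item) }
    }
    q1
  }
  merged.toArray.sorted(ord)
}

// Example: global top-3 smallest across two simulated partitions.
val p1 = localTopK(Iterator(9, 1, 7, 5), 3)
val p2 = localTopK(Iterator(2, 8, 3, 6), 3)
val result = mergeTopK(Seq(p1, p2), 3) // Array(1, 2, 3)
```

Whether the `reduce` over queues runs in the driver (fast, but all partition queues arrive there) or inside an executor after a `repartition(1)` (slower, but driver memory stays flat) is exactly the trade-off the proposed config exposes.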