Chen Zhang created SPARK-32212:
----------------------------------

             Summary: RDD.takeOrdered merge intermediate results can be configured in driver or executor
                 Key: SPARK-32212
                 URL: https://issues.apache.org/jira/browse/SPARK-32212
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
    Affects Versions: 3.0.0
            Reporter: Chen Zhang
In the issue list, I saw several discussions about exceeding the driver's memory limit when using _RDD.takeOrdered_ or _SQL(order by xx limit xx)_. I think the implementation of _RDD.takeOrdered_ can be improved.

In the current implementation of _RDD.takeOrdered_, the QuickSelect algorithm from Guava is used in the executor process to compute the local top-K results of each RDD partition. These intermediate results are packaged into java.util.PriorityQueue instances and returned to the driver process, where they are merged to obtain the global top-K result. The problem with this implementation is that if the intermediate results are large and there are many partitions, they can accumulate in the driver process's memory and cause excessive memory pressure.

We can use an optional config to determine whether the intermediate results (local top-K) of each partition in _RDD.takeOrdered_ are merged in the driver process or in an executor process:
* If set to true, merge in the driver process (by util.PriorityQueue). This gives a shorter wait for the result to return, but with large intermediate results and many partitions, those results may accumulate in the driver's memory and cause excessive memory pressure.
* If set to false, merge in an executor process (by guava.QuickSelect). Intermediate results will not accumulate in driver memory, but the job will take longer to run.

Something like the following in _(org.apache.spark.rdd.RDD)_:

{code:scala}
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  if (num == 0 || partitions.length == 0) {
    Array.empty
  } else {
    if (conf.getBoolean("spark.rdd.takeOrdered.mergeInDriver", true)) {
      val mapRDDs = mapPartitions { items =>
        // Priority keeps the largest elements, so let's reverse the ordering.
        val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
        queue ++= collectionUtils.takeOrdered(items, num)(ord)
        Iterator.single(queue)
      }
      mapRDDs.reduce { (queue1, queue2) =>
        queue1 ++= queue2
        queue1
      }.toArray.sorted(ord)
    } else {
      mapPartitions { items =>
        collectionUtils.takeOrdered(items, num)(ord)
      }.repartition(1).mapPartitions { items =>
        collectionUtils.takeOrdered(items, num)(ord)
      }.collect()
    }
  }
}
{code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
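As a supplement to the proposal above, here is a minimal sketch in plain Scala (no Spark dependency) of the two merge strategies, simulated with in-memory collections standing in for partitions. The object name, helper names, and the sample data are invented for illustration only; this is not the proposed patch itself.

```scala
// Hypothetical sketch: simulate "merge in driver" vs "merge in one executor"
// for distributed top-K, using plain Scala collections as fake partitions.
object TakeOrderedSketch {

  // Local top-K of one partition: the `num` smallest elements under `ord`.
  // (Spark uses Guava's QuickSelect here; a sort is the simplest stand-in.)
  def localTopK[T](items: Iterator[T], num: Int)(implicit ord: Ordering[T]): Seq[T] =
    items.toSeq.sorted(ord).take(num)

  // Strategy 1 ("merge in driver"): every partition's local top-K is pulled
  // into one bounded priority queue; the largest element is evicted whenever
  // the queue grows past `num`, so only the global top-K survives.
  def mergeInDriver[T](partitions: Seq[Seq[T]], num: Int)(implicit ord: Ordering[T]): Seq[T] = {
    // mutable.PriorityQueue is a max-heap on `ord`: dequeue() removes the largest.
    val queue = scala.collection.mutable.PriorityQueue.empty[T](ord)
    for (p <- partitions; x <- localTopK(p.iterator, num)) {
      queue += x
      if (queue.size > num) queue.dequeue() // evict the current largest
    }
    queue.toSeq.sorted(ord)
  }

  // Strategy 2 ("merge in executor"): funnel all local top-K results through
  // a second top-K pass, mirroring the repartition(1) branch of the proposal.
  def mergeInSinglePartition[T](partitions: Seq[Seq[T]], num: Int)(implicit ord: Ordering[T]): Seq[T] =
    localTopK(partitions.flatMap(p => localTopK(p.iterator, num)).iterator, num)
}
```

Both strategies return the same result; they differ only in where the final merge happens, which is exactly the memory-versus-latency trade-off the proposed config would expose.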