Chen Zhang created SPARK-32212:
----------------------------------

             Summary: RDD.takeOrdered merge intermediate results can be 
configured in driver or executor
                 Key: SPARK-32212
                 URL: https://issues.apache.org/jira/browse/SPARK-32212
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
    Affects Versions: 3.0.0
            Reporter: Chen Zhang


In the issue list I saw several discussions about exceeding the driver's memory 
limit when using _RDD.takeOrdered_ or SQL's _ORDER BY ... LIMIT ..._. I think 
the implementation of _RDD.takeOrdered_ can be improved.

In the current implementation of _RDD.takeOrdered_, the executor process uses 
the QuickSelect algorithm from guava to compute the local top-K results of each 
RDD partition. These intermediate results are packaged into 
java.util.PriorityQueue instances and returned to the driver process, where 
they are merged into the global top-K result.

The problem with this implementation is that if the intermediate results are 
large and there are many partitions, they can accumulate in the driver 
process's memory, causing excessive memory pressure.
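Schematically, the current flow looks like the following Spark-free Python sketch (the partition data, `k`, and function names are illustrative; `heapq.nsmallest` stands in for guava's QuickSelect):

```python
import heapq

def local_top_k(partition, k):
    # Executor side: each partition computes its local top-k
    # (the k smallest elements under the ordering).
    return heapq.nsmallest(k, partition)

def take_ordered_merge_in_driver(partitions, k):
    # Driver side: every per-partition result is collected before merging,
    # so the driver holds up to len(partitions) * k elements at once.
    intermediate = [local_top_k(p, k) for p in partitions]
    return heapq.nsmallest(k, [x for part in intermediate for x in part])

partitions = [[9, 1, 7], [4, 8, 2], [6, 3, 5]]
print(take_ordered_merge_in_driver(partitions, 2))  # → [1, 2]
```

The driver-side footprint grows with the number of partitions, which is exactly the accumulation problem described above.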

We could add an optional config that determines whether the intermediate 
results (the local top-K of each partition) in _RDD.takeOrdered_ are merged in 
the driver process or in an executor process. If set to true, they are merged 
in the driver (via util.PriorityQueue), which gives a shorter wait for the 
result; but if the intermediate results are large and there are many 
partitions, they may accumulate in driver memory and cause excessive memory 
pressure. If set to false, they are merged in an executor (via guava's 
QuickSelect); intermediate results then do not accumulate in driver memory, 
but the run takes longer.

Something like:
_(org.apache.spark.rdd.RDD)_
{code:scala}
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
    if (num == 0 || partitions.length == 0) {
      Array.empty
    } else {
      if (conf.getBoolean("spark.rdd.takeOrdered.mergeInDriver", true)) {
        // Merge in the driver: collect one bounded queue per partition
        // and reduce them locally (the existing behavior).
        val mapRDDs = mapPartitions { items =>
          // Priority keeps the largest elements, so let's reverse the ordering.
          val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
          queue ++= collectionUtils.takeOrdered(items, num)(ord)
          Iterator.single(queue)
        }
        mapRDDs.reduce { (queue1, queue2) =>
          queue1 ++= queue2
          queue1
        }.toArray.sorted(ord)
      } else {
        // Merge in an executor: compute the local top-K of each partition,
        // funnel all of them into a single partition, and reduce to the final
        // top-K there, so the driver only ever collects num elements.
        mapPartitions { items =>
          collectionUtils.takeOrdered(items, num)(ord)
        }.repartition(1).mapPartitions { items =>
          collectionUtils.takeOrdered(items, num)(ord)
        }.collect()
      }
    }
  }
{code}
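For contrast, the `repartition(1)` path above can be sketched the same way (again a Spark-free Python illustration with hypothetical names): the per-partition results are funneled to one place and reduced to the final K before anything reaches the driver:

```python
import heapq

def take_ordered_merge_in_executor(partitions, k):
    # Executor side: local top-k per partition, as before.
    intermediate = [heapq.nsmallest(k, p) for p in partitions]
    # The repartition(1) step ships all intermediates to a single executor,
    # which merges them down to k elements there...
    merged = heapq.nsmallest(k, [x for part in intermediate for x in part])
    # ...so the driver's collect() only ever receives k elements.
    return merged

print(take_ordered_merge_in_executor([[9, 1, 7], [4, 8, 2], [6, 3, 5]], 2))  # → [1, 2]
```

The result is identical; only the location of the merge (and therefore the peak driver memory) changes.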



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
