[ 
https://issues.apache.org/jira/browse/SPARK-24587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-24587.
----------------------------------
    Resolution: Incomplete

> RDD.takeOrdered uses reduce, pulling all partition data to the driver
> ---------------------------------------------------------------------
>
>                 Key: SPARK-24587
>                 URL: https://issues.apache.org/jira/browse/SPARK-24587
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 2.3.1
>            Reporter: Ryan Deak
>            Priority: Major
>              Labels: bulk-closed
>
> *NOTE*: _This change likely only matters when {{num}} is large, but without 
> something like it, algorithms based on distributed {{top-K}} don't scale 
> well._
> h2. Description
> {{[RDD.takeOrdered|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1432-L1437]}}
>  uses 
> {{[reduce|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1011]}}
> to combine {{num}}\-sized {{BoundedPriorityQueue}} instances, where {{num}} 
> is the size of the returned {{Array}}. Because {{reduce}} sends every 
> partition's queue to the driver before merging, errors can occur even when 
> the final result is small relative to driver memory. An example error is:
> {code}
> 18/06/18 18:51:59 ERROR TaskSetManager: Total size of serialized results of 
> 28 tasks (8.1 GB) is bigger than spark.driver.maxResultSize (8.0 GB)
> 18/06/18 18:51:59 ERROR TaskSetManager: Total size of serialized results of 
> 29 tasks (8.4 GB) is bigger than spark.driver.maxResultSize (8.0 GB)
> ...
> 18/06/18 18:51:59 ERROR TaskSetManager: Total size of serialized results of 
> 160 tasks (46.4 GB) is bigger than spark.driver.maxResultSize (8.0 GB)
> {code}
> These messages show that although the final result is only about *0.3 GB* 
> ({{46.4/160}}), combining the partial results on the driver requires more 
> than {{46 GB}}.
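The arithmetic behind those log messages can be checked directly; a minimal sketch in plain Scala (the byte counts are taken from the error messages above, not measured):

```scala
// Sketch of the arithmetic in the log above: with a plain reduce the
// driver buffers every task's serialized result before merging, even
// though the merged bounded queue is only about one task's size.
val tasks     = 160
val totalGB   = 46.4               // sum of all serialized task results
val perTaskGB = totalGB / tasks    // ≈ 0.29 GB per num-sized queue
println(f"per-task ≈ $perTaskGB%.2f GB, but reduce buffers all $totalGB%.1f GB on the driver")
```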
> h2. Proposed Solution
> This memory requirement can be reduced dramatically by using 
> {{[treeReduce|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1040]}}.
> For instance, replacing the {{else}} clause with:
> {code:language=scala}
> else {
>   import scala.math.{ceil, log, max}
>   // Merge partition queues in a binary tree of about log2(#partitions)
>   // levels, so no single step combines more than two num-sized queues.
>   val depth = max(1, ceil(log(mapRDDs.partitions.length) / log(2)).toInt)
>   mapRDDs.treeReduce(
>     (queue1, queue2) => queue1 ++= queue2,
>     depth
>   ).toArray.sorted(ord)
> }
> {code}
> This should require less than double the network communication but should 
> scale to much larger values of the {{num}} parameter without configuration 
> changes or beefier machines.
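The pairwise, tree-style merge that {{treeReduce}} performs can be sketched in plain Scala without Spark; {{BoundedTopK}} below is a hypothetical stand-in for Spark's private {{BoundedPriorityQueue}}, and {{treeMerge}} mimics one level-by-level reduction:

```scala
import scala.collection.mutable

// Hypothetical stand-in for Spark's private BoundedPriorityQueue: keeps
// only the maxSize smallest elements under ord (takeOrdered semantics).
final class BoundedTopK[A](maxSize: Int)(implicit ord: Ordering[A]) {
  // dequeue() removes the largest retained element, so overflow evicts
  // the worst candidate and the maxSize smallest survive.
  private val heap = mutable.PriorityQueue.empty[A](ord)
  def +=(a: A): this.type = {
    heap += a
    if (heap.size > maxSize) heap.dequeue()
    this
  }
  def ++=(other: BoundedTopK[A]): this.type = {
    other.heap.foreach(this += _); this
  }
  def toSortedSeq: Seq[A] = heap.toSeq.sorted(ord)
}

// Merge queues pairwise, level by level, like treeReduce: no single
// step ever holds more than two num-sized queues at once.
def treeMerge[A](qs: Seq[BoundedTopK[A]]): BoundedTopK[A] = {
  require(qs.nonEmpty)
  if (qs.size == 1) qs.head
  else treeMerge(qs.grouped(2).map {
    case Seq(a, b) => a ++= b
    case Seq(a)    => a
  }.toSeq)
}
```

For example, merging the per-partition top-3 queues built from 1 to 100 split into four chunks yields {{Seq(1, 2, 3)}}, the same result {{takeOrdered(3)}} would return; only the tree shape of the merge differs from a plain {{reduce}}.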
> h2. Code Potentially Impacted
> * ML Lib's 
> {{[CountVectorizer|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala#L232]}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
