[ https://issues.apache.org/jira/browse/SPARK-24587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ryan Deak updated SPARK-24587:
------------------------------

Description:

*NOTE*: _This is likely a *very* impactful change. It likely only matters when {{num}} is large, but without something like the proposed change, algorithms based on distributed {{top-K}} don't scale well._

h2. Description

{{[RDD.takeOrdered|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1432-L1437]}} uses {{[reduce|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1011]}} to combine {{num}}\-sized {{BoundedPriorityQueue}} instances, where {{num}} is the size of the returned {{Array}}. Consequently, errors can occur even when the returned value is small relative to the driver memory. An example error is:

{code}
18/06/18 18:51:59 ERROR TaskSetManager: Total size of serialized results of 28 tasks (8.1 GB) is bigger than spark.driver.maxResultSize (8.0 GB)
18/06/18 18:51:59 ERROR TaskSetManager: Total size of serialized results of 29 tasks (8.4 GB) is bigger than spark.driver.maxResultSize (8.0 GB)
...
18/06/18 18:51:59 ERROR TaskSetManager: Total size of serialized results of 160 tasks (46.4 GB) is bigger than spark.driver.maxResultSize (8.0 GB)
{code}

These messages show that although the final result is only about *0.3 GB* ({{46.4 GB / 160 tasks}}), combining the results on the driver requires more than {{46 GB}} of driver memory.

h2. Proposed Solution

The amount of memory required can be dramatically reduced by using {{[treeReduce|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1040]}}.
For instance, replacing the {{else}} clause with:

{code:language=scala}
else {
  import scala.math.{ceil, log, max}
  // Merge partition-level queues pairwise on the executors rather than all
  // at once on the driver; depth is roughly log2 of the partition count.
  val depth = max(1, ceil(log(mapRDDs.partitions.length) / log(2)).toInt)
  mapRDDs.treeReduce(
    (queue1, queue2) => queue1 ++= queue2,
    depth
  ).toArray.sorted(ord)
}
{code}

This should require less than double the network communication, but should scale to much larger values of the {{num}} parameter without configuration changes or beefier machines.

h2. Code Potentially Impacted

* ML Lib's {{[CountVectorizer|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala#L232]}}

> RDD.takeOrdered uses reduce, pulling all partition data to the driver
> ---------------------------------------------------------------------
>
>                 Key: SPARK-24587
>                 URL: https://issues.apache.org/jira/browse/SPARK-24587
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 2.3.1
>            Reporter: Ryan Deak
>            Priority: Major
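The arithmetic in the report and the depth heuristic in the proposed patch can be sanity-checked with a standalone Scala sketch (no Spark required; {{treeDepth}} and {{driverLoadWithReduce}} are hypothetical helpers introduced here for illustration, with the per-queue size inferred from the 160-task log line):

```scala
import scala.math.{ceil, log, max}

// Depth used by the proposed treeReduce call: roughly log2 of the
// partition count, never less than 1.
def treeDepth(numPartitions: Int): Int =
  max(1, ceil(log(numPartitions.toDouble) / log(2.0)).toInt)

// With plain reduce, the driver must hold one num-sized queue per task at
// once, so its load grows linearly with the partition count.
def driverLoadWithReduce(partitions: Int, gbPerQueue: Double): Double =
  partitions * gbPerQueue

// The 160-task example from the error log, at ~0.29 GB per serialized queue:
val load  = driverLoadWithReduce(160, 0.29) // ~46.4 GB, matching the log
val depth = treeDepth(160)                  // ceil(log2(160)) = 8 merge rounds
```

With {{treeReduce}}, those ~8 merge rounds happen on executors, so the data reaching the driver stays on the order of a couple of {{num}}\-sized queues regardless of the partition count.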
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)