Repository: spark
Updated Branches:
  refs/heads/master 7ff8c45d7 -> ba5bcadde
SPARK-3211 .take() is OOM-prone with empty partitions

Instead of jumping straight from 1 partition to all partitions, do exponential
growth and double the number of partitions to attempt each time instead.

Fix proposed by Paul Nepywoda

Author: Andrew Ash <and...@andrewash.com>

Closes #2117 from ash211/SPARK-3211 and squashes the following commits:

8b2299a [Andrew Ash] Quadruple instead of double for a minor speedup
e5f7e4d [Andrew Ash] Update comment to better reflect what we're doing
09a27f7 [Andrew Ash] Update PySpark to be less OOM-prone as well
3a156b8 [Andrew Ash] SPARK-3211 .take() is OOM-prone with empty partitions

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ba5bcadd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ba5bcadd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ba5bcadd

Branch: refs/heads/master
Commit: ba5bcaddecd54811d45c5fc79a013b3857d4c633
Parents: 7ff8c45
Author: Andrew Ash <and...@andrewash.com>
Authored: Fri Sep 5 18:52:05 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Fri Sep 5 18:52:05 2014 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 7 +++----
 python/pyspark/rdd.py                              | 8 ++++----
 2 files changed, 7 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/ba5bcadd/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index af9e31b..1cf55e8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1064,11 +1064,10 @@ abstract class RDD[T: ClassTag](
       // greater than totalParts because we actually cap it at totalParts in runJob.
       var numPartsToTry = 1
       if (partsScanned > 0) {
-        // If we didn't find any rows after the first iteration, just try all partitions next.
-        // Otherwise, interpolate the number of partitions we need to try, but overestimate it
-        // by 50%.
+        // If we didn't find any rows after the previous iteration, quadruple and retry. Otherwise,
+        // interpolate the number of partitions we need to try, but overestimate it by 50%.
         if (buf.size == 0) {
-          numPartsToTry = totalParts - 1
+          numPartsToTry = partsScanned * 4
         } else {
           numPartsToTry = (1.5 * num * partsScanned / buf.size).toInt
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/ba5bcadd/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index dff6fc2..04f1352 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1089,11 +1089,11 @@ class RDD(object):
             # we actually cap it at totalParts in runJob.
             numPartsToTry = 1
             if partsScanned > 0:
-                # If we didn't find any rows after the first iteration, just
-                # try all partitions next. Otherwise, interpolate the number
-                # of partitions we need to try, but overestimate it by 50%.
+                # If we didn't find any rows after the previous iteration,
+                # quadruple and retry. Otherwise, interpolate the number of
+                # partitions we need to try, but overestimate it by 50%.
                 if len(items) == 0:
-                    numPartsToTry = totalParts - 1
+                    numPartsToTry = partsScanned * 4
                 else:
                     numPartsToTry = int(1.5 * num * partsScanned / len(items))
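For context, here is a minimal standalone sketch of the growth strategy this
patch implements, in plain Python with no Spark involved. The names
simple_take, partitions, and the sample data are invented for illustration and
are not part of the PySpark API; in the real implementation each pass runs as
a Spark job over the chosen partitions rather than a local loop.

    def simple_take(partitions, num):
        """Mimic the RDD.take() scan loop over a plain list of lists."""
        total_parts = len(partitions)
        items = []
        parts_scanned = 0
        while len(items) < num and parts_scanned < total_parts:
            # The first pass scans a single partition.
            num_parts_to_try = 1
            if parts_scanned > 0:
                if len(items) == 0:
                    # Nothing found yet: quadruple rather than jumping straight
                    # to all partitions, so a long run of empty partitions does
                    # not trigger one huge job that buffers every partition.
                    num_parts_to_try = parts_scanned * 4
                else:
                    # Interpolate from the observed row density, overestimating
                    # by 50% to avoid an extra pass.
                    num_parts_to_try = int(1.5 * num * parts_scanned / len(items))
            upper = min(parts_scanned + num_parts_to_try, total_parts)
            for i in range(parts_scanned, upper):
                items.extend(partitions[i])  # stands in for runJob on these parts
            parts_scanned = upper
        return items[:num]

    # 1000 empty partitions followed by one holding data: the cumulative scan
    # grows 1 -> 5 -> 25 -> 125 -> 625 -> 1001 instead of hitting all 1001
    # partitions on the second pass, which is what made the old code OOM-prone.
    parts = [[] for _ in range(1000)] + [[1, 2, 3]]
    print(simple_take(parts, 2))  # [1, 2]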
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org