spark git commit: [SPARK-20801] Record accurate size of blocks in MapStatus when it's above threshold.
Repository: spark Updated Branches: refs/heads/branch-2.2 c4b16dcca -> 81f63c892 [SPARK-20801] Record accurate size of blocks in MapStatus when it's above threshold. ## What changes were proposed in this pull request? Currently, when number of reduces is above 2000, HighlyCompressedMapStatus is used to store size of blocks. In HighlyCompressedMapStatus, only the average size is stored for non-empty blocks, which is not good for memory control when we shuffle blocks. It makes sense to store the accurate size of block when it's above threshold. ## How was this patch tested? Added test in MapStatusSuite. Author: jinxing Closes #18031 from jinxing64/SPARK-20801. (cherry picked from commit 2597674bcc295c2e29c4cfc4a9a48938bd63bf9c) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/81f63c89 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/81f63c89 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/81f63c89 Branch: refs/heads/branch-2.2 Commit: 81f63c8923416014d5c6bc227dd3c4e2a62bac8e Parents: c4b16dc Author: jinxing Authored: Mon May 22 22:09:49 2017 +0800 Committer: Wenchen Fan Committed: Mon May 22 22:10:06 2017 +0800 -- .../apache/spark/internal/config/package.scala | 9 .../org/apache/spark/scheduler/MapStatus.scala | 54 .../apache/spark/scheduler/MapStatusSuite.scala | 28 +- docs/configuration.md | 9 4 files changed, 90 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/81f63c89/core/src/main/scala/org/apache/spark/internal/config/package.scala -- diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 7f7921d..e193ed2 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -278,4 +278,13 @@ package object config { 
"spark.io.compression.codec.") .booleanConf .createWithDefault(false) + + private[spark] val SHUFFLE_ACCURATE_BLOCK_THRESHOLD = +ConfigBuilder("spark.shuffle.accurateBlockThreshold") + .doc("When we compress the size of shuffle blocks in HighlyCompressedMapStatus, we will " + +"record the size accurately if it's above this config. This helps to prevent OOM by " + +"avoiding underestimating shuffle block size when fetch shuffle blocks.") + .bytesConf(ByteUnit.BYTE) + .createWithDefault(100 * 1024 * 1024) + } http://git-wip-us.apache.org/repos/asf/spark/blob/81f63c89/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala index b2e9a97..048e0d0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala @@ -19,8 +19,13 @@ package org.apache.spark.scheduler import java.io.{Externalizable, ObjectInput, ObjectOutput} +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + import org.roaringbitmap.RoaringBitmap +import org.apache.spark.SparkEnv +import org.apache.spark.internal.config import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.Utils @@ -121,34 +126,41 @@ private[spark] class CompressedMapStatus( } /** - * A [[MapStatus]] implementation that only stores the average size of non-empty blocks, + * A [[MapStatus]] implementation that stores the accurate size of huge blocks, which are larger + * than spark.shuffle.accurateBlockThreshold. It stores the average size of other non-empty blocks, * plus a bitmap for tracking which blocks are empty. 
* * @param loc location where the task is being executed * @param numNonEmptyBlocks the number of non-empty blocks * @param emptyBlocks a bitmap tracking which blocks are empty - * @param avgSize average size of the non-empty blocks + * @param avgSize average size of the non-empty and non-huge blocks + * @param hugeBlockSizes sizes of huge blocks by their reduceId. */ private[spark] class HighlyCompressedMapStatus private ( private[this] var loc: BlockManagerId, private[this] var numNonEmptyBlocks: Int, private[this] var emptyBlocks: RoaringBitmap, -private[this] var avgSize: Long) +private[this] var avgSize: Long, +@transient private var hugeBlockSizes: Map[Int, Byte])
spark git commit: [SPARK-20801] Record accurate size of blocks in MapStatus when it's above threshold.
Repository: spark Updated Branches: refs/heads/master aea73be1b -> 2597674bc [SPARK-20801] Record accurate size of blocks in MapStatus when it's above threshold. ## What changes were proposed in this pull request? Currently, when number of reduces is above 2000, HighlyCompressedMapStatus is used to store size of blocks. In HighlyCompressedMapStatus, only the average size is stored for non-empty blocks, which is not good for memory control when we shuffle blocks. It makes sense to store the accurate size of block when it's above threshold. ## How was this patch tested? Added test in MapStatusSuite. Author: jinxing Closes #18031 from jinxing64/SPARK-20801. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2597674b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2597674b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2597674b Branch: refs/heads/master Commit: 2597674bcc295c2e29c4cfc4a9a48938bd63bf9c Parents: aea73be Author: jinxing Authored: Mon May 22 22:09:49 2017 +0800 Committer: Wenchen Fan Committed: Mon May 22 22:09:49 2017 +0800 -- .../apache/spark/internal/config/package.scala | 9 .../org/apache/spark/scheduler/MapStatus.scala | 54 .../apache/spark/scheduler/MapStatusSuite.scala | 28 +- docs/configuration.md | 9 4 files changed, 90 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2597674b/core/src/main/scala/org/apache/spark/internal/config/package.scala -- diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 7f7921d..e193ed2 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -278,4 +278,13 @@ package object config { "spark.io.compression.codec.") .booleanConf .createWithDefault(false) + + private[spark] val SHUFFLE_ACCURATE_BLOCK_THRESHOLD = 
+ConfigBuilder("spark.shuffle.accurateBlockThreshold") + .doc("When we compress the size of shuffle blocks in HighlyCompressedMapStatus, we will " + +"record the size accurately if it's above this config. This helps to prevent OOM by " + +"avoiding underestimating shuffle block size when fetch shuffle blocks.") + .bytesConf(ByteUnit.BYTE) + .createWithDefault(100 * 1024 * 1024) + } http://git-wip-us.apache.org/repos/asf/spark/blob/2597674b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala index b2e9a97..048e0d0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala @@ -19,8 +19,13 @@ package org.apache.spark.scheduler import java.io.{Externalizable, ObjectInput, ObjectOutput} +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + import org.roaringbitmap.RoaringBitmap +import org.apache.spark.SparkEnv +import org.apache.spark.internal.config import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.Utils @@ -121,34 +126,41 @@ private[spark] class CompressedMapStatus( } /** - * A [[MapStatus]] implementation that only stores the average size of non-empty blocks, + * A [[MapStatus]] implementation that stores the accurate size of huge blocks, which are larger + * than spark.shuffle.accurateBlockThreshold. It stores the average size of other non-empty blocks, * plus a bitmap for tracking which blocks are empty. * * @param loc location where the task is being executed * @param numNonEmptyBlocks the number of non-empty blocks * @param emptyBlocks a bitmap tracking which blocks are empty - * @param avgSize average size of the non-empty blocks + * @param avgSize average size of the non-empty and non-huge blocks + * @param hugeBlockSizes sizes of huge blocks by their reduceId. 
*/ private[spark] class HighlyCompressedMapStatus private ( private[this] var loc: BlockManagerId, private[this] var numNonEmptyBlocks: Int, private[this] var emptyBlocks: RoaringBitmap, -private[this] var avgSize: Long) +private[this] var avgSize: Long, +@transient private var hugeBlockSizes: Map[Int, Byte]) extends MapStatus with Externalizable { // loc could be null when the default constructor is called during deserialization -