spark git commit: [SPARK-20801] Record accurate size of blocks in MapStatus when it's above threshold.

2017-05-22 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 c4b16dcca -> 81f63c892


[SPARK-20801] Record accurate size of blocks in MapStatus when it's above threshold.

## What changes were proposed in this pull request?

Currently, when the number of reduces is above 2000, HighlyCompressedMapStatus
is used to store the sizes of blocks. In HighlyCompressedMapStatus, only the
average size is stored for non-empty blocks, which is bad for memory control
when we fetch shuffle blocks: a huge block is reported as no bigger than the
average, so its size is underestimated. It makes sense to store the accurate
size of a block when it is above a threshold.
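
For illustration, a minimal self-contained Scala sketch of the underestimation
this fixes. The object name and the sizes below are ours, not Spark's code:

object AccurateBlockSketch {
  def main(args: Array[String]): Unit = {
    val threshold = 100L * 1024 * 1024  // default of spark.shuffle.accurateBlockThreshold
    // One 2 GiB block hidden among 1999 blocks of 1 MiB, a typical skewed shuffle.
    val blockSizes = Array.fill(2000)(1L * 1024 * 1024).updated(0, 2048L * 1024 * 1024)
    val nonEmpty = blockSizes.filter(_ > 0)

    // Old behavior: every non-empty block is reported as the overall average,
    // so the 2 GiB block looks like ~2 MiB and fetching it can easily OOM.
    val avgAll = nonEmpty.sum / nonEmpty.length
    println(s"old estimate for the huge block: $avgAll bytes")

    // New behavior: blocks at or above the threshold keep an accurate size;
    // the average is computed over the remaining small blocks only.
    val (huge, small) = nonEmpty.partition(_ >= threshold)
    val avgSmall = if (small.nonEmpty) small.sum / small.length else 0L
    println(s"new estimate for the huge block: ${huge.head} bytes")
    println(s"average used for the small blocks: $avgSmall bytes")
  }
}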

## How was this patch tested?

Added test in MapStatusSuite.

Author: jinxing 

Closes #18031 from jinxing64/SPARK-20801.

(cherry picked from commit 2597674bcc295c2e29c4cfc4a9a48938bd63bf9c)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/81f63c89
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/81f63c89
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/81f63c89

Branch: refs/heads/branch-2.2
Commit: 81f63c8923416014d5c6bc227dd3c4e2a62bac8e
Parents: c4b16dc
Author: jinxing 
Authored: Mon May 22 22:09:49 2017 +0800
Committer: Wenchen Fan 
Committed: Mon May 22 22:10:06 2017 +0800

--
 .../apache/spark/internal/config/package.scala  |  9 
 .../org/apache/spark/scheduler/MapStatus.scala  | 54 
 .../apache/spark/scheduler/MapStatusSuite.scala | 28 +-
 docs/configuration.md   |  9 
 4 files changed, 90 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/81f63c89/core/src/main/scala/org/apache/spark/internal/config/package.scala
--
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 7f7921d..e193ed2 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -278,4 +278,13 @@ package object config {
         "spark.io.compression.codec.")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val SHUFFLE_ACCURATE_BLOCK_THRESHOLD =
+    ConfigBuilder("spark.shuffle.accurateBlockThreshold")
+      .doc("When we compress the size of shuffle blocks in HighlyCompressedMapStatus, we will " +
+        "record the size accurately if it's above this config. This helps to prevent OOM by " +
+        "avoiding underestimating shuffle block size when fetch shuffle blocks.")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(100 * 1024 * 1024)
+
 }
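
A hedged usage sketch of the new setting; the app name and value below are
illustrative only, not part of the patch:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("accurate-block-demo")  // hypothetical app name
  // 32 MiB instead of the 100 MiB default: more blocks get accurate sizes,
  // at the cost of a slightly larger MapStatus.
  .set("spark.shuffle.accurateBlockThreshold", (32L * 1024 * 1024).toString)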

http://git-wip-us.apache.org/repos/asf/spark/blob/81f63c89/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index b2e9a97..048e0d0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -19,8 +19,13 @@ package org.apache.spark.scheduler
 
 import java.io.{Externalizable, ObjectInput, ObjectOutput}
 
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
 import org.roaringbitmap.RoaringBitmap
 
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.config
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.Utils
 
@@ -121,34 +126,41 @@ private[spark] class CompressedMapStatus(
 }
 
 /**
- * A [[MapStatus]] implementation that only stores the average size of non-empty blocks,
+ * A [[MapStatus]] implementation that stores the accurate size of huge blocks, which are larger
+ * than spark.shuffle.accurateBlockThreshold. It stores the average size of other non-empty blocks,
  * plus a bitmap for tracking which blocks are empty.
  *
  * @param loc location where the task is being executed
  * @param numNonEmptyBlocks the number of non-empty blocks
  * @param emptyBlocks a bitmap tracking which blocks are empty
- * @param avgSize average size of the non-empty blocks
+ * @param avgSize average size of the non-empty and non-huge blocks
+ * @param hugeBlockSizes sizes of huge blocks by their reduceId.
  */
 private[spark] class HighlyCompressedMapStatus private (
     private[this] var loc: BlockManagerId,
     private[this] var numNonEmptyBlocks: Int,
     private[this] var emptyBlocks: RoaringBitmap,
-    private[this] var avgSize: Long)
+    private[this] var avgSize: Long,
+    @transient private var hugeBlockSizes: Map[Int, Byte])
   extends MapStatus with Externalizable {

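The huge sizes fit in a Map[Int, Byte] because MapStatus stores block sizes
log-compressed into a single byte (LOG_BASE 1.1, as in Spark's MapStatus
object). A self-contained sketch of that encoding; the object name is ours:

object SizeEncodingSketch {
  private val LOG_BASE = 1.1

  // Encode a size as its (ceiled) logarithm base 1.1, capped at one byte.
  def compressSize(size: Long): Byte = {
    if (size == 0) 0
    else if (size <= 1L) 1
    else math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
  }

  // Decode by exponentiating; treat the byte as unsigned.
  def decompressSize(compressedSize: Byte): Long = {
    if (compressedSize == 0) 0
    else math.pow(LOG_BASE, compressedSize & 0xFF).toLong
  }

  def main(args: Array[String]): Unit = {
    val twoGiB = 2048L * 1024 * 1024
    // Lossy but proportional: ceil() overestimates by at most a factor of 1.1,
    // so a huge block is never underestimated after the round trip.
    println(s"$twoGiB -> ${compressSize(twoGiB) & 0xFF} -> ${decompressSize(compressSize(twoGiB))}")
  }
}
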
spark git commit: [SPARK-20801] Record accurate size of blocks in MapStatus when it's above threshold.

2017-05-22 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master aea73be1b -> 2597674bc


[SPARK-20801] Record accurate size of blocks in MapStatus when it's above threshold.

## What changes were proposed in this pull request?

Currently, when the number of reduces is above 2000, HighlyCompressedMapStatus
is used to store the sizes of blocks. In HighlyCompressedMapStatus, only the
average size is stored for non-empty blocks, which is bad for memory control
when we fetch shuffle blocks: a huge block is reported as no bigger than the
average, so its size is underestimated. It makes sense to store the accurate
size of a block when it is above a threshold.

## How was this patch tested?

Added test in MapStatusSuite.
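
The test's shape, simplified: build a MapStatus over skewed sizes, round-trip
it through Java serialization, and assert that no block above the threshold is
underestimated. The helper below is ours, not the suite's code:

object RoundTripSketch {
  import java.io._

  // Generic Java-serialization round trip of the kind the new test performs.
  def roundTrip[T](obj: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(obj)
    out.flush()
    new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
      .readObject().asInstanceOf[T]
  }

  // The suite then asserts, for every block above the threshold, roughly:
  //   assert(roundTrip(status).getSizeForBlock(reduceId) >= actualSizes(reduceId))
  // i.e. huge blocks may be overestimated by the byte encoding, never underestimated.
}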

Author: jinxing 

Closes #18031 from jinxing64/SPARK-20801.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2597674b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2597674b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2597674b

Branch: refs/heads/master
Commit: 2597674bcc295c2e29c4cfc4a9a48938bd63bf9c
Parents: aea73be
Author: jinxing 
Authored: Mon May 22 22:09:49 2017 +0800
Committer: Wenchen Fan 
Committed: Mon May 22 22:09:49 2017 +0800

--
 .../apache/spark/internal/config/package.scala  |  9 
 .../org/apache/spark/scheduler/MapStatus.scala  | 54 
 .../apache/spark/scheduler/MapStatusSuite.scala | 28 +-
 docs/configuration.md   |  9 
 4 files changed, 90 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2597674b/core/src/main/scala/org/apache/spark/internal/config/package.scala
--
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 7f7921d..e193ed2 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -278,4 +278,13 @@ package object config {
         "spark.io.compression.codec.")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val SHUFFLE_ACCURATE_BLOCK_THRESHOLD =
+    ConfigBuilder("spark.shuffle.accurateBlockThreshold")
+      .doc("When we compress the size of shuffle blocks in HighlyCompressedMapStatus, we will " +
+        "record the size accurately if it's above this config. This helps to prevent OOM by " +
+        "avoiding underestimating shuffle block size when fetch shuffle blocks.")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(100 * 1024 * 1024)
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2597674b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index b2e9a97..048e0d0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -19,8 +19,13 @@ package org.apache.spark.scheduler
 
 import java.io.{Externalizable, ObjectInput, ObjectOutput}
 
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
 import org.roaringbitmap.RoaringBitmap
 
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.config
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.Utils
 
@@ -121,34 +126,41 @@ private[spark] class CompressedMapStatus(
 }
 
 /**
- * A [[MapStatus]] implementation that only stores the average size of non-empty blocks,
+ * A [[MapStatus]] implementation that stores the accurate size of huge blocks, which are larger
+ * than spark.shuffle.accurateBlockThreshold. It stores the average size of other non-empty blocks,
  * plus a bitmap for tracking which blocks are empty.
  *
  * @param loc location where the task is being executed
  * @param numNonEmptyBlocks the number of non-empty blocks
  * @param emptyBlocks a bitmap tracking which blocks are empty
- * @param avgSize average size of the non-empty blocks
+ * @param avgSize average size of the non-empty and non-huge blocks
+ * @param hugeBlockSizes sizes of huge blocks by their reduceId.
  */
 private[spark] class HighlyCompressedMapStatus private (
     private[this] var loc: BlockManagerId,
     private[this] var numNonEmptyBlocks: Int,
     private[this] var emptyBlocks: RoaringBitmap,
-    private[this] var avgSize: Long)
+    private[this] var avgSize: Long,
+    @transient private var hugeBlockSizes: Map[Int, Byte])
   extends MapStatus with Externalizable {
 
   // loc could be null when the default constructor is called during deserialization
-
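
The archived message is cut off above. For context, a hedged, self-contained
sketch of how the read side consults these fields (a paraphrase with a Set
standing in for the RoaringBitmap, not the diff text itself):

object ReadPathSketch {
  def sizeForBlock(
      reduceId: Int,
      emptyBlocks: Set[Int],  // stand-in for the emptyBlocks RoaringBitmap
      hugeBlockSizes: Map[Int, Byte],
      avgSize: Long): Long = {
    if (emptyBlocks.contains(reduceId)) 0L
    else hugeBlockSizes.get(reduceId) match {
      case Some(compressed) => decompress(compressed)  // accurate (log-compressed) size
      case None => avgSize                             // average for ordinary blocks
    }
  }

  // Matches the 1-byte log-1.1 encoding sketched earlier in this thread.
  private def decompress(b: Byte): Long =
    if (b == 0) 0L else math.pow(1.1, b & 0xFF).toLong

  def main(args: Array[String]): Unit = {
    val huge = Map(0 -> 226.toByte)  // roughly 2 GiB, encoded
    println(sizeForBlock(0, Set.empty, huge, avgSize = 1L << 20))  // exact-ish huge size
    println(sizeForBlock(7, Set.empty, huge, avgSize = 1L << 20))  // average fallback
  }
}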