[GitHub] spark pull request #21212: [SPARK-24143] filter empty blocks when convert ma...

2018-05-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21212


---




[GitHub] spark pull request #21212: [SPARK-24143] filter empty blocks when convert ma...

2018-05-06 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21212#discussion_r186315362
  
--- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
@@ -267,28 +269,30 @@ final class ShuffleBlockFetcherIterator(
     // at most maxBytesInFlight in order to limit the amount of data in flight.
     val remoteRequests = new ArrayBuffer[FetchRequest]

-    // Tracks total number of blocks (including zero sized blocks)
-    var totalBlocks = 0
     for ((address, blockInfos) <- blocksByAddress) {
-      totalBlocks += blockInfos.size
       if (address.executorId == blockManager.blockManagerId.executorId) {
-        // Filter out zero-sized blocks
-        localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1)
+        blockInfos.find(_._2 <= 0) match {
+          case Some((blockId, size)) if size < 0 =>
+            throw new BlockException(blockId, "Negative block size " + size)
+          case Some((blockId, size)) if size == 0 =>
+            throw new BlockException(blockId, "Zero-sized blocks should be excluded.")
--- End diff --

I think that failing with an exception here is a great idea, so thanks for adding these checks. In general, I'm in favor of adding explicit fail-fast checks for invariants like this, because they can help defend against silent corruption bugs.
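
To make the fail-fast pattern concrete, here is a minimal self-contained sketch (the `BlockException` case class and `validateBlockSizes` helper below are illustrative stand-ins, not Spark's actual code):

```scala
// Minimal sketch of a fail-fast invariant check on block sizes.
// BlockException here is a stand-in for Spark's internal class.
final case class BlockException(blockId: String, message: String)
  extends Exception(s"$blockId: $message")

def validateBlockSizes(blockInfos: Seq[(String, Long)]): Unit =
  blockInfos.foreach {
    case (blockId, size) if size < 0 =>
      throw BlockException(blockId, s"Negative block size $size")
    case (blockId, size) if size == 0 =>
      throw BlockException(blockId, "Zero-sized blocks should be excluded.")
    case _ => // size > 0: valid, nothing to do
  }
```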


---




[GitHub] spark pull request #21212: [SPARK-24143] filter empty blocks when convert ma...

2018-05-06 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21212#discussion_r186293064
  
--- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
@@ -267,28 +269,30 @@ final class ShuffleBlockFetcherIterator(
     // at most maxBytesInFlight in order to limit the amount of data in flight.
     val remoteRequests = new ArrayBuffer[FetchRequest]

-    // Tracks total number of blocks (including zero sized blocks)
-    var totalBlocks = 0
     for ((address, blockInfos) <- blocksByAddress) {
-      totalBlocks += blockInfos.size
       if (address.executorId == blockManager.blockManagerId.executorId) {
-        // Filter out zero-sized blocks
-        localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1)
+        blockInfos.find(_._2 <= 0) match {
+          case Some((blockId, size)) if size < 0 =>
+            throw new BlockException(blockId, "Negative block size " + size)
+          case Some((blockId, size)) if size == 0 =>
+            throw new BlockException(blockId, "Zero-sized blocks should be excluded.")
--- End diff --

Added another check for remote blocks.
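
For reference, a sketch of what such a check on the remote path could look like (illustrative only; the helper name and its placement are assumptions, not the PR's exact change):

```scala
// Hypothetical helper mirroring the local-block validation for remote
// blocks, run before they are grouped into fetch requests.
def assertPositiveBlockSizes(blockInfos: Seq[(String, Long)]): Unit =
  blockInfos.find(_._2 <= 0).foreach { case (blockId, size) =>
    val reason =
      if (size < 0) s"Negative block size $size"
      else "Zero-sized blocks should be excluded."
    throw new IllegalStateException(s"$blockId: $reason")
  }
```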


---




[GitHub] spark pull request #21212: [SPARK-24143] filter empty blocks when convert ma...

2018-05-05 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21212#discussion_r186261650
  
--- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
@@ -267,28 +269,30 @@ final class ShuffleBlockFetcherIterator(
     // at most maxBytesInFlight in order to limit the amount of data in flight.
     val remoteRequests = new ArrayBuffer[FetchRequest]

-    // Tracks total number of blocks (including zero sized blocks)
-    var totalBlocks = 0
     for ((address, blockInfos) <- blocksByAddress) {
-      totalBlocks += blockInfos.size
       if (address.executorId == blockManager.blockManagerId.executorId) {
-        // Filter out zero-sized blocks
-        localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1)
+        blockInfos.find(_._2 <= 0) match {
+          case Some((blockId, size)) if size < 0 =>
+            throw new BlockException(blockId, "Negative block size " + size)
+          case Some((blockId, size)) if size == 0 =>
+            throw new BlockException(blockId, "Zero-sized blocks should be excluded.")
--- End diff --

Is it necessary to throw an exception here? If so, shall we also throw an exception when we detect a 0-sized **remote** block, rather than skipping it silently?


---




[GitHub] spark pull request #21212: [SPARK-24143] filter empty blocks when convert ma...

2018-05-05 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21212#discussion_r186254447
  
--- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
@@ -267,28 +269,28 @@ final class ShuffleBlockFetcherIterator(
     // at most maxBytesInFlight in order to limit the amount of data in flight.
     val remoteRequests = new ArrayBuffer[FetchRequest]

-    // Tracks total number of blocks (including zero sized blocks)
-    var totalBlocks = 0
     for ((address, blockInfos) <- blocksByAddress) {
-      totalBlocks += blockInfos.size
       if (address.executorId == blockManager.blockManagerId.executorId) {
-        // Filter out zero-sized blocks
-        localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1)
+        blockInfos.find(_._2 < 0) match {
--- End diff --

Yes, I added a check for `_._2 == 0`.


---




[GitHub] spark pull request #21212: [SPARK-24143] filter empty blocks when convert ma...

2018-05-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21212#discussion_r185980680
  
--- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
@@ -267,28 +269,28 @@ final class ShuffleBlockFetcherIterator(
     // at most maxBytesInFlight in order to limit the amount of data in flight.
     val remoteRequests = new ArrayBuffer[FetchRequest]

-    // Tracks total number of blocks (including zero sized blocks)
-    var totalBlocks = 0
     for ((address, blockInfos) <- blocksByAddress) {
-      totalBlocks += blockInfos.size
       if (address.executorId == blockManager.blockManagerId.executorId) {
-        // Filter out zero-sized blocks
-        localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1)
+        blockInfos.find(_._2 < 0) match {
--- End diff --

shall we use `_._2 <= 0` to make sure the 0-sized blocks are filtered as well?
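
The difference between the two predicates is easy to see in isolation (a standalone sketch, not the PR's code):

```scala
val blockInfos = Seq(("block_0", 10L), ("block_1", 0L), ("block_2", -1L))

blockInfos.find(_._2 < 0)  // Some((block_2,-1)): misses the zero-sized block
blockInfos.find(_._2 <= 0) // Some((block_1,0)):  flags zero-sized blocks too
```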


---




[GitHub] spark pull request #21212: [SPARK-24143] filter empty blocks when convert ma...

2018-05-01 Thread jinxing64
GitHub user jinxing64 opened a pull request:

https://github.com/apache/spark/pull/21212

[SPARK-24143] filter empty blocks when convert mapstatus to (blockId, size) pair.

## What changes were proposed in this pull request?

In the current code (`MapOutputTracker.convertMapStatuses`), map statuses are converted to (blockId, size) pairs for all blocks, whether the block is empty or not. This can result in OOM when there are lots of consecutive empty blocks, especially when adaptive execution is enabled.

The (blockId, size) pairs are only used in `ShuffleBlockFetcherIterator` to control shuffle reads, and requests are only sent for non-empty blocks. Can we just filter out the empty blocks in `MapOutputTracker.convertMapStatuses` and save memory?
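
As a rough, self-contained sketch of the proposed filtering (the types below are simplified stand-ins for Spark's `MapStatus` and `ShuffleBlockId`, not the actual implementation):

```scala
// Simplified stand-ins for Spark's internal types.
final case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int)
final case class MapStatus(mapId: Int, blockSizes: Array[Long])

// Convert map statuses to (blockId, size) pairs for one reduce partition,
// dropping empty blocks up front so they never occupy memory downstream.
def convertMapStatuses(
    shuffleId: Int,
    reduceId: Int,
    statuses: Seq[MapStatus]): Seq[(ShuffleBlockId, Long)] =
  for {
    status <- statuses
    size = status.blockSizes(reduceId)
    if size > 0 // the proposed filter: skip empty blocks here
  } yield (ShuffleBlockId(shuffleId, status.mapId, reduceId), size)
```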


## How was this patch tested?

Not added yet.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jinxing64/spark SPARK-24143

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21212.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21212


commit 5211ebd5cc4d3de023752b8ab8168d7bda18aa83
Author: jinxing 
Date:   2018-05-02T05:40:34Z

[SPARK-24143] filter empty blocks when convert mapstatus to (blockId, size) pair.




---
