[GitHub] spark pull request #21219: [SPARK-24160] ShuffleBlockFetcherIterator should ...

2018-05-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21219


---




[GitHub] spark pull request #21219: [SPARK-24160] ShuffleBlockFetcherIterator should ...

2018-05-03 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21219#discussion_r185949405
  
--- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
@@ -407,6 +407,25 @@ final class ShuffleBlockFetcherIterator(
         logDebug("Number of requests in flight " + reqsInFlight)
       }
 
+      if (buf.size == 0) {
+        // We will never legitimately receive a zero-size block. All blocks with zero records
+        // have zero size and all zero-size blocks have no records (and hence should never
+        // have been requested in the first place). This statement relies on behaviors of the
+        // shuffle writers, which are guaranteed by the following test cases:
+        //
+        // - BypassMergeSortShuffleWriterSuite: "write with some empty partitions"
+        // - UnsafeShuffleWriterSuite: "writeEmptyIterator"
+        // - DiskBlockObjectWriterSuite: "commit() and close() without ever opening or writing"
+        //
+        // There is not an explicit test for SortShuffleWriter but the underlying APIs that
+        // uses are shared by the UnsafeShuffleWriter (both writers use DiskBlockObjectWriter
+        // which returns a zero-size from commitAndGet() in case the no records were written
--- End diff --

Fixed.


---




[GitHub] spark pull request #21219: [SPARK-24160] ShuffleBlockFetcherIterator should ...

2018-05-03 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21219#discussion_r185788654
  
--- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
@@ -407,6 +407,25 @@ final class ShuffleBlockFetcherIterator(
         logDebug("Number of requests in flight " + reqsInFlight)
       }
 
+      if (buf.size == 0) {
+        // We will never legitimately receive a zero-size block. All blocks with zero records
+        // have zero size and all zero-size blocks have no records (and hence should never
+        // have been requested in the first place). This statement relies on behaviors of the
+        // shuffle writers, which are guaranteed by the following test cases:
+        //
+        // - BypassMergeSortShuffleWriterSuite: "write with some empty partitions"
+        // - UnsafeShuffleWriterSuite: "writeEmptyIterator"
+        // - DiskBlockObjectWriterSuite: "commit() and close() without ever opening or writing"
+        //
+        // There is not an explicit test for SortShuffleWriter but the underlying APIs that
+        // uses are shared by the UnsafeShuffleWriter (both writers use DiskBlockObjectWriter
+        // which returns a zero-size from commitAndGet() in case the no records were written
--- End diff --

Seems a typo `the no` btw.


---




[GitHub] spark pull request #21219: [SPARK-24160] ShuffleBlockFetcherIterator should ...

2018-05-02 Thread JoshRosen
GitHub user JoshRosen opened a pull request:

https://github.com/apache/spark/pull/21219

[SPARK-24160] ShuffleBlockFetcherIterator should fail if it receives zero-size blocks

## What changes were proposed in this pull request?

This patch modifies `ShuffleBlockFetcherIterator` so that the receipt of 
zero-size blocks is treated as an error. This is done as a preventative measure 
to guard against a potential source of data loss bugs.

In the shuffle layer, we guarantee that zero-size blocks will never be requested: a block containing zero records is always 0 bytes in size and is marked as empty, so it will never be legitimately requested by executors. However, the existing code did not fully take advantage of this invariant on the shuffle-read path: there was no explicit check that fetched blocks are non-zero-size.
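
To illustrate that invariant, here is a minimal sketch (the block IDs and planning logic are illustrative only, not Spark's actual `MapStatus` plumbing): the read path plans fetches from per-block sizes recorded at write time, so zero-size blocks are never requested in the first place.

```scala
// Illustrative sketch: requests are planned from the sizes recorded by the
// shuffle writers, so zero-size (i.e. empty) blocks are filtered out
// before any fetch is issued over the network.
val recordedSizes: Map[String, Long] = Map(
  "shuffle_0_0_0" -> 4096L, // non-empty: will be fetched
  "shuffle_0_1_0" -> 0L,    // empty: never requested
  "shuffle_0_2_0" -> 512L)
val blocksToFetch = recordedSizes.collect { case (id, size) if size > 0 => id }
// blocksToFetch contains shuffle_0_0_0 and shuffle_0_2_0 only
```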

Additionally, our decompression and deserialization streams treat zero-size inputs as empty streams rather than as errors: certain layers treat EOF as an ordinary "end-of-stream" signal (longstanding behavior dating back to the earliest versions of Spark), and decompressors like Snappy may be tolerant of zero-size inputs.
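
A small self-contained sketch of this behavior, using plain JDK streams as a stand-in for the real decompression/deserialization stack:

```scala
import java.io.{ByteArrayInputStream, DataInputStream}

// A zero-size input is indistinguishable from a legitimately exhausted
// stream: read() returns -1 (end-of-stream) without throwing, so any
// deserializer layered on top silently yields zero records.
val in = new DataInputStream(new ByteArrayInputStream(Array.emptyByteArray))
assert(in.read() == -1) // silent EOF, not an exception
```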

As a result, if some other bug caused legitimate buffers to be replaced with zero-size buffers (due to corruption on either the send or receive side), this would translate into silent data loss rather than an explicit fail-fast error.

This patch addresses this problem by adding a `buf.size != 0` check (sketched below). See the code comments for pointers to the tests which guarantee the invariants relied on here.
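
A minimal sketch of the shape of that check (hedged: the helper name and exception type below are illustrative; in the actual patch the check lives inside `ShuffleBlockFetcherIterator` and surfaces as a fetch failure so the stage can be retried):

```scala
import java.io.IOException

// Illustrative stand-in for the fail-fast check: a zero-size buffer can
// never be legitimate, so treat it as corruption instead of letting it
// decode as an empty stream (which would silently drop the block's records).
def validateFetchedBlock(blockId: String, buf: Array[Byte]): Unit = {
  if (buf.length == 0) {
    throw new IOException(
      s"Received a zero-size buffer for block $blockId " +
        "(corruption on the send or receive side?)")
  }
}
```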

## How was this patch tested?

Existing tests (which required modifications, since some were creating 
empty buffers in mocks). I also added a test to make sure we fail on zero-size 
blocks.
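
As a rough sketch of what that new test asserts (hypothetical suite name, reusing the illustrative `validateFetchedBlock` helper from above; the real test exercises the full iterator with mocked transfer services):

```scala
import org.scalatest.funsuite.AnyFunSuite

// Hypothetical, simplified test shape: a zero-size buffer must raise an
// error rather than be decoded as an empty record stream.
class ZeroSizeBlockSuite extends AnyFunSuite {
  test("fail on zero-size blocks") {
    intercept[java.io.IOException] {
      validateFetchedBlock("shuffle_0_1_0", Array.emptyByteArray)
    }
  }
}
```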

To confirm that zero-size blocks are indeed a potential corruption source, I manually ran a workload in `spark-shell` against a modified build that replaced all buffers on the receive path with zero-size buffers.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/JoshRosen/spark SPARK-24160

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21219.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21219


commit 41d06e13d0f95f1dd146b6b512a0becc88eb2caa
Author: Josh Rosen 
Date:   2018-05-02T21:59:26Z

ShuffleBlockFetcherIterator should fail if it receives zero-size blocks




---
