This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 72cce5c39da [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size 72cce5c39da is described below commit 72cce5c39da8a52efa0a7cad4d65dac4eb389ff6 Author: gaoyajun02 <gaoyaju...@meituan.com> AuthorDate: Mon Nov 21 00:14:07 2022 -0600 [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size ### What changes were proposed in this pull request? When push-based shuffle is enabled, a zero-size buf error may occur when fetching shuffle chunks from bad nodes, especially when memory is full. In this case, we can fall back to original shuffle blocks. ### Why are the changes needed? When the reduce task obtains the shuffle chunk with a zero-size buf, we let it fall back to original shuffle block. After verification, these blocks can be read successfully without shuffle retry. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? UT Closes #38333 from gaoyajun02/SPARK-40872. Authored-by: gaoyajun02 <gaoyaju...@meituan.com> Signed-off-by: Mridul <mridul<at>gmail.com> --- .../spark/storage/PushBasedFetchHelper.scala | 2 + .../storage/ShuffleBlockFetcherIterator.scala | 70 +++++++++++++--------- .../storage/ShuffleBlockFetcherIteratorSuite.scala | 13 ++++ 3 files changed, 57 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala b/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala index dd81c860ba3..8cc1b865207 100644 --- a/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala +++ b/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala @@ -285,6 +285,8 @@ private class PushBasedFetchHelper( * 2. There is a failure when fetching remote shuffle chunks. * 3. There is a failure when processing SuccessFetchResult which is for a shuffle chunk * (local or remote). + * 4.
There is a zero-size buffer when processing SuccessFetchResult for a shuffle chunk + * (local or remote). */ def initiateFallbackFetchForPushMergedBlock( blockId: BlockId, diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index b5f20522e91..e35144756b5 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -782,7 +782,7 @@ final class ShuffleBlockFetcherIterator( logDebug("Number of requests in flight " + reqsInFlight) } - if (buf.size == 0) { + val in = if (buf.size == 0) { // We will never legitimately receive a zero-size block. All blocks with zero records // have zero size and all zero-size blocks have no records (and hence should never // have been requested in the first place). This statement relies on behaviors of the @@ -798,38 +798,52 @@ // since the last call. val msg = s"Received a zero-size buffer for block $blockId from $address " + s"(expectedApproxSize = $size, isNetworkReqDone=$isNetworkReqDone)" - throwFetchFailedException(blockId, mapIndex, address, new IOException(msg)) - } - - val in = try { - val bufIn = buf.createInputStream() - if (checksumEnabled) { - val checksum = ShuffleChecksumHelper.getChecksumByAlgorithm(checksumAlgorithm) - checkedIn = new CheckedInputStream(bufIn, checksum) - checkedIn + if (blockId.isShuffleChunk) { + // Zero-size block may come from nodes with hardware failures, For shuffle chunks, + // the original shuffle blocks that belong to that zero-size shuffle chunk is + // available and we can opt to fallback immediately. + logWarning(msg) + pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address) + // Set result to null to trigger another iteration of the while loop to get either.
+ result = null null } else { - throwFetchFailedException(blockId, mapIndex, address, new IOException(msg)) } - } catch { - // The exception could only be throwed by local shuffle block - case e: IOException => - assert(buf.isInstanceOf[FileSegmentManagedBuffer]) - e match { - case ce: ClosedByInterruptException => - logError("Failed to create input stream from local block, " + - ce.getMessage) - case e: IOException => logError("Failed to create input stream from local block", e) - } - buf.release() - if (blockId.isShuffleChunk) { - pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address) - // Set result to null to trigger another iteration of the while loop to get either. - result = null - null + } else { + try { + val bufIn = buf.createInputStream() + if (checksumEnabled) { + val checksum = ShuffleChecksumHelper.getChecksumByAlgorithm(checksumAlgorithm) + checkedIn = new CheckedInputStream(bufIn, checksum) + checkedIn } else { - throwFetchFailedException(blockId, mapIndex, address, e) + bufIn } + } catch { + // The exception could only be throwed by local shuffle block + case e: IOException => + assert(buf.isInstanceOf[FileSegmentManagedBuffer]) + e match { + case ce: ClosedByInterruptException => + logError("Failed to create input stream from local block, " + + ce.getMessage) + case e: IOException => + logError("Failed to create input stream from local block", e) + } + buf.release() + if (blockId.isShuffleChunk) { + pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address) + // Set result to null to trigger another iteration of the while loop to get + // either.
+ result = null + null + } else { + throwFetchFailedException(blockId, mapIndex, address, e) + } + } } + if (in != null) { try { input = streamWrapper(blockId, in) diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala index f8fe28c0512..64b6c93bf52 100644 --- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala @@ -1814,4 +1814,17 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT intercept[FetchFailedException] { iterator.next() } } + test("SPARK-40872: fallback to original shuffle block when a push-merged shuffle chunk " + + "is zero-size") { + val blockManager = mock(classOf[BlockManager]) + val localDirs = Array("local-dir") + val blocksByAddress = prepareForFallbackToLocalBlocks( + blockManager, Map(SHUFFLE_MERGER_IDENTIFIER -> localDirs)) + val zeroSizeBuffer = createMockManagedBuffer(0) + doReturn(Seq({zeroSizeBuffer})).when(blockManager) + .getLocalMergedBlockData(ShuffleMergedBlockId(0, 0, 2), localDirs) + val iterator = createShuffleBlockIteratorWithDefaults(blocksByAddress, + blockManager = Some(blockManager), streamWrapperLimitSize = Some(100)) + verifyLocalBlocksFromFallback(iterator) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org