This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 72cce5c39da [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size
72cce5c39da is described below

commit 72cce5c39da8a52efa0a7cad4d65dac4eb389ff6
Author: gaoyajun02 <gaoyaju...@meituan.com>
AuthorDate: Mon Nov 21 00:14:07 2022 -0600

    [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size
    
    ### What changes were proposed in this pull request?
    When push-based shuffle is enabled, a zero-size buffer error may occur when fetching shuffle chunks from bad nodes, especially when memory is full. In this case, we can fall back to the original shuffle blocks.
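
    Condensed from the diff below (checksum wrapping and the stream-creation error handling are elided; msg, result, and mapIndex are the iterator's surrounding state in the patch), the new control flow in ShuffleBlockFetcherIterator is roughly:

        val in = if (buf.size == 0) {
          if (blockId.isShuffleChunk) {
            // A zero-size shuffle chunk can be recovered from its original
            // shuffle blocks, so fall back instead of failing the task.
            logWarning(msg)
            pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address)
            result = null // forces another iteration of the fetch loop
            null
          } else {
            // Regular blocks have no fallback; surface a fetch failure as before.
            throwFetchFailedException(blockId, mapIndex, address, new IOException(msg))
          }
        } else {
          buf.createInputStream()
        }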
    
    ### Why are the changes needed?
    When a reduce task receives a shuffle chunk with a zero-size buffer, we let it fall back to the original shuffle blocks. Verification showed that these blocks can then be read successfully without a shuffle retry.
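
    Before this change, the zero-size check unconditionally called throwFetchFailedException (see the removed call in the diff below), failing the fetch; after it, a zero-size shuffle chunk only logs a warning of the following form (placeholder values; the message format comes from the patch) and the reducer transparently fetches the original blocks instead:

        Received a zero-size buffer for block <blockId> from <address> (expectedApproxSize = <size>, isNetworkReqDone=<true|false>)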
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Unit test (a new case added to ShuffleBlockFetcherIteratorSuite).
    
    Closes #38333 from gaoyajun02/SPARK-40872.
    
    Authored-by: gaoyajun02 <gaoyaju...@meituan.com>
    Signed-off-by: Mridul <mridul<at>gmail.com>
---
 .../spark/storage/PushBasedFetchHelper.scala       |  2 +
 .../storage/ShuffleBlockFetcherIterator.scala      | 70 +++++++++++++---------
 .../storage/ShuffleBlockFetcherIteratorSuite.scala | 13 ++++
 3 files changed, 57 insertions(+), 28 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala b/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala
index dd81c860ba3..8cc1b865207 100644
--- a/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala
+++ b/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala
@@ -285,6 +285,8 @@ private class PushBasedFetchHelper(
    * 2. There is a failure when fetching remote shuffle chunks.
   * 3. There is a failure when processing SuccessFetchResult which is for a shuffle chunk
    *    (local or remote).
+   * 4. There is a zero-size buffer when processing SuccessFetchResult for a shuffle chunk
+   *    (local or remote).
    */
   def initiateFallbackFetchForPushMergedBlock(
       blockId: BlockId,
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index b5f20522e91..e35144756b5 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -782,7 +782,7 @@ final class ShuffleBlockFetcherIterator(
             logDebug("Number of requests in flight " + reqsInFlight)
           }
 
-          if (buf.size == 0) {
+          val in = if (buf.size == 0) {
             // We will never legitimately receive a zero-size block. All blocks with zero records
             // have zero size and all zero-size blocks have no records (and hence should never
             // have been requested in the first place). This statement relies on behaviors of the
@@ -798,38 +798,52 @@ final class ShuffleBlockFetcherIterator(
             // since the last call.
             val msg = s"Received a zero-size buffer for block $blockId from $address " +
               s"(expectedApproxSize = $size, isNetworkReqDone=$isNetworkReqDone)"
-            throwFetchFailedException(blockId, mapIndex, address, new IOException(msg))
-          }
-
-          val in = try {
-            val bufIn = buf.createInputStream()
-            if (checksumEnabled) {
-              val checksum = ShuffleChecksumHelper.getChecksumByAlgorithm(checksumAlgorithm)
-              checkedIn = new CheckedInputStream(bufIn, checksum)
-              checkedIn
+            if (blockId.isShuffleChunk) {
+              // Zero-size blocks may come from nodes with hardware failures. For shuffle chunks,
+              // the original shuffle blocks that belong to that zero-size shuffle chunk are
+              // available, so we can opt to fall back immediately.
+              logWarning(msg)
+              pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address)
+              // Set result to null to trigger another iteration of the while loop to get the
+              // next result.
+              result = null
+              null
             } else {
-              bufIn
+              throwFetchFailedException(blockId, mapIndex, address, new IOException(msg))
             }
-          } catch {
-            // The exception could only be throwed by local shuffle block
-            case e: IOException =>
-              assert(buf.isInstanceOf[FileSegmentManagedBuffer])
-              e match {
-                case ce: ClosedByInterruptException =>
-                  logError("Failed to create input stream from local block, " +
-                    ce.getMessage)
-                case e: IOException => logError("Failed to create input stream from local block", e)
-              }
-              buf.release()
-              if (blockId.isShuffleChunk) {
-                pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address)
-                // Set result to null to trigger another iteration of the while loop to get either.
-                result = null
-                null
+          } else {
+            try {
+              val bufIn = buf.createInputStream()
+              if (checksumEnabled) {
+                val checksum = ShuffleChecksumHelper.getChecksumByAlgorithm(checksumAlgorithm)
+                checkedIn = new CheckedInputStream(bufIn, checksum)
+                checkedIn
               } else {
-                throwFetchFailedException(blockId, mapIndex, address, e)
+                bufIn
               }
+            } catch {
+              // The exception could only be thrown by a local shuffle block
+              case e: IOException =>
+                assert(buf.isInstanceOf[FileSegmentManagedBuffer])
+                e match {
+                  case ce: ClosedByInterruptException =>
+                    logError("Failed to create input stream from local block, " +
+                      ce.getMessage)
+                  case e: IOException =>
+                    logError("Failed to create input stream from local block", e)
+                }
+                buf.release()
+                if (blockId.isShuffleChunk) {
+                  pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(blockId, address)
+                  // Set result to null to trigger another iteration of the while loop to get
+                  // the next result.
+                  result = null
+                  null
+                } else {
+                  throwFetchFailedException(blockId, mapIndex, address, e)
+                }
+            }
           }
+
           if (in != null) {
             try {
               input = streamWrapper(blockId, in)
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index f8fe28c0512..64b6c93bf52 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -1814,4 +1814,17 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     intercept[FetchFailedException] { iterator.next() }
   }
 
+  test("SPARK-40872: fallback to original shuffle block when a push-merged shuffle chunk " +
+    "is zero-size") {
+    val blockManager = mock(classOf[BlockManager])
+    val localDirs = Array("local-dir")
+    val blocksByAddress = prepareForFallbackToLocalBlocks(
+      blockManager, Map(SHUFFLE_MERGER_IDENTIFIER -> localDirs))
+    val zeroSizeBuffer = createMockManagedBuffer(0)
+    doReturn(Seq(zeroSizeBuffer)).when(blockManager)
+      .getLocalMergedBlockData(ShuffleMergedBlockId(0, 0, 2), localDirs)
+    val iterator = createShuffleBlockIteratorWithDefaults(blocksByAddress,
+      blockManager = Some(blockManager), streamWrapperLimitSize = Some(100))
+    verifyLocalBlocksFromFallback(iterator)
+  }
 }
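
For readers outside the Spark codebase, the decision pattern the patch adopts boils down to the following self-contained Scala sketch (toy types, not Spark APIs; only the zero-size branching mirrors the patch):

    import java.io.IOException

    object ZeroSizeFallbackSketch {
      sealed trait Block
      // A push-merged chunk can be reconstructed from its original blocks.
      final case class ShuffleChunk(id: Int) extends Block
      // A regular shuffle block has no such fallback.
      final case class OriginalBlock(id: Int) extends Block

      // Returns Right(size) for a usable buffer, Left(reason) when the caller
      // should retry via the chunk's original blocks, and throws for an
      // unrecoverable zero-size regular block.
      def open(block: Block, bufSize: Long): Either[String, Long] =
        if (bufSize == 0) {
          block match {
            case ShuffleChunk(id) => Left(s"fall back to original blocks of chunk $id")
            case _: OriginalBlock =>
              throw new IOException("Received a zero-size buffer for a regular block")
          }
        } else {
          Right(bufSize)
        }

      def main(args: Array[String]): Unit = {
        println(open(ShuffleChunk(1), 0))    // Left(fall back to original blocks of chunk 1)
        println(open(OriginalBlock(2), 42))  // Right(42)
      }
    }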

