otterc commented on a change in pull request #32140:
URL: https://github.com/apache/spark/pull/32140#discussion_r640210756



##########
File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -712,38 +824,66 @@ final class ShuffleBlockFetcherIterator(
                 case e: IOException => logError("Failed to create input stream from local block", e)
               }
               buf.release()
-              throwFetchFailedException(blockId, mapIndex, address, e)
-          }
-          try {
-            input = streamWrapper(blockId, in)
-            // If the stream is compressed or wrapped, then we optionally decompress/unwrap the
-            // first maxBytesInFlight/3 bytes into memory, to check for corruption in that portion
-            // of the data. But even if 'detectCorruptUseExtraMemory' configuration is off, or if
-            // the corruption is later, we'll still detect the corruption later in the stream.
-            streamCompressedOrEncrypted = !input.eq(in)
-            if (streamCompressedOrEncrypted && detectCorruptUseExtraMemory) {
-              // TODO: manage the memory used here, and spill it into disk in case of OOM.
-              input = Utils.copyStreamUpTo(input, maxBytesInFlight / 3)
-            }
-          } catch {
-            case e: IOException =>
-              buf.release()
-              if (buf.isInstanceOf[FileSegmentManagedBuffer]
-                  || corruptedBlocks.contains(blockId)) {
-                throwFetchFailedException(blockId, mapIndex, address, e)
-              } else {
-                logWarning(s"got an corrupted block $blockId from $address, 
fetch again", e)
-                corruptedBlocks += blockId
-                fetchRequests += FetchRequest(
-                  address, Array(FetchBlockInfo(blockId, size, mapIndex)))
+              if (blockId.isShuffleChunk) {
+                numBlocksProcessed += pushBasedFetchHelper
+                  .initiateFallbackBlockFetchForMergedBlock(blockId, address)
+                // Set result to null to trigger another iteration of the while loop to get either.
                 result = null
+                null
+              } else {
+                throwFetchFailedException(blockId, mapIndex, address, e)
+              }
+          }
+          if (in != null) {
+            try {
+              input = streamWrapper(blockId, in)
+              // If the stream is compressed or wrapped, then we optionally decompress/unwrap the
+              // first maxBytesInFlight/3 bytes into memory, to check for corruption in that portion
+              // of the data. But even if 'detectCorruptUseExtraMemory' configuration is off, or if
+              // the corruption is later, we'll still detect the corruption later in the stream.
+              streamCompressedOrEncrypted = !input.eq(in)
+              if (streamCompressedOrEncrypted && detectCorruptUseExtraMemory) {
+                // TODO: manage the memory used here, and spill it into disk in case of OOM.
+                input = Utils.copyStreamUpTo(input, maxBytesInFlight / 3)
+              }
+            } catch {
+              case e: IOException =>

Review comment:
       Note to self: Most of this is as before. I have added conditions for shuffleChunks.
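
For readers following along, here is a minimal, self-contained Scala sketch (not the PR code) of the fallback pattern this hunk adds: when opening the stream for a push-merged shuffle chunk throws an IOException, the iterator falls back to fetching the original blocks and leaves the stream null so the loop runs again, instead of failing the whole fetch. BlockRef, initiateFallback and openOrFallback below are hypothetical stand-ins for the real blockId.isShuffleChunk check, pushBasedFetchHelper.initiateFallbackBlockFetchForMergedBlock and the iterator's fetch loop.

import java.io.{FileInputStream, IOException, InputStream}

object ShuffleChunkFallbackSketch {

  // Hypothetical stand-in for a block id that may be a push-merged shuffle chunk.
  final case class BlockRef(path: String, isShuffleChunk: Boolean)

  var numBlocksProcessed = 0

  // Hypothetical stand-in for initiateFallbackBlockFetchForMergedBlock: pretend the
  // original blocks backing the chunk were re-enqueued and report how many blocks
  // this chunk counts for.
  def initiateFallback(block: BlockRef): Int = {
    println(s"falling back to original blocks for ${block.path}")
    1
  }

  // Mirrors the shape of the hunk: try to open the stream; on IOException either
  // fall back (shuffle chunk) or rethrow as a fetch failure (regular block).
  // Returning null tells the caller to take another pass of its loop, which is the
  // role `result = null` plays in ShuffleBlockFetcherIterator.
  def openOrFallback(block: BlockRef): InputStream = {
    try {
      new FileInputStream(block.path)
    } catch {
      case e: IOException =>
        if (block.isShuffleChunk) {
          numBlocksProcessed += initiateFallback(block)
          null
        } else {
          throw new RuntimeException(s"fetch failed for ${block.path}", e)
        }
    }
  }

  def main(args: Array[String]): Unit = {
    val in = openOrFallback(BlockRef("/no/such/file", isShuffleChunk = true))
    // The caller must guard on null before wrapping the stream, just as the
    // `if (in != null)` check in the hunk does.
    println(s"stream == null: ${in == null}, numBlocksProcessed: $numBlocksProcessed")
  }
}

The sketch only shows the control flow; the actual change also handles buffer release and the corrupted-block bookkeeping visible in the diff.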




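On the corruption-detection comment in the hunk: wrapping the stream with Utils.copyStreamUpTo eagerly decompresses/unwraps up to maxBytesInFlight / 3 bytes into memory, so corruption in that prefix is caught here, where the fetch can still be retried, rather than later in whatever consumes the records. A simplified sketch of that general idea (not Spark's Utils.copyStreamUpTo itself) using plain java.io streams:

import java.io.{ByteArrayInputStream, InputStream, SequenceInputStream}

object CopyUpToSketch {

  // Read up to `limit` bytes of `in` eagerly so that an IOException caused by a
  // corrupt prefix surfaces immediately, then chain the buffered prefix back in
  // front of the untouched remainder of the stream.
  def copyUpTo(in: InputStream, limit: Int): InputStream = {
    val buf = new Array[Byte](limit)
    var total = 0
    var eof = false
    while (!eof && total < limit) {
      val n = in.read(buf, total, limit - total) // a corrupt prefix would throw here
      if (n == -1) eof = true else total += n
    }
    val prefix = new ByteArrayInputStream(buf, 0, total)
    if (eof) {
      in.close() // the whole stream fit within the limit; nothing left to chain
      prefix
    } else {
      new SequenceInputStream(prefix, in) // buffered prefix followed by the rest
    }
  }

  def main(args: Array[String]): Unit = {
    val data = Array.tabulate[Byte](10)(_.toByte)
    val out = copyUpTo(new ByteArrayInputStream(data), limit = 4)
    // Prints all ten bytes in order: the first four come from the in-memory prefix.
    println(Iterator.continually(out.read()).takeWhile(_ != -1).toList)
  }
}

As the TODO in the hunk notes, the buffered prefix is unmanaged memory; the real code bounds it at maxBytesInFlight / 3 and only takes this path when detectCorruptUseExtraMemory is enabled.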