otterc commented on a change in pull request #32140: URL: https://github.com/apache/spark/pull/32140#discussion_r640210756
########## File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ########## @@ -712,38 +824,66 @@ final class ShuffleBlockFetcherIterator( case e: IOException => logError("Failed to create input stream from local block", e) } buf.release() - throwFetchFailedException(blockId, mapIndex, address, e) - } - try { - input = streamWrapper(blockId, in) - // If the stream is compressed or wrapped, then we optionally decompress/unwrap the - // first maxBytesInFlight/3 bytes into memory, to check for corruption in that portion - // of the data. But even if 'detectCorruptUseExtraMemory' configuration is off, or if - // the corruption is later, we'll still detect the corruption later in the stream. - streamCompressedOrEncrypted = !input.eq(in) - if (streamCompressedOrEncrypted && detectCorruptUseExtraMemory) { - // TODO: manage the memory used here, and spill it into disk in case of OOM. - input = Utils.copyStreamUpTo(input, maxBytesInFlight / 3) - } - } catch { - case e: IOException => - buf.release() - if (buf.isInstanceOf[FileSegmentManagedBuffer] - || corruptedBlocks.contains(blockId)) { - throwFetchFailedException(blockId, mapIndex, address, e) - } else { - logWarning(s"got an corrupted block $blockId from $address, fetch again", e) - corruptedBlocks += blockId - fetchRequests += FetchRequest( - address, Array(FetchBlockInfo(blockId, size, mapIndex))) + if (blockId.isShuffleChunk) { + numBlocksProcessed += pushBasedFetchHelper + .initiateFallbackBlockFetchForMergedBlock(blockId, address) + // Set result to null to trigger another iteration of the while loop to get either. result = null + null + } else { + throwFetchFailedException(blockId, mapIndex, address, e) + } + } + if (in != null) { + try { + input = streamWrapper(blockId, in) + // If the stream is compressed or wrapped, then we optionally decompress/unwrap the + // first maxBytesInFlight/3 bytes into memory, to check for corruption in that portion + // of the data. But even if 'detectCorruptUseExtraMemory' configuration is off, or if + // the corruption is later, we'll still detect the corruption later in the stream. + streamCompressedOrEncrypted = !input.eq(in) + if (streamCompressedOrEncrypted && detectCorruptUseExtraMemory) { + // TODO: manage the memory used here, and spill it into disk in case of OOM. + input = Utils.copyStreamUpTo(input, maxBytesInFlight / 3) + } + } catch { + case e: IOException => Review comment: Note to self: Most of this is as before. Have added conditions for shuffleChunks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org