Github user rezasafi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22325#discussion_r215396772

    --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
    @@ -444,10 +444,23 @@ final class ShuffleBlockFetcherIterator(
                 throwFetchFailedException(blockId, address, e)
             }
    -        input = streamWrapper(blockId, in)
    +        var wrapCorruption: Boolean = false
    +        try {
    +          input = streamWrapper(blockId, in)
    +        } catch {
    +          case e: IOException =>
    +            buf.release()
    +            logWarning(s"got a corrupted block $blockId from $address while wrapping it" +
    +              s" locally, fetch again", e)
    +            corruptedBlocks += blockId
    +            fetchRequests += FetchRequest(address, Array((blockId, size)))
    +            wrapCorruption = true
    +            result = null
    +            in.close
    +        }
             // Only copy the stream if it's wrapped by compression or encryption, also the size of
             // block is small (the decompressed block is smaller than maxBytesInFlight)
    -        if (detectCorrupt && !input.eq(in) && size < maxBytesInFlight / 3) {
    +        if (detectCorrupt && !wrapCorruption && !input.eq(in) && size < maxBytesInFlight / 3) {
               val originalInput = input
    --- End diff --

    @davies I would appreciate your comments on this change as well. Also, a quick question: what is `originalInput` used for? It seems this val isn't used anywhere except in the finally clause, so I think it can be removed.
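The new catch branch in the diff can be illustrated in isolation. This is a minimal sketch, not Spark's code, under stated assumptions: `streamWrapper` here is a hypothetical stand-in that only applies GZIP decompression (`java.util.zip.GZIPInputStream` validates the GZIP header in its constructor, so a corrupted header fails at wrap time rather than during the later copy), `Refetch` is a hypothetical substitute for `FetchRequest`, and the `ManagedBuffer` release is omitted because there is no buffer in this sketch.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, IOException, InputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import scala.collection.mutable

object WrapCorruptionSketch {
  // Hypothetical stand-in for Spark's FetchRequest: only the block id to refetch.
  final case class Refetch(blockId: String)

  // Hypothetical wrapper: GZIPInputStream reads and checks the GZIP header in its
  // constructor, so a corrupted header surfaces here as an IOException
  // (ZipException or EOFException), i.e. while wrapping, before any copy happens.
  def streamWrapper(blockId: String, in: InputStream): InputStream =
    new GZIPInputStream(in)

  // Returns Some(wrapped stream) on success. If wrapping throws an IOException,
  // it records the block as corrupted, queues a refetch, closes the raw stream,
  // and returns None so the caller can skip the corruption-detection copy
  // (the role played by the wrapCorruption flag in the diff).
  def wrapOrRefetch(
      blockId: String,
      in: InputStream,
      corruptedBlocks: mutable.Set[String],
      fetchRequests: mutable.Queue[Refetch]): Option[InputStream] = {
    try {
      Some(streamWrapper(blockId, in))
    } catch {
      case _: IOException =>
        corruptedBlocks += blockId
        fetchRequests += Refetch(blockId)
        in.close()
        None
    }
  }

  // Helper to build valid GZIP input for trying the sketch out.
  def gzip(bytes: Array[Byte]): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val gz = new GZIPOutputStream(bos)
    gz.write(bytes)
    gz.close()
    bos.toByteArray
  }
}
```

Returning an `Option` instead of setting a mutable flag is only a design choice for the sketch; the diff keeps the existing `var input` and adds `wrapCorruption` so the following `if` can skip the copy step.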