GitHub user rezasafi commented on a diff in the pull request: https://github.com/apache/spark/pull/22325#discussion_r218944816

--- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
@@ -444,36 +444,34 @@ final class ShuffleBlockFetcherIterator(
             throwFetchFailedException(blockId, address, e)
         }
-        input = streamWrapper(blockId, in)
-        // Only copy the stream if it's wrapped by compression or encryption, also the size of
-        // block is small (the decompressed block is smaller than maxBytesInFlight)
-        if (detectCorrupt && !input.eq(in) && size < maxBytesInFlight / 3) {
-          val originalInput = input
-          val out = new ChunkedByteBufferOutputStream(64 * 1024, ByteBuffer.allocate)
-          try {
+        try {
+          input = streamWrapper(blockId, in)
+          // Only copy the stream if it's wrapped by compression or encryption, also the size of
+          // block is small (the decompressed block is smaller than maxBytesInFlight)
+          if (detectCorrupt && !input.eq(in) && size < maxBytesInFlight / 3) {
+            val out = new ChunkedByteBufferOutputStream(64 * 1024, ByteBuffer.allocate)
             // Decompress the whole block at once to detect any corruption, which could increase
             // the memory usage tne potential increase the chance of OOM.
             // TODO: manage the memory used here, and spill it into disk in case of OOM.
             Utils.copyStream(input, out)
             out.close()
             input = out.toChunkedByteBuffer.toInputStream(dispose = true)
--- End diff --

I just added the `originalInput` val and changed its scope so that it can be closed in the `finally` section.
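The idea in the comment can be sketched in plain Scala without Spark: the wrapped stream is bound to a val whose scope encloses the whole `try`, so the `finally` block can close it whether or not the eager copy succeeds. This is a minimal sketch under stated assumptions, not the code in the PR: `decompressEagerly`, `TrackingInputStream`, `gzip` and `copyStream` are hypothetical helpers, GZIP stands in for whatever `streamWrapper` applies, and a `ByteArrayOutputStream` stands in for `ChunkedByteBufferOutputStream`.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}
import java.util.zip.GZIPOutputStream

object EagerDecompressSketch {
  // Hypothetical helper: gzip-compresses a byte array to build test input.
  def gzip(data: Array[Byte]): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val gz = new GZIPOutputStream(bos)
    try gz.write(data) finally gz.close()
    bos.toByteArray
  }

  // Hypothetical wrapper that records whether close() was called,
  // so the example can check that cleanup actually happened.
  final class TrackingInputStream(underlying: InputStream) extends InputStream {
    var closed = false
    override def read(): Int = underlying.read()
    override def read(b: Array[Byte], off: Int, len: Int): Int = underlying.read(b, off, len)
    override def close(): Unit = { closed = true; underlying.close() }
  }

  // Hypothetical stand-in for Utils.copyStream: copies until EOF.
  def copyStream(in: InputStream, out: ByteArrayOutputStream): Unit = {
    val buf = new Array[Byte](8192)
    var n = in.read(buf)
    while (n != -1) {
      out.write(buf, 0, n)
      n = in.read(buf)
    }
  }

  // Decompresses the whole block up front (to surface corruption early).
  // `originalInput` is declared outside the try, so the finally block can
  // close it on both the success path and the exception path.
  def decompressEagerly(in: InputStream, wrap: InputStream => InputStream): InputStream = {
    val originalInput = wrap(in)
    try {
      val out = new ByteArrayOutputStream()
      copyStream(originalInput, out)
      new ByteArrayInputStream(out.toByteArray)
    } finally {
      originalInput.close()
    }
  }
}
```

Closing the wrapping `GZIPInputStream` also closes the stream it wraps, which is what `TrackingInputStream` lets a caller observe. Note that in this sketch an exception thrown by `wrap` itself happens before the `try`, so only failures during the copy are covered by the `finally`.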