Github user rezasafi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22325#discussion_r215396772
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
    @@ -444,10 +444,23 @@ final class ShuffleBlockFetcherIterator(
                   throwFetchFailedException(blockId, address, e)
               }
     
    -          input = streamWrapper(blockId, in)
    +          var wrapCorruption: Boolean = false
    +          try {
    +            input = streamWrapper(blockId, in)
    +          } catch {
    +            case e: IOException =>
    +              buf.release()
    +              logWarning(s"got a corrupted block $blockId from $address while wrapping it" +
    +                s" locally, fetch again", e)
    +              corruptedBlocks += blockId
    +              fetchRequests += FetchRequest(address, Array((blockId, size)))
    +              wrapCorruption = true
    +              result = null
    +              in.close
    +          }
               // Only copy the stream if it's wrapped by compression or encryption, also the size of
               // block is small (the decompressed block is smaller than maxBytesInFlight)
    -          if (detectCorrupt && !input.eq(in) && size < maxBytesInFlight / 3) {
    +          if (detectCorrupt && !wrapCorruption && !input.eq(in) && size < maxBytesInFlight / 3) {
                 val originalInput = input
    --- End diff --
    
    @davies, I would appreciate your comments on this change as well. I also
    have a quick question: what is the use of originalInput? It seems this
    variable isn't used anywhere except in the finally clause, so I think it
    can be removed.
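
For anyone reading the diff without the surrounding file, here is a minimal, self-contained sketch of the pattern the change follows: if wrapping the stream throws an IOException, record the block as corrupted, queue it to be fetched again, close the raw stream, and skip further processing. This is not Spark code. `Request`, `tryWrap`, and the toy `streamWrapper` are hypothetical stand-ins, and the sketch returns an `Option` where the diff uses the `wrapCorruption` flag.

```scala
import java.io.{ByteArrayInputStream, IOException, InputStream}
import scala.collection.mutable

object WrapRetrySketch {
  // Hypothetical stand-in for a fetch request; not Spark's FetchRequest.
  final case class Request(address: String, blockId: String, size: Long)

  // State the iterator would keep: blocks seen as corrupted, and pending fetches.
  val corruptedBlocks = mutable.HashSet.empty[String]
  val fetchRequests = mutable.Queue.empty[Request]

  // Toy wrapper (assumption): fails for any block id starting with "bad",
  // imitating a decompression or decryption stream that rejects its header.
  def streamWrapper(blockId: String, in: InputStream): InputStream = {
    if (blockId.startsWith("bad")) throw new IOException(s"cannot wrap $blockId")
    in
  }

  // Returns Some(stream) on success; on IOException records the block,
  // queues a new fetch, closes the raw stream, and returns None.
  def tryWrap(address: String, blockId: String, data: Array[Byte]): Option[InputStream] = {
    val in = new ByteArrayInputStream(data)
    try {
      Some(streamWrapper(blockId, in))
    } catch {
      case e: IOException =>
        corruptedBlocks += blockId
        fetchRequests += Request(address, blockId, data.length.toLong)
        in.close()
        None
    }
  }

  def main(args: Array[String]): Unit = {
    println(tryWrap("host-a", "ok_0", Array[Byte](1, 2, 3)).isDefined)
    println(tryWrap("host-a", "bad_1", Array[Byte](4, 5)).isDefined)
    println(fetchRequests.toList)
  }
}
```

Returning `None` makes it harder for later code to touch a stream that was never assigned, which is the job `!wrapCorruption` does in the changed condition. Whether that shape fits the real iterator loop is a separate question.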


---

