[ 
https://issues.apache.org/jira/browse/SPARK-4105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15251201#comment-15251201
 ] 

Zhongshuai Pei commented on SPARK-4105:
---------------------------------------

I find that some task recompute before this problem happened,and I  think that 
retry operation Corrupted shuffle file that caused this problem. I debug the 
code and corrupted the shuffle file before it has been readed, this problem 
happened every time.maybe we can regenerate the shuffle file when it is 
corrupted

code like this 

BlockStoreShuffleReader.scala
```
     val wrappedStreams = blockFetcherItr.map { case (blockId, inputStream) =>
-      serializerManager.wrapForCompression(blockId, inputStream)
+      try {
+        serializerManager.wrapForCompression(blockId, inputStream)  
+      } catch {
+        case e: IOException => {
+          if ((e.getMessage.contains("FAILED_TO_UNCOMPRESS(5)") ||
+              e.getMessage.contains("PARSING_ERROR(2)") ||
+              e.getMessage.contains("Stream is corrupted")) && 
blockId.isShuffle) {
+            val shuffleBlockId = blockId.asInstanceOf[ShuffleBlockId]
+            throw new FetchFailedException(
+              blockManager.blockManagerId, shuffleBlockId.shuffleId,
+              shuffleBlockId.mapId, shuffleBlockId.reduceId, e)
+          } else {
+            throw e
+          }
+        }
+      }
     }
```


> FAILED_TO_UNCOMPRESS(5) errors when fetching shuffle data with sort-based 
> shuffle
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-4105
>                 URL: https://issues.apache.org/jira/browse/SPARK-4105
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle, Spark Core
>    Affects Versions: 1.2.0, 1.2.1, 1.3.0, 1.4.1
>            Reporter: Josh Rosen
>            Assignee: Josh Rosen
>            Priority: Critical
>         Attachments: JavaObjectToSerialize.java, 
> SparkFailedToUncompressGenerator.scala
>
>
> We have seen non-deterministic {{FAILED_TO_UNCOMPRESS(5)}} errors during 
> shuffle read.  Here's a sample stacktrace from an executor:
> {code}
> 14/10/23 18:34:11 ERROR Executor: Exception in task 1747.3 in stage 11.0 (TID 
> 33053)
> java.io.IOException: FAILED_TO_UNCOMPRESS(5)
>       at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:78)
>       at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
>       at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:391)
>       at org.xerial.snappy.Snappy.uncompress(Snappy.java:427)
>       at 
> org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:127)
>       at 
> org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
>       at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:58)
>       at 
> org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128)
>       at 
> org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1090)
>       at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator$$anon$1$$anonfun$onBlockFetchSuccess$1.apply(ShuffleBlockFetcherIterator.scala:116)
>       at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator$$anon$1$$anonfun$onBlockFetchSuccess$1.apply(ShuffleBlockFetcherIterator.scala:115)
>       at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:243)
>       at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:52)
>       at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>       at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
>       at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>       at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:129)
>       at 
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
>       at 
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
>       at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>       at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>       at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>       at 
> org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>       at 
> org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>       at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>       at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>       at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>       at org.apache.spark.scheduler.Task.run(Task.scala:56)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
> {code}
> Here's another occurrence of a similar error:
> {code}
> java.io.IOException: failed to read chunk
>         
> org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:348)
>         
> org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:159)
>         org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:142)
>         
> java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2310)
>         
> java.io.ObjectInputStream$BlockDataInputStream.read(ObjectInputStream.java:2712)
>         
> java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2742)
>         java.io.ObjectInputStream.readArray(ObjectInputStream.java:1687)
>         java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
>         
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>         java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>         
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>         
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
>         
> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>         org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>         
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
>         
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>         
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:129)
>         org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
>         
> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
>         org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>         org.apache.spark.scheduler.Task.run(Task.scala:56)
>         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
>         
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         java.lang.Thread.run(Thread.java:745)
> {code}
> The first stacktrace was reported by a Spark user.  The second stacktrace 
> occurred when running
> {code}
> import java.util.Random
> val numKeyValPairs=1000
> val numberOfMappers=200
> val keySize=10000
> for (i <- 0 to 19) {
> val pairs1 = sc.parallelize(0 to numberOfMappers, 
> numberOfMappers).flatMap(p=>{
>   val randGen = new Random
>   val arr1 = new Array[(Int, Array[Byte])](numKeyValPairs)
>   for (i <- 0 until numKeyValPairs){
>     val byteArr = new Array[Byte](keySize)
>     randGen.nextBytes(byteArr)
>     arr1(i) = (randGen.nextInt(Int.MaxValue),byteArr)
>   }
>   arr1
> })
>   pairs1.groupByKey(numberOfMappers).count
> }
> {code}
> This job frequently runs without any problems, but when it fails it seem that 
> every post-shuffle task fails with either PARSING_ERROR(2), 
> FAILED_TO_UNCOMPRESS(5), or some other decompression error.  I've seen 
> reports of similar problems when using LZF compression, so I think that this 
> is caused by some sort of general stream corruption issue. 
> This issue has been observed even when no spilling occurs, so I don't believe 
> that this is due to a bug in spilling code.
> I was unable to reproduce this when running this code in a fresh Spark EC2 
> cluster and we've been having a hard time finding a deterministic 
> reproduction.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to