[ https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16856558#comment-16856558 ]

Piotr Chowaniec commented on SPARK-18105:
-----------------------------------------

I have a similar issue with Spark 2.3.2.

Here is a stack trace:
{code:java}
org.apache.spark.scheduler.DAGScheduler  : ShuffleMapStage 647 (count at Step.java:20) failed in 1.908 s due to org.apache.spark.shuffle.FetchFailedException: Stream is corrupted
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:528)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:444)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:62)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.agg_doAggregateWithKeys_1$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.agg_doAggregateWithKeys_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Stream is corrupted
        at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:252)
        at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:157)
        at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:170)
        at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:349)
        at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:336)
        at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:336)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1381)
        at org.apache.spark.util.Utils$.copyStream(Utils.scala:357)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:436)
        ... 21 more
Caused by: net.jpountz.lz4.LZ4Exception: Error decoding offset 2010 of input buffer
        at net.jpountz.lz4.LZ4JNIFastDecompressor.decompress(LZ4JNIFastDecompressor.java:39)
        at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:247)
        ... 29 more
{code}
It happens during an ETL process that has about 200 steps. It looks like it depends on the input data, because we get these exceptions only in the production environment (on the test and dev machines the same process runs without problems on different data). Unfortunately, there is no way to use the production data in the other environments, so we cannot pin down the differences.

Changing the compression codec to Snappy (a sketch of the configuration change follows the trace) gives:
{code:java}
o.apache.spark.scheduler.TaskSetManager  : Lost task 0.0 in stage 852.3 (TID 30836, localhost, executor driver): FetchFailed(BlockManagerId(driver, DNS.domena, 33588, None), shuffleId=298, mapId=2, reduceId=3, message=
org.apache.spark.shuffle.FetchFailedException: FAILED_TO_UNCOMPRESS(5)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:528)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:444)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:62)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
        at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
        at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
        at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
        at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
        at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:439)
        at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:167)
        at java.io.InputStream.read(InputStream.java:101)
        at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:349)
        at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:336)
        at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:336)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1381)
        at org.apache.spark.util.Utils$.copyStream(Utils.scala:357)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:436)
        ... 20 more
{code}
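For completeness, the Snappy run above was produced by switching the standard {{spark.io.compression.codec}} setting, roughly like this (the class and app name below are placeholders, not our real code):
{code:java}
import org.apache.spark.sql.SparkSession;

public class EtlJobLauncher {
    public static void main(String[] args) {
        // spark.io.compression.codec selects the codec used for shuffle and
        // spill blocks; it defaults to "lz4" and was set to "snappy" for the
        // second stack trace above.
        SparkSession spark = SparkSession.builder()
                .appName("EtlJob") // placeholder name
                .config("spark.io.compression.codec", "snappy")
                .getOrCreate();

        // ... the ~200 ETL steps run here ...

        spark.stop();
    }
}
{code}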

Any ideas on how to debug this issue? Is there any chance of a workaround or a fix?
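For reference, these are the kinds of knobs we can still try on our side; listed here in case someone can say whether any of them is expected to help. This is only a sketch, not a confirmed fix; all keys are standard Spark configuration options:
{code:java}
import org.apache.spark.SparkConf;

public class ShuffleDebugConf {
    // Standard Spark settings that touch the failing shuffle path.
    public static SparkConf build() {
        return new SparkConf()
                // Take the codec out of the shuffle path completely; if the
                // job then succeeds on production data, the problem is in the
                // compression/stream layer rather than in the data or ETL
                // logic (at the cost of more disk and network I/O).
                .set("spark.shuffle.compress", "false")
                .set("spark.shuffle.spill.compress", "false")
                // Retry remote fetches a bit harder, in case the corrupted
                // block comes from a transient network or disk read issue.
                .set("spark.shuffle.io.maxRetries", "6")
                .set("spark.shuffle.io.retryWait", "10s");
    }
}
{code}
If disabling shuffle compression makes the failures disappear, that would at least narrow things down to the codec layer.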

 

> LZ4 failed to decompress a stream of shuffled data
> --------------------------------------------------
>
>                 Key: SPARK-18105
>                 URL: https://issues.apache.org/jira/browse/SPARK-18105
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>            Reporter: Davies Liu
>            Assignee: Davies Liu
>            Priority: Major
>
> When LZ4 is used to compress the shuffle files, decompression may fail with "Stream is corrupted":
> {code}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 92 in stage 5.0 failed 4 times, most recent failure: Lost task 92.3 in stage 5.0 (TID 16616, 10.0.27.18): java.io.IOException: Stream is corrupted
>       at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:220)
>       at org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109)
>       at java.io.BufferedInputStream.read(BufferedInputStream.java:353)
>       at java.io.DataInputStream.read(DataInputStream.java:149)
>       at com.google.common.io.ByteStreams.read(ByteStreams.java:828)
>       at com.google.common.io.ByteStreams.readFully(ByteStreams.java:695)
>       at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127)
>       at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110)
>       at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
>       at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>       at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
>       at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
>       at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown Source)
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>       at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>       at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>       at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
>       at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>       at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>       at org.apache.spark.scheduler.Task.run(Task.scala:86)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> {code}
> https://github.com/jpountz/lz4-java/issues/89


