[ https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16856558#comment-16856558 ]
Piotr Chowaniec commented on SPARK-18105: ----------------------------------------- I have a similar issue with Spark 2.3.2. Here is a stack trace: {code:java} org.apache.spark.scheduler.DAGScheduler : ShuffleMapStage 647 (count at Step.java:20) failed in 1.908 s due to org.apache.spark.shuffle.FetchFailedException: Stream is corrupted at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:528) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:444) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:62) at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.agg_doAggregateWithKeys_1$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.agg_doAggregateWithKeys_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.IOException: Stream is corrupted at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:252) at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:157) at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:170) at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:349) at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:336) at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:336) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1381) at org.apache.spark.util.Utils$.copyStream(Utils.scala:357) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:436) ... 21 more Caused by: net.jpountz.lz4.LZ4Exception: Error decoding offset 2010 of input buffer at net.jpountz.lz4.LZ4JNIFastDecompressor.decompress(LZ4JNIFastDecompressor.java:39) at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:247) ... 29 more {code} It happens during ETL process that has about 200 steps. It looks like it depends on the input data because we have exceptions only on the production environment (on test and dev machines same process with different data is running without problems). Unfortunately there is no way to use production data on other environment, so we cannot find differences. Changing compression codec to Snappy gives: {code:java} o.apache.spark.scheduler.TaskSetManager : Lost task 0.0 in stage 852.3 (TID 308 36, localhost, executor driver): FetchFailed(BlockManagerId(driver, DNS.domena, 33588, None), shuffleId=298, mapId=2, reduceId=3, message= org.apache.spark.shuffle.FetchFailedException: FAILED_TO_UNCOMPRESS(5) at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:528) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:444) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:62) at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5) at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98) at org.xerial.snappy.SnappyNative.rawUncompress(Native Method) at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474) at org.xerial.snappy.Snappy.uncompress(Snappy.java:513) at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:439) at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:167) at java.io.InputStream.read(InputStream.java:101) at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:349) at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:336) at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:336) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1381) at org.apache.spark.util.Utils$.copyStream(Utils.scala:357) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:436) ... 20 more {code} Any ideas how to debug this issue? Any chance to have a workaround or solution? > LZ4 failed to decompress a stream of shuffled data > -------------------------------------------------- > > Key: SPARK-18105 > URL: https://issues.apache.org/jira/browse/SPARK-18105 > Project: Spark > Issue Type: Bug > Components: Spark Core > Reporter: Davies Liu > Assignee: Davies Liu > Priority: Major > > When lz4 is used to compress the shuffle files, it may fail to decompress it > as "stream is corrupt" > {code} > Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: > Task 92 in stage 5.0 failed 4 times, most recent failure: Lost task 92.3 in > stage 5.0 (TID 16616, 10.0.27.18): java.io.IOException: Stream is corrupted > at > org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:220) > at > org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109) > at java.io.BufferedInputStream.read(BufferedInputStream.java:353) > at java.io.DataInputStream.read(DataInputStream.java:149) > at com.google.common.io.ByteStreams.read(ByteStreams.java:828) > at com.google.common.io.ByteStreams.readFully(ByteStreams.java:695) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110) > at scala.collection.Iterator$$anon$13.next(Iterator.scala:372) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at > org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30) > at > org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at > org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) > at org.apache.spark.scheduler.Task.run(Task.scala:86) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > https://github.com/jpountz/lz4-java/issues/89 -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org