This is what you're looking for: Handle large corrupt shuffle blocks https://issues.apache.org/jira/browse/SPARK-26089
So until 3.0 the only way I can think of is to reduce the size/split your job into many On Thu, Aug 15, 2019 at 4:47 PM Mikhail Pryakhin <m.prya...@gmail.com> wrote: > Hello, Spark community! > > I've been struggling with my job which constantly fails due to inability > to uncompress some previously compressed blocks while shuffling data. > I use spark 2.2.0 with all the configuration settings left by default (no > specific compression codec is specified). I've ascertained that > LZ4CompressionCodec is used as a default codec. The job fails as soon as > the limit of attempts exceeded with the following message: > > Caused by: java.io.IOException: Stream is corrupted > at > org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:211) > at > org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:125) > at > org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:137) > at > org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:340) > at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327) > at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337) > at org.apache.spark.util.Utils$.copyStream(Utils.scala:348) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:395) > ... 28 more > Caused by: net.jpountz.lz4.LZ4Exception: Error decoding offset 14649 of > input buffer > > > Actually, I've stumbled upon a bug [1] as a not fixed yet. Any clue on how > to workaround this issue? I've tried the Snappy codec but it fails > likewise with a bit different message) > > org.apache.spark.shuffle.FetchFailedException: failed to uncompress the > chunk: FAILED_TO_UNCOMPRESS(5) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:442) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:403) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59) > at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) > at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) > at > org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395) > at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) > at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:211) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) > at org.apache.spark.scheduler.Task.run(Task.scala:108) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.io.IOException: failed to uncompress the chunk: > FAILED_TO_UNCOMPRESS(5) > at > org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:361) > at org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:158) > at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:142) > at java.io.InputStream.read(InputStream.java:101) > at > org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:340) > at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327) > at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337) > at org.apache.spark.util.Utils$.copyStream(Utils.scala:348) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:395) > ... 27 more > > > The option of using no compression seems the only feasible for me at this > point. > I really need your expert assistance, thank you very much in advance! Any > help is greatly appreciated! > > > [1] https://issues.apache.org/jira/browse/SPARK-18105 > > > Cheers, > Mike Pryakhin > > -- Sent from my iPhone