[ https://issues.apache.org/jira/browse/SPARK-3426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14179660#comment-14179660 ]
Apache Spark commented on SPARK-3426:
-------------------------------------

User 'JoshRosen' has created a pull request for this issue:
https://github.com/apache/spark/pull/2890

> Sort-based shuffle compression behavior is inconsistent
> -------------------------------------------------------
>
>                 Key: SPARK-3426
>                 URL: https://issues.apache.org/jira/browse/SPARK-3426
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.1.0, 1.2.0
>            Reporter: Andrew Or
>            Assignee: Josh Rosen
>            Priority: Blocker
>
> We have the following configs:
> {code}
> spark.shuffle.compress
> spark.shuffle.spill.compress
> {code}
> When these two diverge, sort-based shuffle fails with a compression
> exception under certain workloads. This is because in sort-based shuffle
> we serve the index file (written using spark.shuffle.spill.compress) as a
> normal shuffle file (read using spark.shuffle.compress). In retrospect it
> was unfortunate that these two configs were ever exposed, since now we
> can't easily remove them.
>
> Here is how this can be reproduced. Set the following in your
> spark-defaults.conf:
> {code}
> spark.master                  local-cluster[1,1,512]
> spark.shuffle.spill.compress  false
> spark.shuffle.compress        true
> spark.shuffle.manager         sort
> spark.shuffle.memoryFraction  0.001
> {code}
> Then run the following in spark-shell:
> {code}
> sc.parallelize(0 until 100000).map(i => (i/4, i)).groupByKey().collect()
> {code}
> This leads to compression errors, such as the following:
> {code}
> [info] org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in
> stage 1.0 (TID 8, joshs-mbp): java.io.IOException: FAILED_TO_UNCOMPRESS(5)
> [info]   org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
> [info]   org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
> [info]   org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444)
> [info]   org.xerial.snappy.Snappy.uncompress(Snappy.java:480)
> [info]   org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:127)
> [info]   org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
> [info]   org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:58)
> [info]   org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128)
> [info]   org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1090)
> [info]   org.apache.spark.storage.BlockManager.getLocalShuffleFromDisk(BlockManager.scala:350)
> [info]   org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$fetchLocalBlocks$1$$anonfun$apply$4.apply(ShuffleBlockFetcherIterator.scala:196)
> [info]   org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$fetchLocalBlocks$1$$anonfun$apply$4.apply(ShuffleBlockFetcherIterator.scala:196)
> [info]   org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:243)
> [info]   org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:52)
> [info]   scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> [info]   org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
> [info]   org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> [info]   org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:129)
> [info]   org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
> [info]   org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
> [info]   org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
> [info]   org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> [info]   org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> [info]   org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> [info]   org.apache.spark.scheduler.Task.run(Task.scala:56)
> [info]   org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
> [info]   java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> [info]   java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> [info]   java.lang.Thread.run(Thread.java:745)
> {code}
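>
> The failure is a plain writer/reader codec mismatch. A minimal stand-alone
> sketch of the same effect (illustrative only, not Spark's actual shuffle
> code), runnable in spark-shell since snappy-java is on the classpath:
> {code}
> import java.io.ByteArrayInputStream
> import org.xerial.snappy.SnappyInputStream
>
> // Bytes written with no codec, as with spark.shuffle.spill.compress=false.
> val plainBytes = "some spilled shuffle bytes".getBytes("UTF-8")
>
> // The read side expects Snappy framing, as with spark.shuffle.compress=true.
> // The stream header check fails, and (depending on the snappy-java version)
> // the constructor or the first read throws
> // java.io.IOException: FAILED_TO_UNCOMPRESS(5), matching the trace above.
> val in = new SnappyInputStream(new ByteArrayInputStream(plainBytes))
> in.read()
> {code}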
>
> Similarly, with
> {code}
> spark.shuffle.spill.compress  true
> spark.shuffle.compress        false
> {code}
> we see
> {code}
> [info] org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in
> stage 1.0 (TID 8, joshs-mbp): java.io.StreamCorruptedException: invalid
> stream header: 82534E41
> [info]   java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:804)
> [info]   java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
> [info]   org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:57)
> [info]   org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:57)
> [info]   org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:95)
> [info]   org.apache.spark.storage.BlockManager.getLocalShuffleFromDisk(BlockManager.scala:355)
> [info]   org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$fetchLocalBlocks$1$$anonfun$apply$4.apply(ShuffleBlockFetcherIterator.scala:197)
> [info]   org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$fetchLocalBlocks$1$$anonfun$apply$4.apply(ShuffleBlockFetcherIterator.scala:197)
> [info]   org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:244)
> [info]   org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:52)
> [info]   scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> [info]   org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
> [info]   org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> [info]   org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:129)
> [info]   org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
> [info]   org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
> [info]   org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
> [info]   org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> [info]   org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> [info]   org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> [info]   org.apache.spark.scheduler.Task.run(Task.scala:56)
> [info]   org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
> [info]   java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> [info]   java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> [info]   java.lang.Thread.run(Thread.java:745)
> {code}
> (This is the same mismatch in the other direction: the invalid header
> 82534E41 is 0x82 followed by ASCII "SNA", the start of the snappy-java
> stream magic, which ObjectInputStream reads where it expects a Java
> serialization header because the compressed data is served without a
> decompression wrapper.)
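>
> Since both failure modes appear only when the two flags diverge, a
> workaround until the linked PR lands is to keep them equal. A sketch for
> spark-defaults.conf (the values shown are one valid choice, not the only
> one):
> {code}
> # make spill and shuffle compression agree, in either direction
> spark.shuffle.compress        true
> spark.shuffle.spill.compress  true
> {code}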
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org