[ https://issues.apache.org/jira/browse/SPARK-3426?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Josh Rosen resolved SPARK-3426.
-------------------------------
       Resolution: Fixed
    Fix Version/s: 1.2.0
                   1.1.1

Fixed in 1.1.1 and 1.2.0 by my PR.
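
For users stuck on an affected release (1.1.0): the failure described below only occurs when the two compression settings diverge, so keeping them identical in spark-defaults.conf should sidestep the problem until you can upgrade, e.g.:

{code}
spark.shuffle.compress        true
spark.shuffle.spill.compress  true
{code}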

> Sort-based shuffle compression behavior is inconsistent
> -------------------------------------------------------
>
>                 Key: SPARK-3426
>                 URL: https://issues.apache.org/jira/browse/SPARK-3426
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.1.0, 1.2.0
>            Reporter: Andrew Or
>            Assignee: Josh Rosen
>            Priority: Blocker
>             Fix For: 1.1.1, 1.2.0
>
>
> We have the following configs:
> {code}
> spark.shuffle.compress
> spark.shuffle.spill.compress
> {code}
> When these two diverge, sort-based shuffle fails with a compression exception under certain workloads.
> This is because in sort-based shuffle we serve the index file (using spark.shuffle.spill.compress) as a normal shuffle file (using spark.shuffle.compress).
> In retrospect, it was unfortunate that these two configs were exposed in the first place, since we can't easily remove them now.
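> As a standalone illustration (just a sketch using the xerial Snappy stream class that appears in the traces below, not Spark code): if bytes are written without compression but read back through a Snappy input stream, the reader fails in the same way while looking for a Snappy header.
> {code}
> import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
> import org.xerial.snappy.SnappyInputStream
>
> // Writer side does not compress (think spark.shuffle.spill.compress = false)
> val plain = new ByteArrayOutputStream()
> plain.write("some shuffle bytes".getBytes("UTF-8"))
>
> // Reader side assumes Snappy (think spark.shuffle.compress = true) and
> // blows up while trying to interpret the plain bytes as Snappy data,
> // e.g. java.io.IOException: FAILED_TO_UNCOMPRESS(5)
> val in = new SnappyInputStream(new ByteArrayInputStream(plain.toByteArray))
> in.read()
> {code}
> The converse mismatch (Snappy-written data read as a plain Java-serialized stream) shows up as the "invalid stream header: 82534E41" error further down; 82 53 4E 41 are simply the first four bytes of the Snappy stream magic.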
> Here is how this can be reproduced. Set the following in your spark-defaults.conf:
> {code}
> spark.master                  local-cluster[1,1,512]
> spark.shuffle.spill.compress  false
> spark.shuffle.compress        true
> spark.shuffle.manager         sort
> spark.shuffle.memoryFraction  0.001
> {code}
> Then run the following in spark-shell:
> {code}
> sc.parallelize(0 until 100000).map(i => (i/4, i)).groupByKey().collect()
> {code}
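> (The same settings can also be passed directly on the command line with --conf instead of editing spark-defaults.conf; this should be equivalent:)
> {code}
> ./bin/spark-shell \
>   --master "local-cluster[1,1,512]" \
>   --conf spark.shuffle.manager=sort \
>   --conf spark.shuffle.memoryFraction=0.001 \
>   --conf spark.shuffle.compress=true \
>   --conf spark.shuffle.spill.compress=false
> {code}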
> This leads to compression errors, such as the following:
> {code}
> [info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 8, joshs-mbp): java.io.IOException: FAILED_TO_UNCOMPRESS(5)
> [info]         org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
> [info]         org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
> [info]         org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444)
> [info]         org.xerial.snappy.Snappy.uncompress(Snappy.java:480)
> [info]         org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:127)
> [info]         org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
> [info]         org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:58)
> [info]         org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128)
> [info]         org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1090)
> [info]         org.apache.spark.storage.BlockManager.getLocalShuffleFromDisk(BlockManager.scala:350)
> [info]         org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$fetchLocalBlocks$1$$anonfun$apply$4.apply(ShuffleBlockFetcherIterator.scala:196)
> [info]         org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$fetchLocalBlocks$1$$anonfun$apply$4.apply(ShuffleBlockFetcherIterator.scala:196)
> [info]         org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:243)
> [info]         org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:52)
> [info]         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> [info]         org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
> [info]         org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> [info]         org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:129)
> [info]         org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
> [info]         org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
> [info]         org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
> [info]         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> [info]         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> [info]         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> [info]         org.apache.spark.scheduler.Task.run(Task.scala:56)
> [info]         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
> [info]         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> [info]         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> [info]         java.lang.Thread.run(Thread.java:745)
> {code}
> Similarly, with
> {code}
> spark.shuffle.spill.compress  true
> spark.shuffle.compress        false
> {code}
> we see
> {code}
> [info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 8, joshs-mbp): java.io.StreamCorruptedException: invalid stream header: 82534E41
> [info]         java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:804)
> [info]         java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
> [info]         org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:57)
> [info]         org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:57)
> [info]         org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:95)
> [info]         org.apache.spark.storage.BlockManager.getLocalShuffleFromDisk(BlockManager.scala:355)
> [info]         org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$fetchLocalBlocks$1$$anonfun$apply$4.apply(ShuffleBlockFetcherIterator.scala:197)
> [info]         org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$fetchLocalBlocks$1$$anonfun$apply$4.apply(ShuffleBlockFetcherIterator.scala:197)
> [info]         org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:244)
> [info]         org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:52)
> [info]         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> [info]         org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
> [info]         org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> [info]         org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:129)
> [info]         org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
> [info]         org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
> [info]         org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
> [info]         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> [info]         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> [info]         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> [info]         org.apache.spark.scheduler.Task.run(Task.scala:56)
> [info]         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
> [info]         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> [info]         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> [info]         java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
