[ https://issues.apache.org/jira/browse/SPARK-3426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14179355#comment-14179355 ]

Josh Rosen commented on SPARK-3426:
-----------------------------------

Based on the discussion in that PR, it sounds like folks would rather fix the 
underlying bug than change or ignore configurations.  I'll look into a small, 
targeted fix for this.

> Sort-based shuffle compression behavior is inconsistent
> -------------------------------------------------------
>
>                 Key: SPARK-3426
>                 URL: https://issues.apache.org/jira/browse/SPARK-3426
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.1.0
>            Reporter: Andrew Or
>            Assignee: Josh Rosen
>            Priority: Blocker
>
> We have the following configs:
> {code}
> spark.shuffle.compress
> spark.shuffle.spill.compress
> {code}
> When these two diverge, sort-based shuffle fails with a compression exception 
> under certain workloads. This is because sort-based shuffle writes the index 
> file using spark.shuffle.spill.compress but serves it as a normal shuffle file 
> using spark.shuffle.compress. In retrospect it was unfortunate that these two 
> configs were ever exposed, since we can't easily remove them now.
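> To see the mismatch in isolation, here is a standalone sketch (not Spark code; 
> it only assumes snappy-java on the classpath): a file is written without 
> compression and then read back through a Snappy stream, which is roughly what 
> happens when the data is written under spark.shuffle.spill.compress=false but 
> served under spark.shuffle.compress=true.
> {code}
> import java.io.{File, FileInputStream, FileOutputStream}
> import org.xerial.snappy.SnappyInputStream
>
> object CompressionMismatchSketch {
>   def main(args: Array[String]): Unit = {
>     val file = File.createTempFile("shuffle-block", ".data")
>     // Writer side: plain bytes, as if spill compression were disabled.
>     val out = new FileOutputStream(file)
>     out.write("some shuffle records".getBytes("UTF-8"))
>     out.close()
>
>     // Reader side: expects Snappy-framed data, as if shuffle compression
>     // were enabled. This fails (in the constructor or on the first read,
>     // depending on the snappy-java version) with an IOException such as
>     // FAILED_TO_UNCOMPRESS(5), mirroring the stack trace below.
>     val in = new SnappyInputStream(new FileInputStream(file))
>     in.read()
>   }
> }
> {code}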
> Here is how this can be reproduced. Set the following in your 
> spark-defaults.conf:
> {code}
> spark.master                  local-cluster[1,1,512]
> spark.shuffle.spill.compress  false
> spark.shuffle.compress        true
> spark.shuffle.manager         sort
> spark.shuffle.memoryFraction  0.001
> {code}
> Then run the following in spark-shell:
> {code}
> sc.parallelize(0 until 100000).map(i => (i/4, i)).groupByKey().collect()
> {code}
> This leads to compression errors, such as the following:
> {code}
> [info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 8, joshs-mbp): java.io.IOException: FAILED_TO_UNCOMPRESS(5)
> [info]         org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
> [info]         org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
> [info]         org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444)
> [info]         org.xerial.snappy.Snappy.uncompress(Snappy.java:480)
> [info]         org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:127)
> [info]         org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
> [info]         org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:58)
> [info]         org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128)
> [info]         org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1090)
> [info]         org.apache.spark.storage.BlockManager.getLocalShuffleFromDisk(BlockManager.scala:350)
> [info]         org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$fetchLocalBlocks$1$$anonfun$apply$4.apply(ShuffleBlockFetcherIterator.scala:196)
> [info]         org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$fetchLocalBlocks$1$$anonfun$apply$4.apply(ShuffleBlockFetcherIterator.scala:196)
> [info]         org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:243)
> [info]         org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:52)
> [info]         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> [info]         org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
> [info]         org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> [info]         org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:129)
> [info]         org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
> [info]         org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
> [info]         org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
> [info]         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> [info]         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> [info]         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> [info]         org.apache.spark.scheduler.Task.run(Task.scala:56)
> [info]         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
> [info]         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> [info]         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> [info]         java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
