[ https://issues.apache.org/jira/browse/SPARK-35199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17331210#comment-17331210 ]

Leonard Lausen commented on SPARK-35199:
----------------------------------------

Thank you [~dongjoon]. I'll provide an example with a public bucket shortly.

Do you mean that the environment in my cluster is misconfigured, i.e. that
different components use different versions of zstd? I'm using the default
configuration of AWS EMR 6.2
[https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-release-6x.html]
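
To check which zstd-jni version the Spark runtime actually loads, something like the following can be run on a cluster node. This is a minimal sketch; the /usr/lib/spark/jars path is the EMR default jar directory and is an assumption here.

{code:python}
# List the zstd-jni jar(s) on Spark's classpath (EMR default jar directory assumed).
import glob

for jar in sorted(glob.glob("/usr/lib/spark/jars/zstd-jni-*.jar")):
    print(jar)
{code}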

> Tasks are failing with zstd default of spark.shuffle.mapStatus.compression.codec
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-35199
>                 URL: https://issues.apache.org/jira/browse/SPARK-35199
>             Project: Spark
>          Issue Type: Task
>          Components: PySpark
>    Affects Versions: 3.0.1
>            Reporter: Leonard Lausen
>            Priority: Major
>
> In Spark 3.0.1, tasks fail with the default value of
> {{spark.shuffle.mapStatus.compression.codec=zstd}}, but succeed when the value
> is changed to {{spark.shuffle.mapStatus.compression.codec=lz4}}.
> Exemplar backtrace:
>  
> {code:java}
> java.io.IOException: Decompression error: Version not supported
>   at com.github.luben.zstd.ZstdInputStream.readInternal(ZstdInputStream.java:164)
>   at com.github.luben.zstd.ZstdInputStream.read(ZstdInputStream.java:120)
>   at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
>   at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
>   at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
>   at java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2781)
>   at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2797)
>   at java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:3274)
>   at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:934)
>   at java.io.ObjectInputStream.<init>(ObjectInputStream.java:396)
>   at org.apache.spark.MapOutputTracker$.deserializeObject$1(MapOutputTracker.scala:954)
>   at org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:964)
>   at org.apache.spark.MapOutputTrackerWorker.$anonfun$getStatuses$2(MapOutputTracker.scala:856)
>   at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64)
>   at org.apache.spark.MapOutputTrackerWorker.getStatuses(MapOutputTracker.scala:851)
>   at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:808)
>   at org.apache.spark.shuffle.sort.SortShuffleManager.getReader(SortShuffleManager.scala:128)
>   at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:185)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
>   at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>   at org.apache.spark.scheduler.Task.run(Task.scala:127)
>   at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> {code}
> Exemplar code to reproduce the issue:
> {code:python}
> import pyspark.sql.functions as F
>
> # Read ~300 GB of compressed text, shuffle the rows, and write the result back out.
> df = spark.read.text("s3://my-bucket-with-300GB-compressed-text-files")
> df_rand = df.orderBy(F.rand(1))
> df_rand.write.text('s3://shuffled-output'){code}
> See
> [https://stackoverflow.com/questions/64876463/spark-3-0-1-tasks-are-failing-when-using-zstd-compression-codec]
> for another report of this issue and a workaround.
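> A minimal sketch of applying the lz4 workaround when building the PySpark session (the same key can also be passed to spark-submit via --conf):
> {code:python}
> from pyspark.sql import SparkSession
>
> # Override the default zstd codec used to compress serialized map statuses.
> spark = (SparkSession.builder
>          .config("spark.shuffle.mapStatus.compression.codec", "lz4")
>          .getOrCreate())
> {code}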


