[ https://issues.apache.org/jira/browse/SPARK-35199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17331233#comment-17331233 ]
Leonard Lausen commented on SPARK-35199:
----------------------------------------

I failed to reproduce the issue with the public CommonCrawl S3 bucket:

{code:java}
import pyspark.sql.functions as F

df = spark.read.text("s3://commoncrawl/cc-index/collections/CC-MAIN-2021-17/indexes/*gz")
df_rand = df.orderBy(F.rand(1))
df_rand.write.text('s3://your-bucket/tmp', 'org.apache.hadoop.io.compress.ZStandardCodec')
{code}

The CommonCrawl corpus contains gz-compressed files, whereas my bucket, like the one in the Stack Overflow report, contains zstd-compressed files. I'll look for a public bucket with zst files to reproduce. [~dongjoon], does reading zstd-compressed files change any default for how the shuffled data is communicated across nodes?

For the private bucket on which I can reproduce the issue, I also note that the following works fine:

{code:java}
import pyspark.sql.functions as F

df = spark.read.text("s3://my-bucket-with-300GB-compressed-text-files")
df_rand = df.orderBy(F.rand(1))
df_rand.count()
df_rand.take(10)
{code}

The issue only occurs for me when writing out the complete dataframe.

> Tasks are failing with zstd default of spark.shuffle.mapStatus.compression.codec
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-35199
>                 URL: https://issues.apache.org/jira/browse/SPARK-35199
>             Project: Spark
>          Issue Type: Task
>          Components: PySpark
>    Affects Versions: 3.0.1
>            Reporter: Leonard Lausen
>            Priority: Major
>
> In Spark 3.0.1, tasks fail with the default value of {{spark.shuffle.mapStatus.compression.codec=zstd}}, but work without problem when changing the value to {{spark.shuffle.mapStatus.compression.codec=lz4}}.
>
> Exemplar backtrace:
>
> {code:java}
> java.io.IOException: Decompression error: Version not supported
>   at com.github.luben.zstd.ZstdInputStream.readInternal(ZstdInputStream.java:164)
>   at com.github.luben.zstd.ZstdInputStream.read(ZstdInputStream.java:120)
>   at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
>   at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
>   at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
>   at java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2781)
>   at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2797)
>   at java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:3274)
>   at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:934)
>   at java.io.ObjectInputStream.<init>(ObjectInputStream.java:396)
>   at org.apache.spark.MapOutputTracker$.deserializeObject$1(MapOutputTracker.scala:954)
>   at org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:964)
>   at org.apache.spark.MapOutputTrackerWorker.$anonfun$getStatuses$2(MapOutputTracker.scala:856)
>   at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64)
>   at org.apache.spark.MapOutputTrackerWorker.getStatuses(MapOutputTracker.scala:851)
>   at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:808)
>   at org.apache.spark.shuffle.sort.SortShuffleManager.getReader(SortShuffleManager.scala:128)
>   at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:185)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
>   at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>   at org.apache.spark.scheduler.Task.run(Task.scala:127)
>   at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> {code}
>
> Exemplar code to reproduce the issue:
>
> {code:java}
> import pyspark.sql.functions as F
>
> df = spark.read.text("s3://my-bucket-with-300GB-compressed-text-files")
> df_rand = df.orderBy(F.rand(1))
> df_rand.write.text('s3://shuffled-output')
> {code}
>
> See [https://stackoverflow.com/questions/64876463/spark-3-0-1-tasks-are-failing-when-using-zstd-compression-codec] for another report of this issue and a workaround.
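For reference, a minimal PySpark sketch of the lz4 workaround mentioned in the issue description. This assumes the SparkSession is built from scratch so the codec override takes effect before the shuffle runs; the bucket paths are placeholders, not real locations:

{code:java}
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

# Sketch only: override the Spark 3.0.x default (zstd) reported to trigger
# the "Decompression error: Version not supported" failure.
spark = (
    SparkSession.builder
    .config("spark.shuffle.mapStatus.compression.codec", "lz4")
    .getOrCreate()
)

df = spark.read.text("s3://my-bucket-with-300GB-compressed-text-files")  # placeholder path
df_rand = df.orderBy(F.rand(1))
df_rand.write.text("s3://shuffled-output")  # placeholder path
{code}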