[ https://issues.apache.org/jira/browse/SPARK-35199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17331233#comment-17331233 ]

Leonard Lausen commented on SPARK-35199:
----------------------------------------

I failed to reproduce the issue with the public CommonCrawl S3 Bucket:

 
{code:java}
import pyspark.sql.functions as F

df = spark.read.text("s3://commoncrawl/cc-index/collections/CC-MAIN-2021-17/indexes/*gz")
df_rand = df.orderBy(F.rand(1))
df_rand.write.text('s3://your-bucket/tmp',
                   'org.apache.hadoop.io.compress.ZStandardCodec')
{code}
The CommonCrawl corpus contains gzip-compressed files, whereas my bucket, like the one in the Stack Overflow report, contains zstd-compressed files. I'll look for a public bucket with zst files to try to reproduce.

[~dongjoon] does reading zstd-compressed files change any default for how the shuffled data is communicated across nodes?

For the private bucket on which I can reproduce the issue, I also note that the following works fine:
{code:java}
import pyspark.sql.functions as F
df = spark.read.text("s3://my-bucket-with-300GB-compressed-text-files")
df_rand = df.orderBy(F.rand(1))
df_rand.count()
df_rand.take(10)
{code}
The issue only occurs for me when writing out the complete dataframe.
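For reference, the workaround mentioned in the issue description (switching the map-status codec from the zstd default to lz4) has to be applied before the SparkContext starts. A minimal sketch of that configuration, assuming a fresh PySpark session:

```python
from pyspark.sql import SparkSession

# Work around SPARK-35199 by overriding the default
# spark.shuffle.mapStatus.compression.codec=zstd with lz4.
# This is an application-wide setting and cannot be changed
# at runtime once the SparkContext is up.
spark = (
    SparkSession.builder
    .config("spark.shuffle.mapStatus.compression.codec", "lz4")
    .getOrCreate()
)
```

The same override can equivalently be passed on the command line via `spark-submit --conf spark.shuffle.mapStatus.compression.codec=lz4`.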

 

> Tasks are failing with zstd default of 
> spark.shuffle.mapStatus.compression.codec
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-35199
>                 URL: https://issues.apache.org/jira/browse/SPARK-35199
>             Project: Spark
>          Issue Type: Task
>          Components: PySpark
>    Affects Versions: 3.0.1
>            Reporter: Leonard Lausen
>            Priority: Major
>
> In Spark 3.0.1, tasks fail with the default value of 
> {{spark.shuffle.mapStatus.compression.codec=zstd}}, but work without problem 
> when changing the value to {{spark.shuffle.mapStatus.compression.codec=lz4}}.
> Exemplar backtrace:
>  
> {code:java}
> java.io.IOException: Decompression error: Version not supported
>     at com.github.luben.zstd.ZstdInputStream.readInternal(ZstdInputStream.java:164)
>     at com.github.luben.zstd.ZstdInputStream.read(ZstdInputStream.java:120)
>     at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
>     at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
>     at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
>     at java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2781)
>     at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2797)
>     at java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:3274)
>     at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:934)
>     at java.io.ObjectInputStream.<init>(ObjectInputStream.java:396)
>     at org.apache.spark.MapOutputTracker$.deserializeObject$1(MapOutputTracker.scala:954)
>     at org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:964)
>     at org.apache.spark.MapOutputTrackerWorker.$anonfun$getStatuses$2(MapOutputTracker.scala:856)
>     at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64)
>     at org.apache.spark.MapOutputTrackerWorker.getStatuses(MapOutputTracker.scala:851)
>     at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:808)
>     at org.apache.spark.shuffle.sort.SortShuffleManager.getReader(SortShuffleManager.scala:128)
>     at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:185)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
>     at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>     at org.apache.spark.scheduler.Task.run(Task.scala:127)
>     at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
> Exemplar code to reproduce the issue:
> {code:java}
> import pyspark.sql.functions as F
> df = spark.read.text("s3://my-bucket-with-300GB-compressed-text-files")
> df_rand = df.orderBy(F.rand(1))
> df_rand.write.text('s3://shuffled-output')
> {code}
> See [https://stackoverflow.com/questions/64876463/spark-3-0-1-tasks-are-failing-when-using-zstd-compression-codec] for another report of this issue and a workaround.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
