[ https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17395953#comment-17395953 ]

Cameron Todd edited comment on SPARK-18105 at 8/9/21, 10:31 AM:
----------------------------------------------------------------

Yep, I understand. I have hashed my data while keeping the same distribution; the hashed full_name column is weak, but string distance functions still work on it. Do you have any recommendations on where I can upload this data? It's only 2 GB.
{code:java}
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.hash;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// So this block of code:
Dataset<Row> relevantPivots = spark.read().parquet(pathToDataDedup)
        .select("id", "full_name", "last_name", "birthdate")
        .na().drop()
        .withColumn("pivot_hash", hash(col("last_name"), col("birthdate")))
        .drop("last_name", "birthdate")
        .repartition(5000)
        .cache();

// can be replaced with this
Dataset<Row> relevantPivots = spark.read().parquet(pathToDataDedup).repartition(5000).cache();
{code}
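In case it helps with sharing: this is roughly how I would dump the hashed dataset down to a few parquet files before uploading it (just a sketch; the output path and the file count are placeholders):
{code:java}
// Sketch only: write the ~2 GB hashed dataset out as a handful of parquet files so it is easy to upload.
// "relevantPivots" is the dataset built above; the destination path is a placeholder.
relevantPivots
        .coalesce(8)   // roughly 250 MB per file for a 2 GB dataset; adjust as needed
        .write()
        .mode("overwrite")
        .parquet("s3a://some-bucket/spark-18105-repro/hashed-pivots/");
{code}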
I have also run the same code on the same hashed data and I am getting the same corrupted-stream error. Also, in case it wasn't clear, my data normally sits in an S3 bucket.
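For completeness, the minimal reproduction I run against the bucket looks roughly like this (a sketch: the bucket and path are placeholders, and the spark.io.compression.codec setting only makes the lz4 default explicit):
{code:java}
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Sketch of the minimal repro reading straight from S3. Bucket/path are placeholders;
// lz4 is already the default codec for shuffle data, the config just pins it explicitly.
SparkSession spark = SparkSession.builder()
        .appName("spark-18105-repro")
        .config("spark.io.compression.codec", "lz4")
        .getOrCreate();

Dataset<Row> relevantPivots = spark.read()
        .parquet("s3a://some-bucket/path/to/hashed-data/")
        .repartition(5000)   // forces a shuffle of the whole dataset
        .cache();

// An action is needed to run the shuffle; count() is just the simplest one.
System.out.println(relevantPivots.count());
{code}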


> LZ4 failed to decompress a stream of shuffled data
> --------------------------------------------------
>
>                 Key: SPARK-18105
>                 URL: https://issues.apache.org/jira/browse/SPARK-18105
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.0.1, 3.1.1
>            Reporter: Davies Liu
>            Priority: Major
>         Attachments: TestWeightedGraph.java
>
>
> When lz4 is used to compress the shuffle files, decompression may fail with a "Stream is corrupted" error:
> {code}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 92 in stage 5.0 failed 4 times, most recent failure: Lost task 92.3 in stage 5.0 (TID 16616, 10.0.27.18): java.io.IOException: Stream is corrupted
>       at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:220)
>       at org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109)
>       at java.io.BufferedInputStream.read(BufferedInputStream.java:353)
>       at java.io.DataInputStream.read(DataInputStream.java:149)
>       at com.google.common.io.ByteStreams.read(ByteStreams.java:828)
>       at com.google.common.io.ByteStreams.readFully(ByteStreams.java:695)
>       at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127)
>       at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110)
>       at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
>       at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>       at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
>       at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
>       at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown Source)
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>       at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>       at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>       at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
>       at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>       at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>       at org.apache.spark.scheduler.Task.run(Task.scala:86)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> {code}
> https://github.com/jpountz/lz4-java/issues/89
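
(For context on the linked lz4-java issue: the "Stream is corrupted" message in the trace above comes from the refill() check in the LZ4 block input stream. The snippet below is only a toy illustration of that library-level behaviour, not Spark's shuffle path; flipping one byte of a compressed stream stands in for whatever corrupts the real shuffle blocks.)
{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import net.jpountz.lz4.LZ4BlockInputStream;
import net.jpountz.lz4.LZ4BlockOutputStream;

public class Lz4CorruptionDemo {
    public static void main(String[] args) throws IOException {
        // Compress ~1 MB of highly compressible data using lz4-java's block format.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (LZ4BlockOutputStream out = new LZ4BlockOutputStream(bytes)) {
            out.write(new byte[1 << 20]);
        }

        // Flip one byte in the middle of the compressed stream to simulate corruption.
        byte[] compressed = bytes.toByteArray();
        compressed[compressed.length / 2] ^= 0x5A;

        byte[] sink = new byte[8192];
        try (LZ4BlockInputStream in = new LZ4BlockInputStream(new ByteArrayInputStream(compressed))) {
            while (in.read(sink) != -1) {
                // drain the stream; decompression happens block by block here
            }
            System.out.println("Corruption went unnoticed");
        } catch (IOException e) {
            // Typically surfaces as: java.io.IOException: Stream is corrupted
            System.out.println("Decompression failed: " + e);
        }
    }
}
{code}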


