[jira] [Commented] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data
[ https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17397898#comment-17397898 ] Cameron Todd commented on SPARK-18105: -- Oh sorry, I meant just a portion of the code can be replaced or only the import of the parquet part because I hashed the data file so the 'pivot_hash' column already exists. The real test begins with the following joins and aggregations per the .java file I uploaded. > LZ4 failed to decompress a stream of shuffled data > -- > > Key: SPARK-18105 > URL: https://issues.apache.org/jira/browse/SPARK-18105 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.0.1, 3.1.1 >Reporter: Davies Liu >Priority: Major > Attachments: TestWeightedGraph.java > > > When lz4 is used to compress the shuffle files, it may fail to decompress it > as "stream is corrupt" > {code} > Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: > Task 92 in stage 5.0 failed 4 times, most recent failure: Lost task 92.3 in > stage 5.0 (TID 16616, 10.0.27.18): java.io.IOException: Stream is corrupted > at > org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:220) > at > org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109) > at java.io.BufferedInputStream.read(BufferedInputStream.java:353) > at java.io.DataInputStream.read(DataInputStream.java:149) > at com.google.common.io.ByteStreams.read(ByteStreams.java:828) > at com.google.common.io.ByteStreams.readFully(ByteStreams.java:695) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110) > at scala.collection.Iterator$$anon$13.next(Iterator.scala:372) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at > org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30) > at > org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at > org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) > at org.apache.spark.scheduler.Task.run(Task.scala:86) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > https://github.com/jpountz/lz4-java/issues/89 -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data
[ https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17397603#comment-17397603 ] Cameron Todd commented on SPARK-18105: -- Good to hear. Also the count of 136,935,074 is right. When you say "I downloaded and ran the test code in my laptop. It works well to me". You mean the java code I attached or do you mean the read file and count() that you just attached above. Because the error is only raised after the first few aggregations and joins in my code and I also doubt your laptop is big enough to process that ;) . I can attach key config variables on your request but I'm using a cloud Kubernetes spark cluster managed by OVH (spark 3.0.1) so it's out of box solution managed by OVH not really me. > LZ4 failed to decompress a stream of shuffled data > -- > > Key: SPARK-18105 > URL: https://issues.apache.org/jira/browse/SPARK-18105 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.0.1, 3.1.1 >Reporter: Davies Liu >Priority: Major > Attachments: TestWeightedGraph.java > > > When lz4 is used to compress the shuffle files, it may fail to decompress it > as "stream is corrupt" > {code} > Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: > Task 92 in stage 5.0 failed 4 times, most recent failure: Lost task 92.3 in > stage 5.0 (TID 16616, 10.0.27.18): java.io.IOException: Stream is corrupted > at > org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:220) > at > org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109) > at java.io.BufferedInputStream.read(BufferedInputStream.java:353) > at java.io.DataInputStream.read(DataInputStream.java:149) > at com.google.common.io.ByteStreams.read(ByteStreams.java:828) > at com.google.common.io.ByteStreams.readFully(ByteStreams.java:695) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110) > at scala.collection.Iterator$$anon$13.next(Iterator.scala:372) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at > org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30) > at > org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at > org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) > at org.apache.spark.scheduler.Task.run(Task.scala:86) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > https://github.com/jpountz/lz4-java/issues/89 -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data
[ https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17397301#comment-17397301 ] Cameron Todd commented on SPARK-18105: -- Ok I added the zip file on this public S3 bucket, it holds a parquet file with snappy compression, it's 1.75gb. You can retrieve the data as such {code:java} wget https://storage.gra.cloud.ovh.net/v1/AUTH_147b880980f148f5ad1af09542e6f37a/public_data/SPARK18105/hashed_data.zip {code} > LZ4 failed to decompress a stream of shuffled data > -- > > Key: SPARK-18105 > URL: https://issues.apache.org/jira/browse/SPARK-18105 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.0.1, 3.1.1 >Reporter: Davies Liu >Priority: Major > Attachments: TestWeightedGraph.java > > > When lz4 is used to compress the shuffle files, it may fail to decompress it > as "stream is corrupt" > {code} > Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: > Task 92 in stage 5.0 failed 4 times, most recent failure: Lost task 92.3 in > stage 5.0 (TID 16616, 10.0.27.18): java.io.IOException: Stream is corrupted > at > org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:220) > at > org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109) > at java.io.BufferedInputStream.read(BufferedInputStream.java:353) > at java.io.DataInputStream.read(DataInputStream.java:149) > at com.google.common.io.ByteStreams.read(ByteStreams.java:828) > at com.google.common.io.ByteStreams.readFully(ByteStreams.java:695) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110) > at scala.collection.Iterator$$anon$13.next(Iterator.scala:372) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at > org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30) > at > org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at > org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) > at org.apache.spark.scheduler.Task.run(Task.scala:86) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > https://github.com/jpountz/lz4-java/issues/89 -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data
[ https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17395953#comment-17395953 ] Cameron Todd edited comment on SPARK-18105 at 8/9/21, 10:31 AM: Yep I understand. I have hashed my data keeping the same distribution and the full_name hashed column is weak but string distance functions still work on it. Do you have any recommendations where I can upload this data, it's only 2gb? {code:java} //So this line of code: Dataset relevantPivots = spark.read().parquet(pathToDataDedup) .select("id", "full_name", "last_name","birthdate") .na().drop() .withColumn("pivot_hash", hash(col("last_name"),col("birthdate"))) .drop("last_name","birthdate") .repartition(5000) .cache(); // can be replaced with this Dataset relevantPivots = spark.read().parquet(pathToDataDedup).repartition(5000).cache();{code} I have also run the same code on the same hashed data and getting the same corrupted stream error. Also in case it wasn't clear my data normally sits on an s3 bucket. was (Author: cameron.todd): Yep I understand. I have hashed my data keeping the same distribution and the full_name hashed column is weak but string distance functions still work on it. Do you have any recommendations where I can upload this data? {code:java} //So this line of code: Dataset relevantPivots = spark.read().parquet(pathToDataDedup) .select("id", "full_name", "last_name","birthdate") .na().drop() .withColumn("pivot_hash", hash(col("last_name"),col("birthdate"))) .drop("last_name","birthdate") .repartition(5000) .cache(); // can be replaced with this Dataset relevantPivots = spark.read().parquet(pathToDataDedup).repartition(5000).cache();{code} I have also run the same code on the same hashed data and getting the same corrupted stream error. Also in case it wasn't clear my data normally sits on an s3 bucket. > LZ4 failed to decompress a stream of shuffled data > -- > > Key: SPARK-18105 > URL: https://issues.apache.org/jira/browse/SPARK-18105 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.0.1, 3.1.1 >Reporter: Davies Liu >Priority: Major > Attachments: TestWeightedGraph.java > > > When lz4 is used to compress the shuffle files, it may fail to decompress it > as "stream is corrupt" > {code} > Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: > Task 92 in stage 5.0 failed 4 times, most recent failure: Lost task 92.3 in > stage 5.0 (TID 16616, 10.0.27.18): java.io.IOException: Stream is corrupted > at > org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:220) > at > org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109) > at java.io.BufferedInputStream.read(BufferedInputStream.java:353) > at java.io.DataInputStream.read(DataInputStream.java:149) > at com.google.common.io.ByteStreams.read(ByteStreams.java:828) > at com.google.common.io.ByteStreams.readFully(ByteStreams.java:695) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110) > at scala.collection.Iterator$$anon$13.next(Iterator.scala:372) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at > org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30) > at > org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at > org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply
[jira] [Comment Edited] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data
[ https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17395953#comment-17395953 ] Cameron Todd edited comment on SPARK-18105 at 8/9/21, 10:29 AM: Yep I understand. I have hashed my data keeping the same distribution and the full_name hashed column is weak but string distance functions still work on it. Do you have any recommendations where I can upload this data? {code:java} //So this line of code: Dataset relevantPivots = spark.read().parquet(pathToDataDedup) .select("id", "full_name", "last_name","birthdate") .na().drop() .withColumn("pivot_hash", hash(col("last_name"),col("birthdate"))) .drop("last_name","birthdate") .repartition(5000) .cache(); // can be replaced with this Dataset relevantPivots = spark.read().parquet(pathToDataDedup).repartition(5000).cache();{code} I have also run the same code on the same hashed data and getting the same corrupted stream error. Also in case it wasn't clear my data normally sits on an s3 bucket. was (Author: cameron.todd): Yep I understand. I have attached my hashed data keeping the same distribution and the full_name hashed column is weak but string distance functions still work on it. So this line of code: {code:java} Dataset relevantPivots = spark.read().parquet(pathToDataDedup) .select("id", "full_name", "last_name","birthdate") .na().drop() .withColumn("pivot_hash", hash(col("last_name"),col("birthdate"))) .drop("last_name","birthdate") .repartition(5000) .cache(); // can be replaced with this Dataset relevantPivots = spark.read().parquet(pathToDataDedup).repartition(5000).cache();{code} I have also run the same code on the same hashed data and getting the same corrupted stream error. Also in case it wasn't clear my data normally sits on an s3 bucket. > LZ4 failed to decompress a stream of shuffled data > -- > > Key: SPARK-18105 > URL: https://issues.apache.org/jira/browse/SPARK-18105 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.0.1, 3.1.1 >Reporter: Davies Liu >Priority: Major > Attachments: TestWeightedGraph.java > > > When lz4 is used to compress the shuffle files, it may fail to decompress it > as "stream is corrupt" > {code} > Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: > Task 92 in stage 5.0 failed 4 times, most recent failure: Lost task 92.3 in > stage 5.0 (TID 16616, 10.0.27.18): java.io.IOException: Stream is corrupted > at > org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:220) > at > org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109) > at java.io.BufferedInputStream.read(BufferedInputStream.java:353) > at java.io.DataInputStream.read(DataInputStream.java:149) > at com.google.common.io.ByteStreams.read(ByteStreams.java:828) > at com.google.common.io.ByteStreams.readFully(ByteStreams.java:695) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110) > at scala.collection.Iterator$$anon$13.next(Iterator.scala:372) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at > org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30) > at > org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at > org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) > a
[jira] [Issue Comment Deleted] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data
[ https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cameron Todd updated SPARK-18105: - Comment: was deleted (was: [^hashed_data.zip]) > LZ4 failed to decompress a stream of shuffled data > -- > > Key: SPARK-18105 > URL: https://issues.apache.org/jira/browse/SPARK-18105 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.0.1, 3.1.1 >Reporter: Davies Liu >Priority: Major > Attachments: TestWeightedGraph.java > > > When lz4 is used to compress the shuffle files, it may fail to decompress it > as "stream is corrupt" > {code} > Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: > Task 92 in stage 5.0 failed 4 times, most recent failure: Lost task 92.3 in > stage 5.0 (TID 16616, 10.0.27.18): java.io.IOException: Stream is corrupted > at > org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:220) > at > org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109) > at java.io.BufferedInputStream.read(BufferedInputStream.java:353) > at java.io.DataInputStream.read(DataInputStream.java:149) > at com.google.common.io.ByteStreams.read(ByteStreams.java:828) > at com.google.common.io.ByteStreams.readFully(ByteStreams.java:695) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110) > at scala.collection.Iterator$$anon$13.next(Iterator.scala:372) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at > org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30) > at > org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at > org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) > at org.apache.spark.scheduler.Task.run(Task.scala:86) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > https://github.com/jpountz/lz4-java/issues/89 -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data
[ https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17395954#comment-17395954 ] Cameron Todd commented on SPARK-18105: -- [^hashed_data.zip] > LZ4 failed to decompress a stream of shuffled data > -- > > Key: SPARK-18105 > URL: https://issues.apache.org/jira/browse/SPARK-18105 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.0.1, 3.1.1 >Reporter: Davies Liu >Priority: Major > Attachments: TestWeightedGraph.java > > > When lz4 is used to compress the shuffle files, it may fail to decompress it > as "stream is corrupt" > {code} > Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: > Task 92 in stage 5.0 failed 4 times, most recent failure: Lost task 92.3 in > stage 5.0 (TID 16616, 10.0.27.18): java.io.IOException: Stream is corrupted > at > org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:220) > at > org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109) > at java.io.BufferedInputStream.read(BufferedInputStream.java:353) > at java.io.DataInputStream.read(DataInputStream.java:149) > at com.google.common.io.ByteStreams.read(ByteStreams.java:828) > at com.google.common.io.ByteStreams.readFully(ByteStreams.java:695) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110) > at scala.collection.Iterator$$anon$13.next(Iterator.scala:372) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at > org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30) > at > org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at > org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) > at org.apache.spark.scheduler.Task.run(Task.scala:86) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > https://github.com/jpountz/lz4-java/issues/89 -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data
[ https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17395953#comment-17395953 ] Cameron Todd commented on SPARK-18105: -- Yep I understand. I have attached my hashed data keeping the same distribution and the full_name hashed column is weak but string distance functions still work on it. So this line of code: {code:java} Dataset relevantPivots = spark.read().parquet(pathToDataDedup) .select("id", "full_name", "last_name","birthdate") .na().drop() .withColumn("pivot_hash", hash(col("last_name"),col("birthdate"))) .drop("last_name","birthdate") .repartition(5000) .cache(); // can be replaced with this Dataset relevantPivots = spark.read().parquet(pathToDataDedup).repartition(5000).cache();{code} I have also run the same code on the same hashed data and getting the same corrupted stream error. Also in case it wasn't clear my data normally sits on an s3 bucket. > LZ4 failed to decompress a stream of shuffled data > -- > > Key: SPARK-18105 > URL: https://issues.apache.org/jira/browse/SPARK-18105 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.0.1, 3.1.1 >Reporter: Davies Liu >Priority: Major > Attachments: TestWeightedGraph.java > > > When lz4 is used to compress the shuffle files, it may fail to decompress it > as "stream is corrupt" > {code} > Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: > Task 92 in stage 5.0 failed 4 times, most recent failure: Lost task 92.3 in > stage 5.0 (TID 16616, 10.0.27.18): java.io.IOException: Stream is corrupted > at > org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:220) > at > org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109) > at java.io.BufferedInputStream.read(BufferedInputStream.java:353) > at java.io.DataInputStream.read(DataInputStream.java:149) > at com.google.common.io.ByteStreams.read(ByteStreams.java:828) > at com.google.common.io.ByteStreams.readFully(ByteStreams.java:695) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110) > at scala.collection.Iterator$$anon$13.next(Iterator.scala:372) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at > org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30) > at > org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at > org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) > at org.apache.spark.scheduler.Task.run(Task.scala:86) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > https://github.com/jpountz/lz4-java/issues/89 -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data
[ https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392971#comment-17392971 ] Cameron Todd commented on SPARK-18105: -- Let me know if that's enough info. From my tests if I remove the .repartition after import of file, I get no corrupted stream error but I get skewed joins and memory problems later. Then if I add the repartition back, the job does not complete because multiple tasks crash and are retried but still crash with the stream corrupted error before completely crashing the job > LZ4 failed to decompress a stream of shuffled data > -- > > Key: SPARK-18105 > URL: https://issues.apache.org/jira/browse/SPARK-18105 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.0.1, 3.1.1 >Reporter: Davies Liu >Priority: Major > Attachments: TestWeightedGraph.java > > > When lz4 is used to compress the shuffle files, it may fail to decompress it > as "stream is corrupt" > {code} > Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: > Task 92 in stage 5.0 failed 4 times, most recent failure: Lost task 92.3 in > stage 5.0 (TID 16616, 10.0.27.18): java.io.IOException: Stream is corrupted > at > org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:220) > at > org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109) > at java.io.BufferedInputStream.read(BufferedInputStream.java:353) > at java.io.DataInputStream.read(DataInputStream.java:149) > at com.google.common.io.ByteStreams.read(ByteStreams.java:828) > at com.google.common.io.ByteStreams.readFully(ByteStreams.java:695) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110) > at scala.collection.Iterator$$anon$13.next(Iterator.scala:372) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at > org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30) > at > org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at > org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) > at org.apache.spark.scheduler.Task.run(Task.scala:86) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > https://github.com/jpountz/lz4-java/issues/89 -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data
[ https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392968#comment-17392968 ] Cameron Todd commented on SPARK-18105: -- I'll attach a portion of the code that is not proprietary but it's mostly all there. Anyway the code is one small step in our entity resolution process that processes 1billion person records so lots of pairwise comparisons and skewed joins before aggregate and output staging results.[^TestWeightedGraph.java] > LZ4 failed to decompress a stream of shuffled data > -- > > Key: SPARK-18105 > URL: https://issues.apache.org/jira/browse/SPARK-18105 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.0.1, 3.1.1 >Reporter: Davies Liu >Priority: Major > Attachments: TestWeightedGraph.java > > > When lz4 is used to compress the shuffle files, it may fail to decompress it > as "stream is corrupt" > {code} > Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: > Task 92 in stage 5.0 failed 4 times, most recent failure: Lost task 92.3 in > stage 5.0 (TID 16616, 10.0.27.18): java.io.IOException: Stream is corrupted > at > org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:220) > at > org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109) > at java.io.BufferedInputStream.read(BufferedInputStream.java:353) > at java.io.DataInputStream.read(DataInputStream.java:149) > at com.google.common.io.ByteStreams.read(ByteStreams.java:828) > at com.google.common.io.ByteStreams.readFully(ByteStreams.java:695) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110) > at scala.collection.Iterator$$anon$13.next(Iterator.scala:372) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at > org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30) > at > org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at > org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) > at org.apache.spark.scheduler.Task.run(Task.scala:86) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > https://github.com/jpountz/lz4-java/issues/89 -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data
[ https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cameron Todd updated SPARK-18105: - Attachment: TestWeightedGraph.java > LZ4 failed to decompress a stream of shuffled data > -- > > Key: SPARK-18105 > URL: https://issues.apache.org/jira/browse/SPARK-18105 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.0.1, 3.1.1 >Reporter: Davies Liu >Priority: Major > Attachments: TestWeightedGraph.java > > > When lz4 is used to compress the shuffle files, it may fail to decompress it > as "stream is corrupt" > {code} > Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: > Task 92 in stage 5.0 failed 4 times, most recent failure: Lost task 92.3 in > stage 5.0 (TID 16616, 10.0.27.18): java.io.IOException: Stream is corrupted > at > org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:220) > at > org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109) > at java.io.BufferedInputStream.read(BufferedInputStream.java:353) > at java.io.DataInputStream.read(DataInputStream.java:149) > at com.google.common.io.ByteStreams.read(ByteStreams.java:828) > at com.google.common.io.ByteStreams.readFully(ByteStreams.java:695) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110) > at scala.collection.Iterator$$anon$13.next(Iterator.scala:372) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at > org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30) > at > org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at > org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) > at org.apache.spark.scheduler.Task.run(Task.scala:86) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > https://github.com/jpountz/lz4-java/issues/89 -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data
[ https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392205#comment-17392205 ] Cameron Todd edited comment on SPARK-18105 at 8/3/21, 9:44 AM: --- I'm also facing this same error when scaling up my project to a larger dataset. It's consistently crashing in the same spot (I can reproduce it) and I'm pretty sure it's coming down to using a repartition() after import as below: {code:java} Dataset relevantPivots = spark.read().parquet(jobConf.pathToDataDedup) .select(VadisSparkUtils.scalaSeqConverterColumn(allColsNeeded)) .na().drop() .withColumn("pivot_hash", hash(VadisSparkUtils.scalaSeqConverterColumn(pivot))) .drop(VadisSparkUtils.scalaSeqConverterString(pivot)) .repartition(5000) .cache(); {code} I have already tried to disable "spark.unsafe.sorter.spill.read.ahead.enabled" but did not work. My cluster environment is spark 3.0.1 standalone with Kubernetes and using lz4-java-1.7.1.jar Not entirely sure how to debug this further and give you guys anymore info because I can't get the logs on each slave node was (Author: cameron.todd): I'm also facing this same error when scaling up my project to a larger dataset. It's consistently crashing in the same spot and I'm pretty sure it's coming down to using a repartition() after import as below: {code:java} Dataset relevantPivots = spark.read().parquet(jobConf.pathToDataDedup) .select(VadisSparkUtils.scalaSeqConverterColumn(allColsNeeded)) .na().drop() .withColumn("pivot_hash", hash(VadisSparkUtils.scalaSeqConverterColumn(pivot))) .drop(VadisSparkUtils.scalaSeqConverterString(pivot)) .repartition(5000) .cache(); {code} I have already tried to disable "spark.unsafe.sorter.spill.read.ahead.enabled" but did not work. My cluster environment is spark 3.0.1 standalone with Kubernetes and using lz4-java-1.7.1.jar Not entirely sure how to debug this further and give you guys anymore info because I can't get the logs on each slave node > LZ4 failed to decompress a stream of shuffled data > -- > > Key: SPARK-18105 > URL: https://issues.apache.org/jira/browse/SPARK-18105 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.0.1, 3.1.1 >Reporter: Davies Liu >Priority: Major > > When lz4 is used to compress the shuffle files, it may fail to decompress it > as "stream is corrupt" > {code} > Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: > Task 92 in stage 5.0 failed 4 times, most recent failure: Lost task 92.3 in > stage 5.0 (TID 16616, 10.0.27.18): java.io.IOException: Stream is corrupted > at > org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:220) > at > org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109) > at java.io.BufferedInputStream.read(BufferedInputStream.java:353) > at java.io.DataInputStream.read(DataInputStream.java:149) > at com.google.common.io.ByteStreams.read(ByteStreams.java:828) > at com.google.common.io.ByteStreams.readFully(ByteStreams.java:695) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110) > at scala.collection.Iterator$$anon$13.next(Iterator.scala:372) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at > org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30) > at > org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at > org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationC
[jira] [Commented] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data
[ https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392205#comment-17392205 ] Cameron Todd commented on SPARK-18105: -- I'm also facing this same error when scaling up my project to a larger dataset. It's consistently crashing in the same spot and I'm pretty sure it's coming down to using a repartition() after import as below: {code:java} Dataset relevantPivots = spark.read().parquet(jobConf.pathToDataDedup) .select(VadisSparkUtils.scalaSeqConverterColumn(allColsNeeded)) .na().drop() .withColumn("pivot_hash", hash(VadisSparkUtils.scalaSeqConverterColumn(pivot))) .drop(VadisSparkUtils.scalaSeqConverterString(pivot)) .repartition(5000) .cache(); {code} I have already tried to disable "spark.unsafe.sorter.spill.read.ahead.enabled" but did not work. My cluster environment is spark 3.0.1 standalone with Kubernetes and using lz4-java-1.7.1.jar Not entirely sure how to debug this further and give you guys anymore info because I can't get the logs on each slave node > LZ4 failed to decompress a stream of shuffled data > -- > > Key: SPARK-18105 > URL: https://issues.apache.org/jira/browse/SPARK-18105 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.0.1, 3.1.1 >Reporter: Davies Liu >Priority: Major > > When lz4 is used to compress the shuffle files, it may fail to decompress it > as "stream is corrupt" > {code} > Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: > Task 92 in stage 5.0 failed 4 times, most recent failure: Lost task 92.3 in > stage 5.0 (TID 16616, 10.0.27.18): java.io.IOException: Stream is corrupted > at > org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:220) > at > org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109) > at java.io.BufferedInputStream.read(BufferedInputStream.java:353) > at java.io.DataInputStream.read(DataInputStream.java:149) > at com.google.common.io.ByteStreams.read(ByteStreams.java:828) > at com.google.common.io.ByteStreams.readFully(ByteStreams.java:695) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110) > at scala.collection.Iterator$$anon$13.next(Iterator.scala:372) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at > org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30) > at > org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at > org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) > at org.apache.spark.scheduler.Task.run(Task.scala:86) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > https://github.com/jpountz/lz4-java/issues/89 -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org