[ https://issues.apache.org/jira/browse/SPARK-27570?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Harry Hough updated SPARK-27570:
--------------------------------
    Description: 
I did see issue SPARK-25966, but there seem to be some differences, as that problem was resolved after rebuilding the Parquet files on write. This issue is 100% reproducible for me across many different days of data.

I get exceptions such as "Reached the end of stream with 750477 bytes left to read" during some read operations on Parquet files. I am reading these files from OpenStack Swift using openstack-hadoop 2.7.7 on Spark 2.4.

The issue seems to be triggered by the where statement. I have also tried filter, combining the two conditions into one, and the Dataset method that takes a Column, all without any luck (rough sketches of those variants are included after the stack trace below). Which column the filter is on, and what the actual condition is, also doesn't seem to make a difference to whether the error occurs.

 
{code:java}
    val engagementDS = spark
      .read
      .parquet(createSwiftAddr("engagements", folder))
      .where("engtype != 0")
      .where("engtype != 1000")
      .groupBy($"accid", $"sessionkey")
      .agg(collect_list(struct($"time", $"pid", $"engtype", $"pageid", 
$"testid")).as("engagements"))

// Exiting paste mode, now interpreting.

[Stage 53:> (0 + 32) / 32]2019-04-25 19:02:12 ERROR Executor:91 - Exception in task 24.0 in stage 53.0 (TID 688)
java.io.EOFException: Reached the end of stream with 1323959 bytes left to read
at org.apache.parquet.io.DelegatingSeekableInputStream.readFully(DelegatingSeekableInputStream.java:104)
at org.apache.parquet.io.DelegatingSeekableInputStream.readFullyHeapBuffer(DelegatingSeekableInputStream.java:127)
at org.apache.parquet.io.DelegatingSeekableInputStream.readFully(DelegatingSeekableInputStream.java:91)
at org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:1174)
at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:805)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:301)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:256)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:159)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:181)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:107)
at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:105)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$12.apply(RDD.scala:823)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$12.apply(RDD.scala:823)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
{code}
The above gives the error 100% of the time.
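For reference, here are rough sketches of the other variants mentioned above (filter with a single combined condition, and the Column-based method); they fail the same way. Variable names here are just illustrative, and createSwiftAddr and the columns are the same as in the shell session above.
{code:java}
// Same failure with filter() and one combined condition:
val viaFilter = spark.read
  .parquet(createSwiftAddr("engagements", folder))
  .filter("engtype != 0 AND engtype != 1000")
  .groupBy($"accid", $"sessionkey")
  .agg(collect_list(struct($"time", $"pid", $"engtype", $"pageid", $"testid")).as("engagements"))

// Same failure with the Column-based conditions:
val viaColumn = spark.read
  .parquet(createSwiftAddr("engagements", folder))
  .where($"engtype" =!= 0 && $"engtype" =!= 1000)
  .groupBy($"accid", $"sessionkey")
  .agg(collect_list(struct($"time", $"pid", $"engtype", $"pageid", $"testid")).as("engagements"))
{code}
By contrast, a plain count of the same data: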
{code:java}
    val engagementDS = spark
      .read
      .parquet(createSwiftAddr("engagements", folder))
      .count
{code}
This works correctly, as does a plain .show(false).
{code:java}
    val engagementDS = spark
      .read
      .parquet(createSwiftAddr("engagements", folder))
      .groupBy($"accid", $"sessionkey")
      .agg(collect_list(struct($"time", $"pid", $"engtype", $"pageid", 
$"testid")).as("engagements"))
      .show(false)
{code}
Works correctly.
{code:java}
    val engagementDS = spark
      .read
      .parquet(createSwiftAddr("engagements", folder))
      .where("engtype != 0")
      .count
{code}
The above code works, but if I do .show(false) instead of .count it breaks with the same "reached the end of stream" error.
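If it helps narrow this down, I can also re-test with the vectorized Parquet reader and Parquet filter pushdown disabled; both are standard Spark SQL options, and I haven't yet verified whether either changes the outcome. A minimal sketch:
{code:java}
// Toggle these before re-running the failing query; both default to "true".
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
spark.conf.set("spark.sql.parquet.filterPushdown", "false")

spark.read
  .parquet(createSwiftAddr("engagements", folder))
  .where("engtype != 0")
  .show(false)
{code}
For comparison, selecting just the engtype column with the same conditions: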
{code:java}
engagementDS.select($"engtype").where("engtype != 0").where("engtype != 
1000").show(false)

+-------+
|engtype|
+-------+
|10     |
|17     |
|4      |
|4      |
|10     |
|17     |
|15     |
|10     |
|17     |
|10     |
|16     |
|15     |
|10     |
|16     |
|15     |
|15     |
|10     |
|4      |
|10     |
|17     |
+-------+
only showing top 20 rows
{code}
The above also works correctly.

I can work around the issue with the code below, so it seems that all the data is actually there:

 
{code:java}
val engagementDS = spark
  .read
  .parquet(createSwiftAddr("engagements", folder))
  //.filter("engtype != 0 AND engtype != 1000")
  .groupBy($"accid", $"sessionkey")
  .agg(collect_list(struct($"time", $"pid", $"engtype", $"pageid", $"testid")).as("engagements"))
  .selectExpr("accid", "sessionkey", "filter(engagements, x -> x.engtype != 1000 AND x.engtype != 0) AS engagements")
{code}
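For completeness, the same "filter after collecting" idea can also be expressed with explode plus a re-aggregation instead of the filter() higher-order function. This is only a rough, untested sketch (illustrative variable names, same columns as above), and unlike the version above it would drop sessions whose engagements are all filtered out:
{code:java}
// Rough, untested sketch of an explode-based equivalent of the workaround above.
val collected = spark.read
  .parquet(createSwiftAddr("engagements", folder))
  .groupBy($"accid", $"sessionkey")
  .agg(collect_list(struct($"time", $"pid", $"engtype", $"pageid", $"testid")).as("engagements"))

val filtered = collected
  .select($"accid", $"sessionkey", explode($"engagements").as("e"))
  .where("e.engtype != 0 AND e.engtype != 1000")
  .groupBy($"accid", $"sessionkey")
  .agg(collect_list($"e").as("engagements"))
{code}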
 

 

Even if I'm doing something incorrectly, this seems like a very strange error message :)

Thanks for any help in advance.



> java.io.EOFException Reached the end of stream - Reading Parquet from Swift
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-27570
>                 URL: https://issues.apache.org/jira/browse/SPARK-27570
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: Harry Hough
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
