[ https://issues.apache.org/jira/browse/SPARK-27570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16887498#comment-16887498 ]
Josh Rosen commented on SPARK-27570:
------------------------------------

[~ste...@apache.org], I finally got a chance to test your {{fadvise}} configuration recommendation, and it resolved my issue. *However*, I think there is a typo in your recommendation: this only worked once I used {{fs.s3a.experimental.*input*.fadvise}} (the {{.input}} segment was missing from your comment).
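For reference, a minimal sketch of how that property can be applied from the Spark side. The key {{fs.s3a.experimental.input.fadvise}} is the corrected name discussed above; the {{random}} policy chosen below is only an assumption (recent Hadoop releases accept {{normal}}, {{sequential}}, and {{random}}), since the originally recommended value is not quoted here:

{code:java}
import org.apache.spark.sql.SparkSession

// Sketch only: spark.hadoop.* properties are forwarded into the Hadoop
// Configuration used by the S3A connector. The "random" read policy is an
// assumption; substitute whichever policy was actually recommended.
val spark = SparkSession.builder()
  .appName("fadvise-config-sketch")
  .config("spark.hadoop.fs.s3a.experimental.input.fadvise", "random")
  .getOrCreate()

// Or set it directly on the Hadoop configuration before the S3A
// filesystem is first used:
// spark.sparkContext.hadoopConfiguration
//   .set("fs.s3a.experimental.input.fadvise", "random")
{code}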
> java.io.EOFException Reached the end of stream - Reading Parquet from Swift
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-27570
>                 URL: https://issues.apache.org/jira/browse/SPARK-27570
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: Harry Hough
>            Priority: Major
>
> I did see issue SPARK-25966, but there seem to be some differences, as his problem was resolved after rebuilding the Parquet files on write. This is 100% reproducible for me across many different days of data.
> I get exceptions such as "Reached the end of stream with 750477 bytes left to read" during some read operations on Parquet files. I am reading these files from OpenStack Swift using openstack-hadoop 2.7.7 on Spark 2.4.
> The issue seems to happen with the where statement. I have also tried filter, combining the statements into one, and the Dataset method with Column arguments, without any luck. Which column is filtered, or what the actual filter condition is, also doesn't seem to make a difference to whether the error occurs.
> {code:java}
> val engagementDS = spark
>   .read
>   .parquet(createSwiftAddr("engagements", folder))
>   .where("engtype != 0")
>   .where("engtype != 1000")
>   .groupBy($"accid", $"sessionkey")
>   .agg(collect_list(struct($"time", $"pid", $"engtype", $"pageid", $"testid")).as("engagements"))
>
> // Exiting paste mode, now interpreting.
>
> [Stage 53:> (0 + 32) / 32]2019-04-25 19:02:12 ERROR Executor:91 - Exception in task 24.0 in stage 53.0 (TID 688)
> java.io.EOFException: Reached the end of stream with 1323959 bytes left to read
>   at org.apache.parquet.io.DelegatingSeekableInputStream.readFully(DelegatingSeekableInputStream.java:104)
>   at org.apache.parquet.io.DelegatingSeekableInputStream.readFullyHeapBuffer(DelegatingSeekableInputStream.java:127)
>   at org.apache.parquet.io.DelegatingSeekableInputStream.readFully(DelegatingSeekableInputStream.java:91)
>   at org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:1174)
>   at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:805)
>   at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:301)
>   at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:256)
>   at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:159)
>   at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
>   at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
>   at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:181)
>   at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown Source)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
>   at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:107)
>   at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:105)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$12.apply(RDD.scala:823)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$12.apply(RDD.scala:823)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>   at org.apache.spark.scheduler.Task.run(Task.scala:121)
>   at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> {code}
> The above gives the error 100% of the time.
> {code:java}
> val engagementDS = spark
>   .read
>   .parquet(createSwiftAddr("engagements", folder))
>   .count
> {code}
> This works correctly, as does the following with a .show(false):
> {code:java}
> val engagementDS = spark
>   .read
>   .parquet(createSwiftAddr("engagements", folder))
>   .groupBy($"accid", $"sessionkey")
>   .agg(collect_list(struct($"time", $"pid", $"engtype", $"pageid", $"testid")).as("engagements"))
>   .show(false)
> {code}
> Works correctly.
> {code:java}
> val engagementDS = spark
>   .read
>   .parquet(createSwiftAddr("engagements", folder))
>   .where("engtype != 0")
>   .count
> {code}
> The above code works, but if I do .show(false) instead of .count, it breaks with the "reached the end of stream" error.
> {code:java}
> engagementDS.select($"engtype").where("engtype != 0").where("engtype != 1000").show(false)
> +-------+
> |engtype|
> +-------+
> |10     |
> |17     |
> |4      |
> |4      |
> |10     |
> |17     |
> |15     |
> |10     |
> |17     |
> |10     |
> |16     |
> |15     |
> |10     |
> |16     |
> |15     |
> |15     |
> |10     |
> |4      |
> |10     |
> |17     |
> +-------+
> only showing top 20 rows
> {code}
> The above also works correctly.
> I can work around the issue with the code below, so it seems that all the data is there:
> {code:java}
> val engagementDS = spark
>   .read
>   .parquet(createSwiftAddr("engagements", folder))
>   //.filter("engtype != 0 AND engtype != 1000")
>   .groupBy($"accid", $"sessionkey")
>   .agg(collect_list(struct($"time", $"pid", $"engtype", $"pageid", $"testid")).as("engagements"))
>   .selectExpr("accid", "sessionkey", "filter(engagements, x -> x.engtype != 1000 AND x.engtype != 0) AS engagements")
> {code}
> Even if I'm doing something incorrectly, this seems like a very strange error message :)
> Thanks in advance for any help.

--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org