[ https://issues.apache.org/jira/browse/SPARK-22482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-22482:
---------------------------------
    Labels: bulk-closed  (was: )

> Unreadable Parquet array columns
> --------------------------------
>
>                 Key: SPARK-22482
>                 URL: https://issues.apache.org/jira/browse/SPARK-22482
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.0
>         Environment: Spark 2.1.0
> Parquet 1.8.1
> Hive 1.2
> Hive 2.1.0
> presto 0.157
> presto 0.180
>            Reporter: Costas Piliotis
>            Priority: Major
>              Labels: bulk-closed
>
> We have seen an issue with Parquet data written out by Spark: int and bool 
> array columns throw exceptions when the Parquet files are read from Hive and 
> Presto.
> I've logged a ticket with the Parquet project (PARQUET-1157), but I'm not 
> sure whether the issue lies in their project or in Spark itself.
> Spark is reading parquet-avro data produced by a MapReduce job and writing 
> it back out as Parquet.
> The inbound Parquet format has the column defined as:
> {code}
>   optional group playerpositions_ai (LIST) {
>     repeated int32 array;
>   }
> {code}
> Spark redefines this column as:
> {code}
>   optional group playerpositions_ai (LIST) {
>     repeated group list {
>       optional int32 element;
>     }
>   }
> {code}
> and, with legacy format enabled, as:
> {code}
>   optional group playerpositions_ai (LIST) {
>     repeated group bag {
>       optional int32 array;
>     }
>   }
> {code}
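> For reference, the layouts above can be checked against the actual files. A 
> minimal inspection sketch (assuming a SparkSession named {{spark}} and the 
> output path used below) is:
> {code}
> // Spark's logical view of the written data.  The physical Parquet LIST
> // layout (standard "list"/"element" vs. legacy "bag"/"array") can be
> // dumped with `parquet-tools schema <part-file>` on one of the part files.
> spark.read.parquet(path).printSchema()
> {code}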
> The Parquet data was tested in Hive 1.2, Hive 2.1, Presto 0.157, Presto 
> 0.180, and Spark 2.1, as well as Amazon Athena (which is a Presto-based 
> service).
> I believe the above schema is valid Parquet to write out.
> The Spark command writing it out is simple:
> {code}
>       data.repartition(((data.count() / 10000000) + 1).toInt)
>         .write.format("parquet")
>         .mode("append")
>         .partitionBy(partitionColumns: _*)
>         .save(path)
> {code}
> We initially wrote the data out with legacy format turned off, then later 
> turned legacy format on; the error occurs the same way in both cases.
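> To be clear about what we mean by "legacy format", a minimal sketch of the 
> toggle we are referring to (assuming a SparkSession named {{spark}}) is:
> {code}
> // Controls whether Spark writes the legacy ("bag"/"array") LIST layout
> // shown above or the standard parquet-format ("list"/"element") layout.
> spark.conf.set("spark.sql.parquet.writeLegacyFormat", "true")    // legacy on
> // spark.conf.set("spark.sql.parquet.writeLegacyFormat", "false") // legacy off (default)
> {code}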
> Spark's stack trace from reading this is:
> {code}
> java.lang.IllegalArgumentException: Reading past RLE/BitPacking stream.
>       at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:55)
>       at org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder.readNext(RunLengthBitPackingHybridDecoder.java:82)
>       at org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder.readInt(RunLengthBitPackingHybridDecoder.java:64)
>       at org.apache.parquet.column.values.dictionary.DictionaryValuesReader.readInteger(DictionaryValuesReader.java:112)
>       at org.apache.parquet.column.impl.ColumnReaderImpl$2$3.read(ColumnReaderImpl.java:243)
>       at org.apache.parquet.column.impl.ColumnReaderImpl.readValue(ColumnReaderImpl.java:464)
>       at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:370)
>       at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:405)
>       at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:218)
>       at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:227)
>       at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
>       at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>       at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>       at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>       at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
>       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>       at org.apache.spark.scheduler.Task.run(Task.scala:99)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
> {code}
> Also note that our data is stored on S3, in case that matters.
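> For completeness, a sketch of the kind of read that hits the trace above 
> (not the exact job; it assumes a SparkSession named {{spark}}, the same 
> {{path}}, and the column from the schemas above):
> {code}
> import org.apache.spark.sql.functions.{col, explode}
> // Forcing the int array column to be materialized is enough to trigger
> // the "Reading past RLE/BitPacking stream" exception on affected files.
> val df = spark.read.parquet(path)
> df.select(explode(col("playerpositions_ai")).as("pos")).count()
> {code}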


