[ https://issues.apache.org/jira/browse/SPARK-19299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Franklyn Dsouza updated SPARK-19299:
------------------------------------
    Summary: Nulls in non nullable columns cause data corruption in parquet  (was: Nulls in non nullable columns causes data corruption in parquet)

> Nulls in non nullable columns cause data corruption in parquet
> --------------------------------------------------------------
>
>                 Key: SPARK-19299
>                 URL: https://issues.apache.org/jira/browse/SPARK-19299
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Spark Core
>    Affects Versions: 1.6.0, 2.0.0, 2.0.1, 2.0.2, 2.1.0
>            Reporter: Franklyn Dsouza
>            Priority: Critical
>
> The problem we're seeing is that if a null occurs in a non-nullable field and is written to Parquet, the resulting file is corrupted and cannot be read back correctly.
> One way this can occur is when a long value in Python overflows the SQL LongType; the overflow produces a null value inside the DataFrame.
> We're also seeing that the behaviour differs depending on whether or not the vectorized reader is enabled.
> Here's an example in PySpark:
> {code}
> from pyspark.sql import types
>
> data = [
>     (1, 6),
>     (2, 7),
>     (3, 2 ** 64),  # value overflows sql LongType
>     (4, 8),
>     (5, 9)
> ]
> # both fields are declared non-nullable
> schema = types.StructType([
>     types.StructField("index", types.LongType(), False),
>     types.StructField("long", types.LongType(), False),
> ])
> df = sqlCtx.createDataFrame(data, schema)
> df.collect()
> df.write.parquet("corrupt_parquet")
> df_parquet = sqlCtx.read.parquet("corrupt_parquet/*.parquet")
> df_parquet.collect()
> {code}
> With the vectorized reader enabled, this produces:
> {code}
> In [2]: df.collect()
> Out[2]:
> [Row(index=1, long=6),
>  Row(index=2, long=7),
>  Row(index=3, long=None),
>  Row(index=4, long=8),
>  Row(index=5, long=9)]
>
> In [3]: df_parquet.collect()
> Out[3]:
> [Row(index=1, long=6),
>  Row(index=2, long=7),
>  Row(index=3, long=8),
>  Row(index=4, long=9),
>  Row(index=5, long=5)]
> {code}
> As you can see, reading the data back from disk silently drops the null and shifts the remaining values up and across columns.
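> For reference, a minimal sketch (assuming the same df and sqlCtx as in the repro above) that confirms the null is already present in the in-memory DataFrame before the write; the config key shown for toggling the vectorized reader is the Spark 2.x one:
> {code}
> # Confirm the non-nullable "long" column already holds a null in memory,
> # before anything is written to disk:
> df.where(df["long"].isNull()).collect()
> # [Row(index=3, long=None)]
>
> # Toggle the vectorized Parquet reader (Spark 2.x config key) to
> # reproduce both failure modes described here:
> sqlCtx.setConf("spark.sql.parquet.enableVectorizedReader", "false")
> {code}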
> With the vectorized reader disabled, we are completely unable to read the file:
> {code}
> Py4JJavaError: An error occurred while calling o143.collectToPython.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 3, localhost): org.apache.parquet.io.ParquetDecodingException: Can not read value at 4 in block 0 in file file:/Users/franklyndsouza/dev/starscream/corrupt/part-r-00000-4fa5aee8-2138-4e0c-b6d8-22a418d90fd3.snappy.parquet
> 	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228)
> 	at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
> 	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
> 	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
> 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:86)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.parquet.io.ParquetDecodingException: Can't read value in column [long] INT64 at value 5 out of 5, 5 out of 5 in currentPage. repetition level: 0, definition level: 0
> 	at org.apache.parquet.column.impl.ColumnReaderImpl.readValue(ColumnReaderImpl.java:462)
> 	at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:364)
> 	at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:405)
> 	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:209)
> 	... 19 more
> Caused by: org.apache.parquet.io.ParquetDecodingException: could not read long
> 	at org.apache.parquet.column.values.plain.PlainValuesReader$LongPlainValuesReader.readLong(PlainValuesReader.java:131)
> 	at org.apache.parquet.column.impl.ColumnReaderImpl$2$4.read(ColumnReaderImpl.java:258)
> 	at org.apache.parquet.column.impl.ColumnReaderImpl.readValue(ColumnReaderImpl.java:458)
> 	... 22 more
> Caused by: java.io.EOFException
> 	at org.apache.parquet.bytes.LittleEndianDataInputStream.readFully(LittleEndianDataInputStream.java:90)
> 	at org.apache.parquet.bytes.LittleEndianDataInputStream.readLong(LittleEndianDataInputStream.java:377)
> 	at org.apache.parquet.column.values.plain.PlainValuesReader$LongPlainValuesReader.readLong(PlainValuesReader.java:129)
> {code}
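> A possible mitigation until this is fixed, sketched below and only lightly exercised against the repro above (it reuses the data and types names from that snippet; safe_schema and safe_parquet are purely illustrative): declare the columns nullable so the declared schema matches data that can actually contain nulls, and drop the offending rows before writing.
> {code}
> # Workaround sketch: mark the columns nullable so the Parquet writer
> # records correct definition levels, then drop rows whose "long"
> # value overflowed to null before writing out.
> safe_schema = types.StructType([
>     types.StructField("index", types.LongType(), True),
>     types.StructField("long", types.LongType(), True),
> ])
> safe_df = sqlCtx.createDataFrame(data, safe_schema)
> safe_df.dropna(subset=["long"]).write.parquet("safe_parquet")
> {code}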