Repository: spark
Updated Branches:
  refs/heads/branch-2.0 c2b4228d7 -> f9367d6a0
[SPARK-16632][SQL] Use Spark requested schema to guide vectorized Parquet reader initialization

In `SpecificParquetRecordReaderBase`, which is used by the vectorized Parquet reader, we convert the Parquet requested schema into a Spark schema to guide column reader initialization. However, the Parquet requested schema is tailored from the schema of the physical file being scanned, and may carry inaccurate type information due to bugs in other systems (e.g. HIVE-14294).

On the other hand, we already store the real Spark requested schema in the Hadoop configuration in [`ParquetFileFormat`][1]. This PR simply reads that schema back out and uses it in place of the converted one.

A new test case is added in `ParquetQuerySuite`.

[1]: https://github.com/apache/spark/blob/v2.0.0-rc5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L292-L294

Author: Cheng Lian <l...@databricks.com>

Closes #14278 from liancheng/spark-16632-simpler-fix.

(cherry picked from commit 8674054d3402b400a4766fe1c9214001cebf2106)
Signed-off-by: Cheng Lian <l...@databricks.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f9367d6a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f9367d6a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f9367d6a

Branch: refs/heads/branch-2.0
Commit: f9367d6a045ca171f86845b92c0def1d212a4fcc
Parents: c2b4228
Author: Cheng Lian <l...@databricks.com>
Authored: Thu Jul 21 17:15:07 2016 +0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Thu Jul 21 17:18:53 2016 +0800

----------------------------------------------------------------------
 .../SpecificParquetRecordReaderBase.java        |  5 +++-
 .../datasources/parquet/ParquetQuerySuite.scala | 24 ++++++++++++++++++++
 2 files changed, 28 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
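For context on why reading the schema from the configuration works: `ParquetFileFormat` serializes the Spark requested schema (a `StructType`) to a string and stores it in the Hadoop configuration, and the record reader rebuilds the `StructType` from that string. Below is a minimal sketch of that round trip using only public APIs (`StructType#json` and `DataType.fromJson`). The configuration key used here is a made-up stand-in, since the real constant `ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA` is private to the parquet package, and Spark itself goes through the internal `StructType.fromString` rather than `DataType.fromJson`.

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.types.{ByteType, DataType, StructType}

object RequestedSchemaRoundTrip {
  // Hypothetical key, standing in for ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA
  // (which is private[parquet] and not accessible from user code).
  val RequestedSchemaKey = "example.spark.row.requested_schema"

  def main(args: Array[String]): Unit = {
    val conf = new Configuration()

    // Writer side (what ParquetFileFormat does): serialize the Spark requested
    // schema to JSON and stash it in the Hadoop configuration.
    val requested = new StructType().add("f", ByteType)
    conf.set(RequestedSchemaKey, requested.json)

    // Reader side (what SpecificParquetRecordReaderBase does after this patch,
    // via the internal StructType.fromString): rebuild the StructType from the
    // configuration instead of converting the file's Parquet schema.
    val restored = DataType.fromJson(conf.get(RequestedSchemaKey)).asInstanceOf[StructType]

    assert(restored == requested)
    println(restored.simpleString) // struct<f:tinyint>
  }
}

Because the schema travels as a string in the job configuration, the reader sees exactly the types the query requested, regardless of what type information the physical Parquet file carries.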
http://git-wip-us.apache.org/repos/asf/spark/blob/f9367d6a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index 1a25679..0d624d1 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -60,6 +60,7 @@ import org.apache.parquet.hadoop.util.ConfigurationUtil;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Types;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.StructType$;
 
 /**
  * Base class for custom RecordReaders for Parquet that directly materialize to `T`.
@@ -136,7 +137,9 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
     ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
         taskAttemptContext.getConfiguration(), toSetMultiMap(fileMetadata), fileSchema));
     this.requestedSchema = readContext.getRequestedSchema();
-    this.sparkSchema = new ParquetSchemaConverter(configuration).convert(requestedSchema);
+    String sparkRequestedSchemaString =
+        configuration.get(ParquetReadSupport$.MODULE$.SPARK_ROW_REQUESTED_SCHEMA());
+    this.sparkSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
     this.reader = new ParquetFileReader(configuration, file, blocks, requestedSchema.getColumns());
     for (BlockMetaData block : blocks) {
       this.totalRowCount += block.getRowCount();

http://git-wip-us.apache.org/repos/asf/spark/blob/f9367d6a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 83d1001..3201f8e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -668,6 +668,30 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       }
     }
   }
+
+  test("SPARK-16632: read Parquet int32 as ByteType and ShortType") {
+    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = dir.getCanonicalPath
+
+        // When written to Parquet, `TINYINT` and `SMALLINT` should be converted into
+        // `int32 (INT_8)` and `int32 (INT_16)` respectively. However, Hive doesn't add the `INT_8`
+        // and `INT_16` annotations properly (HIVE-14294). Thus, when reading files written by Hive
+        // using Spark with the vectorized Parquet reader enabled, we may hit errors due to a type
+        // mismatch.
+        //
+        // Here we simulate Hive's behavior by writing a single `INT` field and then reading it
+        // back as `TINYINT` and `SMALLINT` in Spark to verify this issue.
+        Seq(1).toDF("f").write.parquet(path)
+
+        val withByteField = new StructType().add("f", ByteType)
+        checkAnswer(spark.read.schema(withByteField).parquet(path), Row(1: Byte))
+
+        val withShortField = new StructType().add("f", ShortType)
+        checkAnswer(spark.read.schema(withShortField).parquet(path), Row(1: Short))
+      }
+    }
+  }
 }
 
 object TestingUDT {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org