Repository: spark
Updated Branches:
  refs/heads/master 864b764ea -> 8674054d3


[SPARK-16632][SQL] Use Spark requested schema to guide vectorized Parquet 
reader initialization

## What changes were proposed in this pull request?

In `SpecificParquetRecordReaderBase`, which is used by the vectorized Parquet 
reader, we convert the Parquet requested schema into a Spark schema to guide 
column reader initialization. However, the Parquet requested schema is tailored 
from the schema of the physical file being scanned, and may have inaccurate 
type information due to bugs of other systems (e.g. HIVE-14294).

On the other hand, we already set the real Spark requested schema into Hadoop 
configuration in [`ParquetFileFormat`][1]. This PR simply reads out this schema 
to replace the converted one.

## How was this patch tested?

New test case added in `ParquetQuerySuite`.

[1]: 
https://github.com/apache/spark/blob/v2.0.0-rc5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L292-L294

Author: Cheng Lian <l...@databricks.com>

Closes #14278 from liancheng/spark-16632-simpler-fix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8674054d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8674054d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8674054d

Branch: refs/heads/master
Commit: 8674054d3402b400a4766fe1c9214001cebf2106
Parents: 864b764
Author: Cheng Lian <l...@databricks.com>
Authored: Thu Jul 21 17:15:07 2016 +0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Thu Jul 21 17:15:07 2016 +0800

----------------------------------------------------------------------
 .../SpecificParquetRecordReaderBase.java        |  5 +++-
 .../datasources/parquet/ParquetQuerySuite.scala | 24 ++++++++++++++++++++
 2 files changed, 28 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8674054d/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index d823275..04752ec 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -60,6 +60,7 @@ import org.apache.parquet.hadoop.util.ConfigurationUtil;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Types;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.StructType$;
 
 /**
  * Base class for custom RecordReaders for Parquet that directly materialize 
to `T`.
@@ -136,7 +137,9 @@ public abstract class SpecificParquetRecordReaderBase<T> 
extends RecordReader<Vo
     ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
         taskAttemptContext.getConfiguration(), toSetMultiMap(fileMetadata), 
fileSchema));
     this.requestedSchema = readContext.getRequestedSchema();
-    this.sparkSchema = new 
ParquetSchemaConverter(configuration).convert(requestedSchema);
+    String sparkRequestedSchemaString =
+        
configuration.get(ParquetReadSupport$.MODULE$.SPARK_ROW_REQUESTED_SCHEMA());
+    this.sparkSchema = 
StructType$.MODULE$.fromString(sparkRequestedSchemaString);
     this.reader = new ParquetFileReader(configuration, file, blocks, 
requestedSchema.getColumns());
     for (BlockMetaData block : blocks) {
       this.totalRowCount += block.getRowCount();

http://git-wip-us.apache.org/repos/asf/spark/blob/8674054d/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 02b9445..7e83bcb 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -680,6 +680,30 @@ class ParquetQuerySuite extends QueryTest with ParquetTest 
with SharedSQLContext
       )
     }
   }
+
+  test("SPARK-16632: read Parquet int32 as ByteType and ShortType") {
+    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = dir.getCanonicalPath
+
+        // When being written to Parquet, `TINYINT` and `SMALLINT` should be 
converted into
+        // `int32 (INT_8)` and `int32 (INT_16)` respectively. However, Hive 
doesn't add the `INT_8`
+        // and `INT_16` annotation properly (HIVE-14294). Thus, when reading 
files written by Hive
+        // using Spark with the vectorized Parquet reader enabled, we may hit 
error due to type
+        // mismatch.
+        //
+        // Here we are simulating Hive's behavior by writing a single `INT` 
field and then read it
+        // back as `TINYINT` and `SMALLINT` in Spark to verify this issue.
+        Seq(1).toDF("f").write.parquet(path)
+
+        val withByteField = new StructType().add("f", ByteType)
+        checkAnswer(spark.read.schema(withByteField).parquet(path), Row(1: 
Byte))
+
+        val withShortField = new StructType().add("f", ShortType)
+        checkAnswer(spark.read.schema(withShortField).parquet(path), Row(1: 
Short))
+      }
+    }
+  }
 }
 
 object TestingUDT {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to