Repository: spark
Updated Branches:
  refs/heads/master fc2326362 -> 75146be6b


[SPARK-16632][SQL] Respect Hive schema when merging parquet schema.

When Hive (or at least certain versions of Hive) creates parquet files
containing tinyint or smallint columns, it stores them as int32, but
doesn't annotate the parquet field as containing the corresponding
int8 / int16 data. When Spark reads those files using the vectorized
reader, it follows the parquet schema for these fields, but when
actually reading the data it tries to use the type fetched from
the metastore, and then fails because data has been loaded into the
wrong fields in OnHeapColumnVector.

So instead of blindly trusting the parquet schema, check whether the
Catalyst-provided schema disagrees with it, and adjust the types so
that the necessary metadata is present when loading the data into
the ColumnVector instance.

Tested with unit tests and with tests that create byte / short columns
in Hive and try to read them from Spark.

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #14272 from vanzin/SPARK-16632.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/75146be6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/75146be6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/75146be6

Branch: refs/heads/master
Commit: 75146be6ba5e9f559f5f15430310bb476ee0812c
Parents: fc23263
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Wed Jul 20 13:00:22 2016 +0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Wed Jul 20 13:00:22 2016 +0800

----------------------------------------------------------------------
 .../parquet/ParquetReadSupport.scala            | 18 +++++++++
 .../parquet/ParquetSchemaSuite.scala            | 39 ++++++++++++++++++++
 2 files changed, 57 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/75146be6/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
index e6ef634..46d786d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -26,6 +26,8 @@ import org.apache.parquet.hadoop.api.{InitContext, 
ReadSupport}
 import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
 import org.apache.parquet.io.api.RecordMaterializer
 import org.apache.parquet.schema._
+import org.apache.parquet.schema.OriginalType._
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
 import org.apache.parquet.schema.Type.Repetition
 
 import org.apache.spark.internal.Logging
@@ -120,6 +122,12 @@ private[parquet] object ParquetReadSupport {
   }
 
   private def clipParquetType(parquetType: Type, catalystType: DataType): Type 
= {
+    val primName = if (parquetType.isPrimitive()) {
+      parquetType.asPrimitiveType().getPrimitiveTypeName()
+    } else {
+      null
+    }
+
     catalystType match {
       case t: ArrayType if !isPrimitiveCatalystType(t.elementType) =>
         // Only clips array types with nested type as element type.
@@ -134,6 +142,16 @@ private[parquet] object ParquetReadSupport {
       case t: StructType =>
         clipParquetGroup(parquetType.asGroupType(), t)
 
+      case _: ByteType if primName == INT32 =>
+        // SPARK-16632: Handle case where Hive stores bytes in a int32 field 
without specifying
+        // the original type.
+        Types.primitive(INT32, 
parquetType.getRepetition()).as(INT_8).named(parquetType.getName())
+
+      case _: ShortType if primName == INT32 =>
+        // SPARK-16632: Handle case where Hive stores shorts in a int32 field 
without specifying
+        // the original type.
+        Types.primitive(INT32, 
parquetType.getRepetition()).as(INT_16).named(parquetType.getName())
+
       case _ =>
         // UDTs and primitive types are not clipped.  For UDTs, a clipped 
version might not be able
         // to be mapped to desired user-space types.  So UDTs shouldn't 
participate schema merging.

http://git-wip-us.apache.org/repos/asf/spark/blob/75146be6/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 8a980a7..31ebec0 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -1581,4 +1581,43 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
         |  }
         |}
       """.stripMargin)
+
+  testSchemaClipping(
+    "int32 parquet field with byte schema field",
+
+    parquetSchema =
+      """message root {
+        |  optional int32 value;
+        |}
+      """.stripMargin,
+
+    catalystSchema =
+      new StructType()
+        .add("value", ByteType, nullable = true),
+
+    expectedSchema =
+      """message root {
+        |  optional int32 value (INT_8);
+        |}
+      """.stripMargin)
+
+  testSchemaClipping(
+    "int32 parquet field with short schema field",
+
+    parquetSchema =
+      """message root {
+        |  optional int32 value;
+        |}
+      """.stripMargin,
+
+    catalystSchema =
+      new StructType()
+        .add("value", ShortType, nullable = true),
+
+    expectedSchema =
+      """message root {
+        |  optional int32 value (INT_16);
+        |}
+      """.stripMargin)
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to