Repository: spark
Updated Branches:
  refs/heads/master 2f00a71a8 -> e6e36004a


[SPARK-14387][SPARK-16628][SPARK-18355][SQL] Use Spark schema to read ORC table 
instead of ORC file schema

## What changes were proposed in this pull request?

Before Hive 2.0, the ORC file schema has invalid column names like `_col1` and 
`_col2`. This is a well-known limitation, and there are several Apache Spark 
issues with `spark.sql.hive.convertMetastoreOrc=true`. This PR ignores the ORC 
file schema and uses the Spark schema instead.

## How was this patch tested?

This patch passes the newly added test cases (one each for ORC and Parquet).

Author: Dongjoon Hyun <dongj...@apache.org>

Closes #19470 from dongjoon-hyun/SPARK-18355.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e6e36004
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e6e36004
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e6e36004

Branch: refs/heads/master
Commit: e6e36004afc3f9fc8abea98542248e9de11b4435
Parents: 2f00a71
Author: Dongjoon Hyun <dongj...@apache.org>
Authored: Fri Oct 13 23:09:12 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Fri Oct 13 23:09:12 2017 +0800

----------------------------------------------------------------------
 .../spark/sql/hive/orc/OrcFileFormat.scala      | 31 ++++++----
 .../sql/hive/execution/SQLQuerySuite.scala      | 62 +++++++++++++++++++-
 2 files changed, 80 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e6e36004/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index c76f0eb..194e69c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -134,12 +134,11 @@ class OrcFileFormat extends FileFormat with 
DataSourceRegister with Serializable
       // SPARK-8501: Empty ORC files always have an empty schema stored in 
their footer. In this
       // case, `OrcFileOperator.readSchema` returns `None`, and we can't read 
the underlying file
       // using the given physical schema. Instead, we simply return an empty 
iterator.
-      val maybePhysicalSchema = OrcFileOperator.readSchema(Seq(file.filePath), 
Some(conf))
-      if (maybePhysicalSchema.isEmpty) {
+      val isEmptyFile = OrcFileOperator.readSchema(Seq(file.filePath), 
Some(conf)).isEmpty
+      if (isEmptyFile) {
         Iterator.empty
       } else {
-        val physicalSchema = maybePhysicalSchema.get
-        OrcRelation.setRequiredColumns(conf, physicalSchema, requiredSchema)
+        OrcRelation.setRequiredColumns(conf, dataSchema, requiredSchema)
 
         val orcRecordReader = {
           val job = Job.getInstance(conf)
@@ -163,6 +162,7 @@ class OrcFileFormat extends FileFormat with 
DataSourceRegister with Serializable
         // Unwraps `OrcStruct`s to `UnsafeRow`s
         OrcRelation.unwrapOrcStructs(
           conf,
+          dataSchema,
           requiredSchema,
           
Some(orcRecordReader.getObjectInspector.asInstanceOf[StructObjectInspector]),
           recordsIterator)
@@ -272,25 +272,32 @@ private[orc] object OrcRelation extends HiveInspectors {
   def unwrapOrcStructs(
       conf: Configuration,
       dataSchema: StructType,
+      requiredSchema: StructType,
       maybeStructOI: Option[StructObjectInspector],
       iterator: Iterator[Writable]): Iterator[InternalRow] = {
     val deserializer = new OrcSerde
-    val mutableRow = new SpecificInternalRow(dataSchema.map(_.dataType))
-    val unsafeProjection = UnsafeProjection.create(dataSchema)
+    val mutableRow = new SpecificInternalRow(requiredSchema.map(_.dataType))
+    val unsafeProjection = UnsafeProjection.create(requiredSchema)
 
     def unwrap(oi: StructObjectInspector): Iterator[InternalRow] = {
-      val (fieldRefs, fieldOrdinals) = dataSchema.zipWithIndex.map {
-        case (field, ordinal) => oi.getStructFieldRef(field.name) -> ordinal
+      val (fieldRefs, fieldOrdinals) = requiredSchema.zipWithIndex.map {
+        case (field, ordinal) =>
+          var ref = oi.getStructFieldRef(field.name)
+          if (ref == null) {
+            ref = oi.getStructFieldRef("_col" + 
dataSchema.fieldIndex(field.name))
+          }
+          ref -> ordinal
       }.unzip
 
-      val unwrappers = fieldRefs.map(unwrapperFor)
+      val unwrappers = fieldRefs.map(r => if (r == null) null else 
unwrapperFor(r))
 
       iterator.map { value =>
         val raw = deserializer.deserialize(value)
         var i = 0
         val length = fieldRefs.length
         while (i < length) {
-          val fieldValue = oi.getStructFieldData(raw, fieldRefs(i))
+          val fieldRef = fieldRefs(i)
+          val fieldValue = if (fieldRef == null) null else 
oi.getStructFieldData(raw, fieldRef)
           if (fieldValue == null) {
             mutableRow.setNullAt(fieldOrdinals(i))
           } else {
@@ -306,8 +313,8 @@ private[orc] object OrcRelation extends HiveInspectors {
   }
 
   def setRequiredColumns(
-      conf: Configuration, physicalSchema: StructType, requestedSchema: 
StructType): Unit = {
-    val ids = requestedSchema.map(a => physicalSchema.fieldIndex(a.name): 
Integer)
+      conf: Configuration, dataSchema: StructType, requestedSchema: 
StructType): Unit = {
+    val ids = requestedSchema.map(a => dataSchema.fieldIndex(a.name): Integer)
     val (sortedIDs, sortedNames) = 
ids.zip(requestedSchema.fieldNames).sorted.unzip
     HiveShim.appendReadColumns(conf, sortedIDs, sortedNames)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/e6e36004/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 09c5900..94fa43d 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
@@ -2050,4 +2050,64 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils 
with TestHiveSingleton {
       }
     }
   }
+
+  Seq("orc", "parquet").foreach { format =>
+    test(s"SPARK-18355 Read data from a hive table with a new column - 
$format") {
+      val client = 
spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+
+      Seq("true", "false").foreach { value =>
+        withSQLConf(
+          HiveUtils.CONVERT_METASTORE_ORC.key -> value,
+          HiveUtils.CONVERT_METASTORE_PARQUET.key -> value) {
+          withTempDatabase { db =>
+            client.runSqlHive(
+              s"""
+                 |CREATE TABLE $db.t(
+                 |  click_id string,
+                 |  search_id string,
+                 |  uid bigint)
+                 |PARTITIONED BY (
+                 |  ts string,
+                 |  hour string)
+                 |STORED AS $format
+              """.stripMargin)
+
+            client.runSqlHive(
+              s"""
+                 |INSERT INTO TABLE $db.t
+                 |PARTITION (ts = '98765', hour = '01')
+                 |VALUES (12, 2, 12345)
+              """.stripMargin
+            )
+
+            checkAnswer(
+              sql(s"SELECT click_id, search_id, uid, ts, hour FROM $db.t"),
+              Row("12", "2", 12345, "98765", "01"))
+
+            client.runSqlHive(s"ALTER TABLE $db.t ADD COLUMNS (dummy string)")
+
+            checkAnswer(
+              sql(s"SELECT click_id, search_id FROM $db.t"),
+              Row("12", "2"))
+
+            checkAnswer(
+              sql(s"SELECT search_id, click_id FROM $db.t"),
+              Row("2", "12"))
+
+            checkAnswer(
+              sql(s"SELECT search_id FROM $db.t"),
+              Row("2"))
+
+            checkAnswer(
+              sql(s"SELECT dummy, click_id FROM $db.t"),
+              Row(null, "12"))
+
+            checkAnswer(
+              sql(s"SELECT click_id, search_id, uid, dummy, ts, hour FROM 
$db.t"),
+              Row("12", "2", 12345, null, "98765", "01"))
+          }
+        }
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to