Repository: spark
Updated Branches:
  refs/heads/master cabe1df86 -> f79371ad8


[SPARK-19611][SQL] Introduce configurable table schema inference

## Summary of changes

Add a new configuration option that allows Spark SQL to infer a case-sensitive 
schema from a Hive Metastore table's data files when a case-sensitive schema 
can't be read from the table properties.

- Add spark.sql.hive.caseSensitiveInferenceMode param to SQLConf
- Add schemaPreservesCase field to CatalogTable (set to false when schema can't
  successfully be read from Hive table props)
- Perform schema inference in HiveMetastoreCatalog if schemaPreservesCase is
  false, depending on spark.sql.hive.caseSensitiveInferenceMode
- Add alterTableSchema() method to the ExternalCatalog interface
- Add HiveSchemaInferenceSuite tests
- Refactor and move ParquetFileFormat.mergeMetastoreParquetSchema() to
  HiveMetastoreCatalog.mergeWithMetastoreSchema()
- Move schema merging tests from ParquetSchemaSuite to HiveSchemaInferenceSuite

[JIRA for this change](https://issues.apache.org/jira/browse/SPARK-19611)

## How was this patch tested?

The tests in ```HiveSchemaInferenceSuite``` should verify that schema inference 
is working as expected. ```ExternalCatalogSuite``` has also been extended to 
cover the new ```alterTableSchema()``` API.

Author: Budde <bu...@amazon.com>

Closes #16944 from budde/SPARK-19611.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f79371ad
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f79371ad
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f79371ad

Branch: refs/heads/master
Commit: f79371ad86d94da14bd1ddb53e99a388017b6892
Parents: cabe1df
Author: Budde <bu...@amazon.com>
Authored: Thu Mar 9 12:55:33 2017 -0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Thu Mar 9 12:55:33 2017 -0800

----------------------------------------------------------------------
 .../sql/catalyst/catalog/ExternalCatalog.scala  |  15 +-
 .../sql/catalyst/catalog/InMemoryCatalog.scala  |  10 +
 .../spark/sql/catalyst/catalog/interface.scala  |   8 +-
 .../catalyst/catalog/ExternalCatalogSuite.scala |  15 +-
 .../sql/catalyst/trees/TreeNodeSuite.scala      |   3 +-
 .../datasources/parquet/ParquetFileFormat.scala |  65 ----
 .../org/apache/spark/sql/internal/SQLConf.scala |  22 ++
 .../parquet/ParquetSchemaSuite.scala            |  82 -----
 .../spark/sql/hive/HiveExternalCatalog.scala    |  23 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 100 +++++-
 .../sql/hive/HiveSchemaInferenceSuite.scala     | 305 +++++++++++++++++++
 11 files changed, 489 insertions(+), 159 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f79371ad/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 31eded4..08a01e8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.catalog
 
 import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, 
NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException}
 import org.apache.spark.sql.catalyst.expressions.Expression
-
+import org.apache.spark.sql.types.StructType
 
 /**
  * Interface for the system catalog (of functions, partitions, tables, and 
databases).
@@ -104,6 +104,19 @@ abstract class ExternalCatalog {
    */
   def alterTable(tableDefinition: CatalogTable): Unit
 
+  /**
+   * Alter the schema of a table identified by the provided database and table 
name. The new schema
+   * should still contain the existing bucket columns and partition columns 
used by the table. This
+   * method will also update any Spark SQL-related parameters stored as Hive 
table properties (such
+   * as the schema itself).
+   *
+   * @param db Database that table to alter schema for exists in
+   * @param table Name of table to alter schema for
+   * @param schema Updated schema to be used for the table (must contain 
existing partition and
+   *               bucket columns)
+   */
+  def alterTableSchema(db: String, table: String, schema: StructType): Unit
+
   def getTable(db: String, table: String): CatalogTable
 
   def getTableOption(db: String, table: String): Option[CatalogTable]

http://git-wip-us.apache.org/repos/asf/spark/blob/f79371ad/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 80aba4a..5cc6b0a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.analysis._
 import 
org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.util.StringUtils
+import org.apache.spark.sql.types.StructType
 
 /**
  * An in-memory (ephemeral) implementation of the system catalog.
@@ -297,6 +298,15 @@ class InMemoryCatalog(
     catalog(db).tables(tableDefinition.identifier.table).table = 
tableDefinition
   }
 
+  override def alterTableSchema(
+      db: String,
+      table: String,
+      schema: StructType): Unit = synchronized {
+    requireTableExists(db, table)
+    val origTable = catalog(db).tables(table).table
+    catalog(db).tables(table).table = origTable.copy(schema = schema)
+  }
+
   override def getTable(db: String, table: String): CatalogTable = 
synchronized {
     requireTableExists(db, table)
     catalog(db).tables(table).table

http://git-wip-us.apache.org/repos/asf/spark/blob/f79371ad/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 4452c47..e3631b0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -163,6 +163,11 @@ case class BucketSpec(
  * @param tracksPartitionsInCatalog whether this table's partition metadata is 
stored in the
  *                                  catalog. If false, it is inferred 
automatically based on file
  *                                  structure.
+ * @param schemaPreservesCase Whether or not the schema resolved for this table 
is case-sensitive.
+ *                            When using a Hive Metastore, this flag is set to 
false if a case-
+ *                            sensitive schema was unable to be read from the 
table properties.
+ *                            Used to trigger case-sensitive schema inference 
at query time, when
+ *                            configured.
  */
 case class CatalogTable(
     identifier: TableIdentifier,
@@ -180,7 +185,8 @@ case class CatalogTable(
     viewText: Option[String] = None,
     comment: Option[String] = None,
     unsupportedFeatures: Seq[String] = Seq.empty,
-    tracksPartitionsInCatalog: Boolean = false) {
+    tracksPartitionsInCatalog: Boolean = false,
+    schemaPreservesCase: Boolean = true) {
 
   import CatalogTable._
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f79371ad/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 07ccd68..7820f39 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, 
NoSuchDatabaseException, NoSuchFunctionException}
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
 
@@ -240,6 +240,19 @@ abstract class ExternalCatalogSuite extends SparkFunSuite 
with BeforeAndAfterEac
     }
   }
 
+  test("alter table schema") {
+    val catalog = newBasicCatalog()
+    val tbl1 = catalog.getTable("db2", "tbl1")
+    val newSchema = StructType(Seq(
+      StructField("new_field_1", IntegerType),
+      StructField("new_field_2", StringType),
+      StructField("a", IntegerType),
+      StructField("b", StringType)))
+    catalog.alterTableSchema("db2", "tbl1", newSchema)
+    val newTbl1 = catalog.getTable("db2", "tbl1")
+    assert(newTbl1.schema == newSchema)
+  }
+
   test("get table") {
     assert(newBasicCatalog().getTable("db2", "tbl1").identifier.table == 
"tbl1")
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/f79371ad/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
index af1eaa1..37e3dfa 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -491,7 +491,8 @@ class TreeNodeSuite extends SparkFunSuite {
         "lastAccessTime" -> -1,
         "tracksPartitionsInCatalog" -> false,
         "properties" -> JNull,
-        "unsupportedFeatures" -> List.empty[String]))
+        "unsupportedFeatures" -> List.empty[String],
+        "schemaPreservesCase" -> JBool(true)))
 
     // For unknown case class, returns JNull.
     val bigValue = new Array[Int](10000)

http://git-wip-us.apache.org/repos/asf/spark/blob/f79371ad/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 828949e..5313c2f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -476,71 +476,6 @@ object ParquetFileFormat extends Logging {
   }
 
   /**
-   * Reconciles Hive Metastore case insensitivity issue and data type 
conflicts between Metastore
-   * schema and Parquet schema.
-   *
-   * Hive doesn't retain case information, while Parquet is case sensitive. On 
the other hand, the
-   * schema read from Parquet files may be incomplete (e.g. older versions of 
Parquet doesn't
-   * distinguish binary and string).  This method generates a correct schema 
by merging Metastore
-   * schema data types and Parquet schema field names.
-   */
-  def mergeMetastoreParquetSchema(
-      metastoreSchema: StructType,
-      parquetSchema: StructType): StructType = {
-    def schemaConflictMessage: String =
-      s"""Converting Hive Metastore Parquet, but detected conflicting schemas. 
Metastore schema:
-         |${metastoreSchema.prettyJson}
-         |
-         |Parquet schema:
-         |${parquetSchema.prettyJson}
-       """.stripMargin
-
-    val mergedParquetSchema = mergeMissingNullableFields(metastoreSchema, 
parquetSchema)
-
-    assert(metastoreSchema.size <= mergedParquetSchema.size, 
schemaConflictMessage)
-
-    val ordinalMap = metastoreSchema.zipWithIndex.map {
-      case (field, index) => field.name.toLowerCase -> index
-    }.toMap
-
-    val reorderedParquetSchema = mergedParquetSchema.sortBy(f =>
-      ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))
-
-    StructType(metastoreSchema.zip(reorderedParquetSchema).map {
-      // Uses Parquet field names but retains Metastore data types.
-      case (mSchema, pSchema) if mSchema.name.toLowerCase == 
pSchema.name.toLowerCase =>
-        mSchema.copy(name = pSchema.name)
-      case _ =>
-        throw new SparkException(schemaConflictMessage)
-    })
-  }
-
-  /**
-   * Returns the original schema from the Parquet file with any missing 
nullable fields from the
-   * Hive Metastore schema merged in.
-   *
-   * When constructing a DataFrame from a collection of structured data, the 
resulting object has
-   * a schema corresponding to the union of the fields present in each element 
of the collection.
-   * Spark SQL simply assigns a null value to any field that isn't present for 
a particular row.
-   * In some cases, it is possible that a given table partition stored as a 
Parquet file doesn't
-   * contain a particular nullable field in its schema despite that field 
being present in the
-   * table schema obtained from the Hive Metastore. This method returns a 
schema representing the
-   * Parquet file schema along with any additional nullable fields from the 
Metastore schema
-   * merged in.
-   */
-  private[parquet] def mergeMissingNullableFields(
-      metastoreSchema: StructType,
-      parquetSchema: StructType): StructType = {
-    val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
-    val missingFields = metastoreSchema
-      .map(_.name.toLowerCase)
-      .diff(parquetSchema.map(_.name.toLowerCase))
-      .map(fieldMap(_))
-      .filter(_.nullable)
-    StructType(parquetSchema ++ missingFields)
-  }
-
-  /**
    * Reads Parquet footers in multi-threaded manner.
    * If the config "spark.sql.files.ignoreCorruptFiles" is set to true, we 
will ignore the corrupted
    * files when reading footers.

http://git-wip-us.apache.org/repos/asf/spark/blob/f79371ad/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1244f69..8e3f567 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -296,6 +296,25 @@ object SQLConf {
       .longConf
       .createWithDefault(250 * 1024 * 1024)
 
+  object HiveCaseSensitiveInferenceMode extends Enumeration {
+    val INFER_AND_SAVE, INFER_ONLY, NEVER_INFER = Value
+  }
+
+  val HIVE_CASE_SENSITIVE_INFERENCE = 
buildConf("spark.sql.hive.caseSensitiveInferenceMode")
+    .doc("Sets the action to take when a case-sensitive schema cannot be read 
from a Hive " +
+      "table's properties. Although Spark SQL itself is not case-sensitive, 
Hive compatible file " +
+      "formats such as Parquet are. Spark SQL must use a case-preserving 
schema when querying " +
+      "any table backed by files containing case-sensitive field names or 
queries may not return " +
+      "accurate results. Valid options include INFER_AND_SAVE (the default 
mode-- infer the " +
+      "case-sensitive schema from the underlying data files and write it back 
to the table " +
+      "properties), INFER_ONLY (infer the schema but don't attempt to write it 
to the table " +
+      "properties) and NEVER_INFER (fallback to using the case-insensitive 
metastore schema " +
+      "instead of inferring).")
+    .stringConf
+    .transform(_.toUpperCase())
+    .checkValues(HiveCaseSensitiveInferenceMode.values.map(_.toString))
+    .createWithDefault(HiveCaseSensitiveInferenceMode.INFER_AND_SAVE.toString)
+
   val OPTIMIZER_METADATA_ONLY = buildConf("spark.sql.optimizer.metadataOnly")
     .doc("When true, enable the metadata-only query optimization that use the 
table's metadata " +
       "to produce the partition columns instead of table scans. It applies 
when all the columns " +
@@ -792,6 +811,9 @@ private[sql] class SQLConf extends Serializable with 
CatalystConf with Logging {
 
   def filesourcePartitionFileCacheSize: Long = 
getConf(HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE)
 
+  def caseSensitiveInferenceMode: HiveCaseSensitiveInferenceMode.Value =
+    
HiveCaseSensitiveInferenceMode.withName(getConf(HIVE_CASE_SENSITIVE_INFERENCE))
+
   def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT)
 
   def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY)

http://git-wip-us.apache.org/repos/asf/spark/blob/f79371ad/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 8a980a7..6aa940a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -368,88 +368,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     }
   }
 
-  test("merge with metastore schema") {
-    // Field type conflict resolution
-    assertResult(
-      StructType(Seq(
-        StructField("lowerCase", StringType),
-        StructField("UPPERCase", DoubleType, nullable = false)))) {
-
-      ParquetFileFormat.mergeMetastoreParquetSchema(
-        StructType(Seq(
-          StructField("lowercase", StringType),
-          StructField("uppercase", DoubleType, nullable = false))),
-
-        StructType(Seq(
-          StructField("lowerCase", BinaryType),
-          StructField("UPPERCase", IntegerType, nullable = true))))
-    }
-
-    // MetaStore schema is subset of parquet schema
-    assertResult(
-      StructType(Seq(
-        StructField("UPPERCase", DoubleType, nullable = false)))) {
-
-      ParquetFileFormat.mergeMetastoreParquetSchema(
-        StructType(Seq(
-          StructField("uppercase", DoubleType, nullable = false))),
-
-        StructType(Seq(
-          StructField("lowerCase", BinaryType),
-          StructField("UPPERCase", IntegerType, nullable = true))))
-    }
-
-    // Metastore schema contains additional non-nullable fields.
-    assert(intercept[Throwable] {
-      ParquetFileFormat.mergeMetastoreParquetSchema(
-        StructType(Seq(
-          StructField("uppercase", DoubleType, nullable = false),
-          StructField("lowerCase", BinaryType, nullable = false))),
-
-        StructType(Seq(
-          StructField("UPPERCase", IntegerType, nullable = true))))
-    }.getMessage.contains("detected conflicting schemas"))
-
-    // Conflicting non-nullable field names
-    intercept[Throwable] {
-      ParquetFileFormat.mergeMetastoreParquetSchema(
-        StructType(Seq(StructField("lower", StringType, nullable = false))),
-        StructType(Seq(StructField("lowerCase", BinaryType))))
-    }
-  }
-
-  test("merge missing nullable fields from Metastore schema") {
-    // Standard case: Metastore schema contains additional nullable fields not 
present
-    // in the Parquet file schema.
-    assertResult(
-      StructType(Seq(
-        StructField("firstField", StringType, nullable = true),
-        StructField("secondField", StringType, nullable = true),
-        StructField("thirdfield", StringType, nullable = true)))) {
-      ParquetFileFormat.mergeMetastoreParquetSchema(
-        StructType(Seq(
-          StructField("firstfield", StringType, nullable = true),
-          StructField("secondfield", StringType, nullable = true),
-          StructField("thirdfield", StringType, nullable = true))),
-        StructType(Seq(
-          StructField("firstField", StringType, nullable = true),
-          StructField("secondField", StringType, nullable = true))))
-    }
-
-    // Merge should fail if the Metastore contains any additional fields that 
are not
-    // nullable.
-    assert(intercept[Throwable] {
-      ParquetFileFormat.mergeMetastoreParquetSchema(
-        StructType(Seq(
-          StructField("firstfield", StringType, nullable = true),
-          StructField("secondfield", StringType, nullable = true),
-          StructField("thirdfield", StringType, nullable = false))),
-        StructType(Seq(
-          StructField("firstField", StringType, nullable = true),
-          StructField("secondField", StringType, nullable = true))))
-    }.getMessage.contains("detected conflicting schemas"))
-  }
-
   test("schema merging failure error message") {
     import testImplicits._
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f79371ad/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 9ab4624..78aa2bd 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -597,6 +597,25 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
     }
   }
 
+  override def alterTableSchema(db: String, table: String, schema: 
StructType): Unit = withClient {
+    requireTableExists(db, table)
+    val rawTable = getRawTable(db, table)
+    val withNewSchema = rawTable.copy(schema = schema)
+    // Add table metadata such as table schema, partition columns, etc. to 
table properties.
+    val updatedTable = withNewSchema.copy(
+      properties = withNewSchema.properties ++ 
tableMetaToTableProps(withNewSchema))
+    try {
+      client.alterTable(updatedTable)
+    } catch {
+      case NonFatal(e) =>
+        val warningMessage =
+          s"Could not alter schema of table  
${rawTable.identifier.quotedString} in a Hive " +
+            "compatible way. Updating Hive metastore in Spark SQL specific 
format."
+        logWarning(warningMessage, e)
+        client.alterTable(updatedTable.copy(schema = 
updatedTable.partitionSchema))
+    }
+  }
+
   override def getTable(db: String, table: String): CatalogTable = withClient {
     restoreTableMetadata(getRawTable(db, table))
   }
@@ -690,10 +709,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
           "different from the schema when this table was created by Spark SQL" 
+
           s"(${schemaFromTableProps.simpleString}). We have to fall back to 
the table schema " +
           "from Hive metastore which is not case preserving.")
-        hiveTable
+        hiveTable.copy(schemaPreservesCase = false)
       }
     } else {
-      hiveTable
+      hiveTable.copy(schemaPreservesCase = false)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f79371ad/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index d135dfa..056af49 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -19,9 +19,12 @@ package org.apache.spark.sql.hive
 
 import java.net.URI
 
+import scala.util.control.NonFatal
+
 import com.google.common.util.concurrent.Striped
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
@@ -32,6 +35,7 @@ import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, 
ParquetOptions}
 import org.apache.spark.sql.hive.orc.OrcFileFormat
+import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode._
 import org.apache.spark.sql.types._
 
 /**
@@ -44,6 +48,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
   // these are def_s and not val/lazy val since the latter would introduce 
circular references
   private def sessionState = 
sparkSession.sessionState.asInstanceOf[HiveSessionState]
   private def tableRelationCache = 
sparkSession.sessionState.catalog.tableRelationCache
+  import HiveMetastoreCatalog._
 
   private def getCurrentDatabase: String = 
sessionState.catalog.getCurrentDatabase
 
@@ -130,6 +135,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
 
     val lazyPruningEnabled = 
sparkSession.sqlContext.conf.manageFilesourcePartitions
     val tablePath = new Path(relation.tableMeta.location)
+    val fileFormat = fileFormatClass.newInstance()
+
     val result = if (relation.isPartitioned) {
       val partitionSchema = relation.tableMeta.partitionSchema
       val rootPaths: Seq[Path] = if (lazyPruningEnabled) {
@@ -170,16 +177,18 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
             }
           }
 
+          val (dataSchema, updatedTable) =
+            inferIfNeeded(relation, options, fileFormat, Option(fileIndex))
+
           val fsRelation = HadoopFsRelation(
             location = fileIndex,
             partitionSchema = partitionSchema,
-            dataSchema = relation.tableMeta.dataSchema,
+            dataSchema = dataSchema,
             // We don't support hive bucketed tables, only ones we write out.
             bucketSpec = None,
-            fileFormat = fileFormatClass.newInstance(),
+            fileFormat = fileFormat,
             options = options)(sparkSession = sparkSession)
-
-          val created = LogicalRelation(fsRelation, catalogTable = 
Some(relation.tableMeta))
+          val created = LogicalRelation(fsRelation, catalogTable = 
Some(updatedTable))
           tableRelationCache.put(tableIdentifier, created)
           created
         }
@@ -196,17 +205,18 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
           fileFormatClass,
           None)
         val logicalRelation = cached.getOrElse {
+          val (dataSchema, updatedTable) = inferIfNeeded(relation, options, 
fileFormat)
           val created =
             LogicalRelation(
               DataSource(
                 sparkSession = sparkSession,
                 paths = rootPath.toString :: Nil,
-                userSpecifiedSchema = Some(metastoreSchema),
+                userSpecifiedSchema = Option(dataSchema),
                 // We don't support hive bucketed tables, only ones we write 
out.
                 bucketSpec = None,
                 options = options,
                 className = fileType).resolveRelation(),
-              catalogTable = Some(relation.tableMeta))
+              catalogTable = Some(updatedTable))
 
           tableRelationCache.put(tableIdentifier, created)
           created
@@ -218,6 +228,54 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
     result.copy(expectedOutputAttributes = Some(relation.output))
   }
 
+  private def inferIfNeeded(
+      relation: CatalogRelation,
+      options: Map[String, String],
+      fileFormat: FileFormat,
+      fileIndexOpt: Option[FileIndex] = None): (StructType, CatalogTable) = {
+    val inferenceMode = 
sparkSession.sessionState.conf.caseSensitiveInferenceMode
+    val shouldInfer = (inferenceMode != NEVER_INFER) && 
!relation.tableMeta.schemaPreservesCase
+    val tableName = relation.tableMeta.identifier.unquotedString
+    if (shouldInfer) {
+      logInfo(s"Inferring case-sensitive schema for table $tableName 
(inference mode: " +
+        s"$inferenceMode)")
+      val fileIndex = fileIndexOpt.getOrElse {
+        val rootPath = new Path(relation.tableMeta.location)
+        new InMemoryFileIndex(sparkSession, Seq(rootPath), options, None)
+      }
+
+      val inferredSchema = fileFormat
+        .inferSchema(
+          sparkSession,
+          options,
+          fileIndex.listFiles(Nil).flatMap(_.files))
+        .map(mergeWithMetastoreSchema(relation.tableMeta.schema, _))
+
+      inferredSchema match {
+        case Some(schema) =>
+          if (inferenceMode == INFER_AND_SAVE) {
+            updateCatalogSchema(relation.tableMeta.identifier, schema)
+          }
+          (schema, relation.tableMeta.copy(schema = schema))
+        case None =>
+          logWarning(s"Unable to infer schema for table $tableName from file 
format " +
+            s"$fileFormat (inference mode: $inferenceMode). Using metastore 
schema.")
+          (relation.tableMeta.schema, relation.tableMeta)
+      }
+    } else {
+      (relation.tableMeta.schema, relation.tableMeta)
+    }
+  }
+
+  private def updateCatalogSchema(identifier: TableIdentifier, schema: 
StructType): Unit = try {
+    val db = identifier.database.get
+    logInfo(s"Saving case-sensitive schema for table 
${identifier.unquotedString}")
+    sparkSession.sharedState.externalCatalog.alterTableSchema(db, 
identifier.table, schema)
+  } catch {
+    case NonFatal(ex) =>
+      logWarning(s"Unable to save case-sensitive schema for table 
${identifier.unquotedString}", ex)
+  }
+
   /**
    * When scanning or writing to non-partitioned Metastore Parquet tables, 
convert them to Parquet
    * data source relations for better performance.
@@ -287,3 +345,33 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
     }
   }
 }
+
+private[hive] object HiveMetastoreCatalog {
+  def mergeWithMetastoreSchema(
+      metastoreSchema: StructType,
+      inferredSchema: StructType): StructType = try {
+    // Find any nullable fields in metastore schema that are missing from the 
inferred schema.
+    val metastoreFields = metastoreSchema.map(f => f.name.toLowerCase -> 
f).toMap
+    val missingNullables = metastoreFields
+      .filterKeys(!inferredSchema.map(_.name.toLowerCase).contains(_))
+      .values
+      .filter(_.nullable)
+
+    // Merge missing nullable fields to inferred schema and build a 
case-insensitive field map.
+    val inferredFields = StructType(inferredSchema ++ missingNullables)
+      .map(f => f.name.toLowerCase -> f).toMap
+    StructType(metastoreFields.map { case(name, field) =>
+      field.copy(name = inferredFields(name).name)
+    }.toSeq)
+  } catch {
+    case NonFatal(_) =>
+      val msg = s"""Detected conflicting schemas when merging the schema 
obtained from the Hive
+         | Metastore with the one inferred from the file format. Metastore 
schema:
+         |${metastoreSchema.prettyJson}
+         |
+         |Inferred schema:
+         |${inferredSchema.prettyJson}
+       """.stripMargin
+      throw new SparkException(msg)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f79371ad/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
new file mode 100644
index 0000000..7895580
--- /dev/null
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.util.Random
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.execution.datasources.FileStatusCache
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
+import 
org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode.{Value => 
InferenceMode, _}
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types._
+
+/**
+ * Tests for case-sensitive schema inference on Hive Metastore tables. For each supported file
+ * format an external Hive table is created without any Spark SQL schema properties, and the
+ * suite checks how the table schema is resolved under each inference mode. It also covers
+ * `HiveMetastoreCatalog.mergeWithMetastoreSchema()` directly.
+ */
+class HiveSchemaInferenceSuite
+  extends QueryTest with TestHiveSingleton with SQLTestUtils with BeforeAndAfterEach {
+
+  import HiveSchemaInferenceSuite._
+  import HiveExternalCatalog.DATASOURCE_SCHEMA_PREFIX
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    FileStatusCache.resetForTesting()
+  }
+
+  override def afterEach(): Unit = {
+    // Run the cleanup even if super.afterEach() throws so cached state can't leak between tests.
+    try {
+      super.afterEach()
+    } finally {
+      spark.sessionState.catalog.tableRelationCache.invalidateAll()
+      FileStatusCache.resetForTesting()
+    }
+  }
+
+  private val externalCatalog = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
+  private val client = externalCatalog.client
+
+  // Return a copy of the given schema with all field names converted to lower case.
+  private def lowerCaseSchema(schema: StructType): StructType = {
+    StructType(schema.map(f => f.copy(name = f.name.toLowerCase)))
+  }
+
+  // Create a Hive external test table containing the given field and partition column names.
+  // Returns a case-sensitive schema for the table.
+  private def setupExternalTable(
+      fileType: String,
+      fields: Seq[String],
+      partitionCols: Seq[String],
+      dir: File): StructType = {
+    // Treat all table fields as bigints...
+    val structFields = fields.map { field =>
+      StructField(
+        name = field,
+        dataType = LongType,
+        nullable = true,
+        metadata = new MetadataBuilder().putString(HIVE_TYPE_STRING, "bigint").build())
+    }
+    // and all partition columns as ints
+    val partitionStructFields = partitionCols.map { field =>
+      StructField(
+        // Partition column case isn't preserved
+        name = field.toLowerCase,
+        dataType = IntegerType,
+        nullable = true,
+        metadata = new MetadataBuilder().putString(HIVE_TYPE_STRING, "int").build())
+    }
+    val schema = StructType(structFields ++ partitionStructFields)
+
+    // Write some test data (partitioned if specified)
+    val writer = spark.range(NUM_RECORDS)
+      .selectExpr((fields ++ partitionCols).map("id as " + _): _*)
+      .write
+      .partitionBy(partitionCols: _*)
+      .mode("overwrite")
+    fileType match {
+      case ORC_FILE_TYPE =>
+       writer.orc(dir.getAbsolutePath)
+      case PARQUET_FILE_TYPE =>
+       writer.parquet(dir.getAbsolutePath)
+    }
+
+    // Create Hive external table with lowercased schema
+    val serde = HiveSerDe.serdeMap(fileType)
+    client.createTable(
+      CatalogTable(
+        identifier = TableIdentifier(table = TEST_TABLE_NAME, database = Option(DATABASE)),
+        tableType = CatalogTableType.EXTERNAL,
+        storage = CatalogStorageFormat(
+          locationUri = Option(new java.net.URI(dir.getAbsolutePath)),
+          inputFormat = serde.inputFormat,
+          outputFormat = serde.outputFormat,
+          serde = serde.serde,
+          compressed = false,
+          properties = Map("serialization.format" -> "1")),
+        schema = schema,
+        provider = Option("hive"),
+        partitionColumnNames = partitionCols.map(_.toLowerCase),
+        properties = Map.empty),
+      true)
+
+    // Add partition records (if specified)
+    if (partitionCols.nonEmpty) {
+      spark.catalog.recoverPartitions(TEST_TABLE_NAME)
+    }
+
+    // Check that the table returned by HiveExternalCatalog has schemaPreservesCase set to false
+    // and that the raw table returned by the Hive client doesn't have any Spark SQL properties
+    // set (table needs to be obtained from client since HiveExternalCatalog filters these
+    // properties out).
+    assert(!externalCatalog.getTable(DATABASE, TEST_TABLE_NAME).schemaPreservesCase)
+    val rawTable = client.getTable(DATABASE, TEST_TABLE_NAME)
+    assert(rawTable.properties.filterKeys(_.startsWith(DATASOURCE_SCHEMA_PREFIX)) == Map.empty)
+    schema
+  }
+
+  // Run `f` once against a partitioned and once against an unpartitioned test table of the
+  // given file type. `f` receives the data field names, the partition column names and the
+  // case-sensitive table schema. The table is dropped after each invocation.
+  private def withTestTables(
+    fileType: String)(f: (Seq[String], Seq[String], StructType) => Unit): Unit = {
+    // Test both a partitioned and unpartitioned Hive table
+    val tableFields = Seq(
+      (Seq("fieldOne"), Seq("partCol1", "partCol2")),
+      (Seq("fieldOne", "fieldTwo"), Seq.empty[String]))
+
+    tableFields.foreach { case (fields, partCols) =>
+      withTempDir { dir =>
+        val schema = setupExternalTable(fileType, fields, partCols, dir)
+        withTable(TEST_TABLE_NAME) { f(fields, partCols, schema) }
+      }
+    }
+  }
+
+  // Invoke `f` once for every file type covered by this suite.
+  private def withFileTypes(f: (String) => Unit): Unit
+    = Seq(ORC_FILE_TYPE, PARQUET_FILE_TYPE).foreach(f)
+
+  // Run `f` with the given inference mode set and metastore ORC conversion enabled.
+  private def withInferenceMode(mode: InferenceMode)(f: => Unit): Unit = {
+    withSQLConf(
+      HiveUtils.CONVERT_METASTORE_ORC.key -> "true",
+      SQLConf.HIVE_CASE_SENSITIVE_INFERENCE.key -> mode.toString)(f)
+  }
+
+  // Check that filtering the test table on each of the given columns returns every record.
+  // All columns are checked (instead of a randomly chosen one) to keep the test deterministic.
+  private def testFieldQuery(fields: Seq[String]): Unit = {
+    fields.foreach { field =>
+      val query = s"SELECT * FROM ${TEST_TABLE_NAME} WHERE $field >= 0"
+      assert(spark.sql(query).count == NUM_RECORDS, s"unexpected row count for query: $query")
+    }
+  }
+
+  // Check that the schema Spark resolves for the test table equals the expected one.
+  private def testTableSchema(expectedSchema: StructType): Unit
+    = assert(spark.table(TEST_TABLE_NAME).schema == expectedSchema)
+
+  withFileTypes { fileType =>
+    test(s"$fileType: schema should be inferred and saved when INFER_AND_SAVE is specified") {
+      withInferenceMode(INFER_AND_SAVE) {
+        withTestTables(fileType) { (fields, partCols, schema) =>
+          testFieldQuery(fields)
+          testFieldQuery(partCols)
+          testTableSchema(schema)
+
+          // Verify the catalog table now contains the updated schema and properties
+          val catalogTable = externalCatalog.getTable(DATABASE, TEST_TABLE_NAME)
+          assert(catalogTable.schemaPreservesCase)
+          assert(catalogTable.schema == schema)
+          assert(catalogTable.partitionColumnNames == partCols.map(_.toLowerCase))
+        }
+      }
+    }
+  }
+
+  withFileTypes { fileType =>
+    test(s"$fileType: schema should be inferred but not stored when INFER_ONLY is specified") {
+      withInferenceMode(INFER_ONLY) {
+        withTestTables(fileType) { (fields, partCols, schema) =>
+          val originalTable = externalCatalog.getTable(DATABASE, TEST_TABLE_NAME)
+          testFieldQuery(fields)
+          testFieldQuery(partCols)
+          testTableSchema(schema)
+          // Catalog table shouldn't be altered
+          assert(externalCatalog.getTable(DATABASE, TEST_TABLE_NAME) == originalTable)
+        }
+      }
+    }
+  }
+
+  withFileTypes { fileType =>
+    test(s"$fileType: schema should not be inferred when NEVER_INFER is specified") {
+      withInferenceMode(NEVER_INFER) {
+        withTestTables(fileType) { (fields, partCols, schema) =>
+          val originalTable = externalCatalog.getTable(DATABASE, TEST_TABLE_NAME)
+          // Only check the table schema as the test queries will break
+          testTableSchema(lowerCaseSchema(schema))
+          assert(externalCatalog.getTable(DATABASE, TEST_TABLE_NAME) == originalTable)
+        }
+      }
+    }
+  }
+
+  test("mergeWithMetastoreSchema() should return expected results") {
+    // Field type conflict resolution
+    assertResult(
+      StructType(Seq(
+        StructField("lowerCase", StringType),
+        StructField("UPPERCase", DoubleType, nullable = false)))) {
+
+      HiveMetastoreCatalog.mergeWithMetastoreSchema(
+        StructType(Seq(
+          StructField("lowercase", StringType),
+          StructField("uppercase", DoubleType, nullable = false))),
+
+        StructType(Seq(
+          StructField("lowerCase", BinaryType),
+          StructField("UPPERCase", IntegerType, nullable = true))))
+    }
+
+    // MetaStore schema is subset of parquet schema
+    assertResult(
+      StructType(Seq(
+        StructField("UPPERCase", DoubleType, nullable = false)))) {
+
+      HiveMetastoreCatalog.mergeWithMetastoreSchema(
+        StructType(Seq(
+          StructField("uppercase", DoubleType, nullable = false))),
+
+        StructType(Seq(
+          StructField("lowerCase", BinaryType),
+          StructField("UPPERCase", IntegerType, nullable = true))))
+    }
+
+    // Metastore schema contains additional non-nullable fields.
+    assert(intercept[Throwable] {
+      HiveMetastoreCatalog.mergeWithMetastoreSchema(
+        StructType(Seq(
+          StructField("uppercase", DoubleType, nullable = false),
+          StructField("lowerCase", BinaryType, nullable = false))),
+
+        StructType(Seq(
+          StructField("UPPERCase", IntegerType, nullable = true))))
+    }.getMessage.contains("Detected conflicting schemas"))
+
+    // Conflicting non-nullable field names
+    assert(intercept[Throwable] {
+      HiveMetastoreCatalog.mergeWithMetastoreSchema(
+        StructType(Seq(StructField("lower", StringType, nullable = false))),
+        StructType(Seq(StructField("lowerCase", BinaryType))))
+    }.getMessage.contains("Detected conflicting schemas"))
+
+    // Check that merging missing nullable fields works as expected.
+    assertResult(
+      StructType(Seq(
+        StructField("firstField", StringType, nullable = true),
+        StructField("secondField", StringType, nullable = true),
+        StructField("thirdfield", StringType, nullable = true)))) {
+      HiveMetastoreCatalog.mergeWithMetastoreSchema(
+        StructType(Seq(
+          StructField("firstfield", StringType, nullable = true),
+          StructField("secondfield", StringType, nullable = true),
+          StructField("thirdfield", StringType, nullable = true))),
+        StructType(Seq(
+          StructField("firstField", StringType, nullable = true),
+          StructField("secondField", StringType, nullable = true))))
+    }
+
+    // Merge should fail if the Metastore contains any additional fields that are not
+    // nullable.
+    assert(intercept[Throwable] {
+      HiveMetastoreCatalog.mergeWithMetastoreSchema(
+        StructType(Seq(
+          StructField("firstfield", StringType, nullable = true),
+          StructField("secondfield", StringType, nullable = true),
+          StructField("thirdfield", StringType, nullable = false))),
+        StructType(Seq(
+          StructField("firstField", StringType, nullable = true),
+          StructField("secondField", StringType, nullable = true))))
+    }.getMessage.contains("Detected conflicting schemas"))
+  }
+}
+
+/** Constants shared by the tests in the `HiveSchemaInferenceSuite` class. */
+object HiveSchemaInferenceSuite {
+  // Number of rows written to each test table.
+  private val NUM_RECORDS: Int = 10
+  // Database and name of the Hive table created for each test.
+  private val DATABASE: String = "default"
+  private val TEST_TABLE_NAME: String = "test_table"
+  // File formats exercised by the suite; also used as keys into HiveSerDe.serdeMap.
+  private val ORC_FILE_TYPE: String = "orc"
+  private val PARQUET_FILE_TYPE: String = "parquet"
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to