Repository: spark
Updated Branches:
  refs/heads/master 882079f5c -> 2fd12af43


[SPARK-22306][SQL] alter table schema should not erase the bucketing metadata 
at hive side

Forward-port of https://github.com/apache/spark/pull/19622 to the master branch.

This bug doesn't exist in master because we've added hive bucketing support and 
the hive bucketing metadata can be recognized by Spark, but we should still 
port it to master: 1) there may be other unsupported hive metadata that gets 
removed by Spark; 2) it reduces the code difference between master and 2.2 to 
ease backports in the future.

***

When we alter a table schema, we set the new schema on the Spark `CatalogTable`, 
convert it to a hive table, and finally call `hive.alterTable`. This causes a 
problem in Spark 2.2, because hive bucketing metadata is not recognized by 
Spark, which means a Spark `CatalogTable` representing a hive table is always 
non-bucketed; when we convert it to a hive table and call `hive.alterTable`, 
the original hive bucketing metadata is removed.
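
A rough sketch of this old code path, with illustrative names rather than the 
exact Spark 2.2 internals:

    // Spark 2.2, roughly: the table is round-tripped through Spark's CatalogTable,
    // which cannot carry hive bucketing metadata, so the converted hive table is
    // non-bucketed and hive.alterTable overwrites the original bucketing info.
    val sparkTable = externalCatalog.getTable(db, table)   // bucketSpec is already None here
    val withNewSchema = sparkTable.copy(schema = newSchema)
    client.alterTable(toHiveTable(withNewSchema))           // bucketing metadata is erased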

To fix this bug, we should read the raw hive table metadata, update its 
schema, and call `hive.alterTable`. This guarantees that only the schema is 
changed, and nothing else.
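
A condensed sketch of the fix as implemented in 
`HiveClientImpl.alterTableDataSchema` in the diff below (the removal of stale 
Spark schema properties and the datasource-table fallback are omitted here):

    override def alterTableDataSchema(
        dbName: String,
        tableName: String,
        newDataSchema: StructType,
        schemaProps: Map[String, String]): Unit = withHiveState {
      // Start from the raw hive table so bucketing and any other hive-only
      // metadata stay untouched.
      val oldTable = client.getTable(dbName, tableName)
      // Replace only the data columns and the Spark-specific schema properties.
      oldTable.setFields(newDataSchema.map(toHiveColumn).asJava)
      schemaProps.foreach { case (k, v) => oldTable.setProperty(k, v) }
      shim.alterTable(client, s"$dbName.$tableName", oldTable)
    }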

Author: Wenchen Fan <wenc...@databricks.com>

Closes #19644 from cloud-fan/infer.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2fd12af4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2fd12af4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2fd12af4

Branch: refs/heads/master
Commit: 2fd12af4372a1e2c3faf0eb5d0a1cf530abc0016
Parents: 882079f
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Thu Nov 2 23:41:16 2017 +0100
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Thu Nov 2 23:41:16 2017 +0100

----------------------------------------------------------------------
 .../sql/catalyst/catalog/ExternalCatalog.scala  | 12 +++--
 .../sql/catalyst/catalog/InMemoryCatalog.scala  |  7 +--
 .../sql/catalyst/catalog/SessionCatalog.scala   | 23 +++++-----
 .../catalyst/catalog/ExternalCatalogSuite.scala | 10 ++---
 .../catalyst/catalog/SessionCatalogSuite.scala  |  8 ++--
 .../spark/sql/execution/command/ddl.scala       | 14 +++---
 .../spark/sql/execution/command/tables.scala    | 14 +++---
 .../datasources/DataSourceStrategy.scala        |  4 +-
 .../datasources/orc/OrcFileFormat.scala         |  5 +--
 .../datasources/parquet/ParquetFileFormat.scala |  5 +--
 .../parquet/ParquetSchemaConverter.scala        |  5 +--
 .../spark/sql/hive/HiveExternalCatalog.scala    | 47 ++++++++++++--------
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 11 +++--
 .../apache/spark/sql/hive/HiveStrategies.scala  |  4 +-
 .../spark/sql/hive/client/HiveClient.scala      | 11 +++++
 .../spark/sql/hive/client/HiveClientImpl.scala  | 45 ++++++++++++-------
 .../sql/hive/HiveExternalCatalogSuite.scala     | 18 ++++++++
 .../sql/hive/MetastoreDataSourcesSuite.scala    |  4 +-
 .../sql/hive/execution/Hive_2_1_DDLSuite.scala  |  2 +-
 19 files changed, 146 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index d4c58db..223094d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -150,17 +150,15 @@ abstract class ExternalCatalog
   def alterTable(tableDefinition: CatalogTable): Unit
 
   /**
-   * Alter the schema of a table identified by the provided database and table 
name. The new schema
-   * should still contain the existing bucket columns and partition columns 
used by the table. This
-   * method will also update any Spark SQL-related parameters stored as Hive 
table properties (such
-   * as the schema itself).
+   * Alter the data schema of a table identified by the provided database and 
table name. The new
+   * data schema should not have conflict column names with the existing 
partition columns, and
+   * should still contain all the existing data columns.
    *
    * @param db Database that table to alter schema for exists in
    * @param table Name of table to alter schema for
-   * @param schema Updated schema to be used for the table (must contain 
existing partition and
-   *               bucket columns)
+   * @param newDataSchema Updated data schema to be used for the table.
    */
-  def alterTableSchema(db: String, table: String, schema: StructType): Unit
+  def alterTableDataSchema(db: String, table: String, newDataSchema: 
StructType): Unit
 
   /** Alter the statistics of a table. If `stats` is None, then remove all 
existing statistics. */
   def alterTableStats(db: String, table: String, stats: 
Option[CatalogStatistics]): Unit

http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 98370c1..9504140 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -303,13 +303,14 @@ class InMemoryCatalog(
     catalog(db).tables(tableDefinition.identifier.table).table = 
newTableDefinition
   }
 
-  override def alterTableSchema(
+  override def alterTableDataSchema(
       db: String,
       table: String,
-      schema: StructType): Unit = synchronized {
+      newDataSchema: StructType): Unit = synchronized {
     requireTableExists(db, table)
     val origTable = catalog(db).tables(table).table
-    catalog(db).tables(table).table = origTable.copy(schema = schema)
+    val newSchema = StructType(newDataSchema ++ origTable.partitionSchema)
+    catalog(db).tables(table).table = origTable.copy(schema = newSchema)
   }
 
   override def alterTableStats(

http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 95bc3d6..a129896 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -324,18 +324,16 @@ class SessionCatalog(
   }
 
   /**
-   * Alter the schema of a table identified by the provided table identifier. 
The new schema
-   * should still contain the existing bucket columns and partition columns 
used by the table. This
-   * method will also update any Spark SQL-related parameters stored as Hive 
table properties (such
-   * as the schema itself).
+   * Alter the data schema of a table identified by the provided table 
identifier. The new data
+   * schema should not have conflict column names with the existing partition 
columns, and should
+   * still contain all the existing data columns.
    *
    * @param identifier TableIdentifier
-   * @param newSchema Updated schema to be used for the table (must contain 
existing partition and
-   *                  bucket columns, and partition columns need to be at the 
end)
+   * @param newDataSchema Updated data schema to be used for the table
    */
-  def alterTableSchema(
+  def alterTableDataSchema(
       identifier: TableIdentifier,
-      newSchema: StructType): Unit = {
+      newDataSchema: StructType): Unit = {
     val db = 
formatDatabaseName(identifier.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(identifier.table)
     val tableIdentifier = TableIdentifier(table, Some(db))
@@ -343,10 +341,10 @@ class SessionCatalog(
     requireTableExists(tableIdentifier)
 
     val catalogTable = externalCatalog.getTable(db, table)
-    val oldSchema = catalogTable.schema
-
+    val oldDataSchema = catalogTable.dataSchema
     // not supporting dropping columns yet
-    val nonExistentColumnNames = 
oldSchema.map(_.name).filterNot(columnNameResolved(newSchema, _))
+    val nonExistentColumnNames =
+      oldDataSchema.map(_.name).filterNot(columnNameResolved(newDataSchema, _))
     if (nonExistentColumnNames.nonEmpty) {
       throw new AnalysisException(
         s"""
@@ -355,8 +353,7 @@ class SessionCatalog(
          """.stripMargin)
     }
 
-    // assuming the newSchema has all partition columns at the end as required
-    externalCatalog.alterTableSchema(db, table, newSchema)
+    externalCatalog.alterTableDataSchema(db, table, newDataSchema)
   }
 
   private def columnNameResolved(schema: StructType, colName: String): Boolean 
= {

http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 94593ef..b376108 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -245,14 +245,12 @@ abstract class ExternalCatalogSuite extends SparkFunSuite 
with BeforeAndAfterEac
 
   test("alter table schema") {
     val catalog = newBasicCatalog()
-    val newSchema = StructType(Seq(
+    val newDataSchema = StructType(Seq(
       StructField("col1", IntegerType),
-      StructField("new_field_2", StringType),
-      StructField("a", IntegerType),
-      StructField("b", StringType)))
-    catalog.alterTableSchema("db2", "tbl1", newSchema)
+      StructField("new_field_2", StringType)))
+    catalog.alterTableDataSchema("db2", "tbl1", newDataSchema)
     val newTbl1 = catalog.getTable("db2", "tbl1")
-    assert(newTbl1.schema == newSchema)
+    assert(newTbl1.dataSchema == newDataSchema)
   }
 
   test("alter table stats") {

http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 1cce199..95c87ff 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -463,9 +463,9 @@ abstract class SessionCatalogSuite extends AnalysisTest {
     withBasicCatalog { sessionCatalog =>
       sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = 
false)
       val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
-      sessionCatalog.alterTableSchema(
+      sessionCatalog.alterTableDataSchema(
         TableIdentifier("t1", Some("default")),
-        StructType(oldTab.dataSchema.add("c3", IntegerType) ++ 
oldTab.partitionSchema))
+        StructType(oldTab.dataSchema.add("c3", IntegerType)))
 
       val newTab = sessionCatalog.externalCatalog.getTable("default", "t1")
       // construct the expected table schema
@@ -480,8 +480,8 @@ abstract class SessionCatalogSuite extends AnalysisTest {
       sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = 
false)
       val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
       val e = intercept[AnalysisException] {
-        sessionCatalog.alterTableSchema(
-          TableIdentifier("t1", Some("default")), 
StructType(oldTab.schema.drop(1)))
+        sessionCatalog.alterTableDataSchema(
+          TableIdentifier("t1", Some("default")), 
StructType(oldTab.dataSchema.drop(1)))
       }.getMessage
       assert(e.contains("We don't support dropping columns yet."))
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 162e1d5..a9cd65e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -857,19 +857,23 @@ object DDLUtils {
     }
   }
 
-  private[sql] def checkDataSchemaFieldNames(table: CatalogTable): Unit = {
+  private[sql] def checkDataColNames(table: CatalogTable): Unit = {
+    checkDataColNames(table, table.dataSchema.fieldNames)
+  }
+
+  private[sql] def checkDataColNames(table: CatalogTable, colNames: 
Seq[String]): Unit = {
     table.provider.foreach {
       _.toLowerCase(Locale.ROOT) match {
         case HIVE_PROVIDER =>
           val serde = table.storage.serde
           if (serde == HiveSerDe.sourceToSerDe("orc").get.serde) {
-            OrcFileFormat.checkFieldNames(table.dataSchema)
+            OrcFileFormat.checkFieldNames(colNames)
           } else if (serde == HiveSerDe.sourceToSerDe("parquet").get.serde ||
               serde == Some("parquet.hive.serde.ParquetHiveSerDe")) {
-            ParquetSchemaConverter.checkFieldNames(table.dataSchema)
+            ParquetSchemaConverter.checkFieldNames(colNames)
           }
-        case "parquet" => 
ParquetSchemaConverter.checkFieldNames(table.dataSchema)
-        case "orc" => OrcFileFormat.checkFieldNames(table.dataSchema)
+        case "parquet" => ParquetSchemaConverter.checkFieldNames(colNames)
+        case "orc" => OrcFileFormat.checkFieldNames(colNames)
         case _ =>
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 38f9163..95f16b0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -186,7 +186,7 @@ case class AlterTableRenameCommand(
 */
 case class AlterTableAddColumnsCommand(
     table: TableIdentifier,
-    columns: Seq[StructField]) extends RunnableCommand {
+    colsToAdd: Seq[StructField]) extends RunnableCommand {
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val catalogTable = verifyAlterTableAddColumn(catalog, table)
@@ -199,17 +199,13 @@ case class AlterTableAddColumnsCommand(
     }
     catalog.refreshTable(table)
 
-    // make sure any partition columns are at the end of the fields
-    val reorderedSchema = catalogTable.dataSchema ++ columns ++ 
catalogTable.partitionSchema
-    val newSchema = catalogTable.schema.copy(fields = reorderedSchema.toArray)
-
     SchemaUtils.checkColumnNameDuplication(
-      reorderedSchema.map(_.name), "in the table definition of " + 
table.identifier,
+      (colsToAdd ++ catalogTable.schema).map(_.name),
+      "in the table definition of " + table.identifier,
       conf.caseSensitiveAnalysis)
-    DDLUtils.checkDataSchemaFieldNames(catalogTable.copy(schema = newSchema))
-
-    catalog.alterTableSchema(table, newSchema)
+    DDLUtils.checkDataColNames(catalogTable, colsToAdd.map(_.name))
 
+    catalog.alterTableDataSchema(table, StructType(catalogTable.dataSchema ++ 
colsToAdd))
     Seq.empty[Row]
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 018f24e..04d6f3f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -133,12 +133,12 @@ case class DataSourceAnalysis(conf: SQLConf) extends 
Rule[LogicalPlan] with Cast
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case CreateTable(tableDesc, mode, None) if 
DDLUtils.isDatasourceTable(tableDesc) =>
-      DDLUtils.checkDataSchemaFieldNames(tableDesc)
+      DDLUtils.checkDataColNames(tableDesc)
       CreateDataSourceTableCommand(tableDesc, ignoreIfExists = mode == 
SaveMode.Ignore)
 
     case CreateTable(tableDesc, mode, Some(query))
         if query.resolved && DDLUtils.isDatasourceTable(tableDesc) =>
-      DDLUtils.checkDataSchemaFieldNames(tableDesc.copy(schema = query.schema))
+      DDLUtils.checkDataColNames(tableDesc.copy(schema = query.schema))
       CreateDataSourceTableAsSelectCommand(tableDesc, mode, query)
 
     case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation, _, _, _),

http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 2eeb006..215740e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -35,8 +35,7 @@ private[sql] object OrcFileFormat {
     }
   }
 
-  def checkFieldNames(schema: StructType): StructType = {
-    schema.fieldNames.foreach(checkFieldName)
-    schema
+  def checkFieldNames(names: Seq[String]): Unit = {
+    names.foreach(checkFieldName)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index c1535ba..61bd65d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -23,7 +23,6 @@ import java.net.URI
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.parallel.ForkJoinTaskSupport
-import scala.concurrent.forkjoin.ForkJoinPool
 import scala.util.{Failure, Try}
 
 import org.apache.hadoop.conf.Configuration
@@ -306,10 +305,10 @@ class ParquetFileFormat
     hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, 
classOf[ParquetReadSupport].getName)
     hadoopConf.set(
       ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
-      ParquetSchemaConverter.checkFieldNames(requiredSchema).json)
+      requiredSchema.json)
     hadoopConf.set(
       ParquetWriteSupport.SPARK_ROW_SCHEMA,
-      ParquetSchemaConverter.checkFieldNames(requiredSchema).json)
+      requiredSchema.json)
 
     ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index b3781cf..cd384d1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -571,9 +571,8 @@ private[sql] object ParquetSchemaConverter {
        """.stripMargin.split("\n").mkString(" ").trim)
   }
 
-  def checkFieldNames(schema: StructType): StructType = {
-    schema.fieldNames.foreach(checkFieldName)
-    schema
+  def checkFieldNames(names: Seq[String]): Unit = {
+    names.foreach(checkFieldName)
   }
 
   def checkConversionRequirement(f: => Boolean, message: String): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 96dc983..f8a947b 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -138,16 +138,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
   }
 
   /**
-   * Checks the validity of column names. Hive metastore disallows the table 
to use comma in
+   * Checks the validity of data column names. Hive metastore disallows the 
table to use comma in
    * data column names. Partition columns do not have such a restriction. 
Views do not have such
    * a restriction.
    */
-  private def verifyColumnNames(table: CatalogTable): Unit = {
-    if (table.tableType != VIEW) {
-      table.dataSchema.map(_.name).foreach { colName =>
+  private def verifyDataSchema(
+      tableName: TableIdentifier, tableType: CatalogTableType, dataSchema: 
StructType): Unit = {
+    if (tableType != VIEW) {
+      dataSchema.map(_.name).foreach { colName =>
         if (colName.contains(",")) {
           throw new AnalysisException("Cannot create a table having a column 
whose name contains " +
-            s"commas in Hive metastore. Table: ${table.identifier}; Column: 
$colName")
+            s"commas in Hive metastore. Table: $tableName; Column: $colName")
         }
       }
     }
@@ -218,7 +219,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
     val table = tableDefinition.identifier.table
     requireDbExists(db)
     verifyTableProperties(tableDefinition)
-    verifyColumnNames(tableDefinition)
+    verifyDataSchema(
+      tableDefinition.identifier, tableDefinition.tableType, 
tableDefinition.dataSchema)
 
     if (tableExists(db, table) && !ignoreIfExists) {
       throw new TableAlreadyExistsException(db = db, table = table)
@@ -296,7 +298,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
         storage = table.storage.copy(
           locationUri = None,
           properties = storagePropsWithLocation),
-        schema = table.partitionSchema,
+        schema = StructType(EMPTY_DATA_SCHEMA ++ table.partitionSchema),
         bucketSpec = None,
         properties = table.properties ++ tableProperties)
     }
@@ -617,32 +619,32 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
     }
   }
 
-  override def alterTableSchema(db: String, table: String, schema: 
StructType): Unit = withClient {
+  override def alterTableDataSchema(
+      db: String, table: String, newDataSchema: StructType): Unit = withClient 
{
     requireTableExists(db, table)
-    val rawTable = getRawTable(db, table)
-    // Add table metadata such as table schema, partition columns, etc. to 
table properties.
-    val updatedProperties = rawTable.properties ++ 
tableMetaToTableProps(rawTable, schema)
-    val withNewSchema = rawTable.copy(properties = updatedProperties, schema = 
schema)
-    verifyColumnNames(withNewSchema)
+    val oldTable = getTable(db, table)
+    verifyDataSchema(oldTable.identifier, oldTable.tableType, newDataSchema)
+    val schemaProps =
+      tableMetaToTableProps(oldTable, StructType(newDataSchema ++ 
oldTable.partitionSchema)).toMap
 
-    if (isDatasourceTable(rawTable)) {
+    if (isDatasourceTable(oldTable)) {
       // For data source tables, first try to write it with the schema set; if 
that does not work,
       // try again with updated properties and the partition schema. This is a 
simplified version of
       // what createDataSourceTable() does, and may leave the table in a state 
unreadable by Hive
       // (for example, the schema does not match the data source schema, or 
does not match the
       // storage descriptor).
       try {
-        client.alterTable(withNewSchema)
+        client.alterTableDataSchema(db, table, newDataSchema, schemaProps)
       } catch {
         case NonFatal(e) =>
           val warningMessage =
-            s"Could not alter schema of table  
${rawTable.identifier.quotedString} in a Hive " +
+            s"Could not alter schema of table 
${oldTable.identifier.quotedString} in a Hive " +
               "compatible way. Updating Hive metastore in Spark SQL specific 
format."
           logWarning(warningMessage, e)
-          client.alterTable(withNewSchema.copy(schema = 
rawTable.partitionSchema))
+          client.alterTableDataSchema(db, table, EMPTY_DATA_SCHEMA, 
schemaProps)
       }
     } else {
-      client.alterTable(withNewSchema)
+      client.alterTableDataSchema(db, table, newDataSchema, schemaProps)
     }
   }
 
@@ -1297,6 +1299,15 @@ object HiveExternalCatalog {
 
   val CREATED_SPARK_VERSION = SPARK_SQL_PREFIX + "create.version"
 
+  // When storing data source tables in hive metastore, we need to set data 
schema to empty if the
+  // schema is hive-incompatible. However we need a hack to preserve existing 
behavior. Before
+  // Spark 2.0, we do not set a default serde here (this was done in Hive), 
and so if the user
+  // provides an empty schema Hive would automatically populate the schema 
with a single field
+  // "col". However, after SPARK-14388, we set the default serde to 
LazySimpleSerde so this
+  // implicit behavior no longer happens. Therefore, we need to do it in Spark 
ourselves.
+  val EMPTY_DATA_SCHEMA = new StructType()
+    .add("col", "array<string>", nullable = true, comment = "from 
deserializer")
+
   /**
    * Returns the fully qualified name used in table properties for a 
particular column stat.
    * For example, for column "mycol", and "min" stat, this should return

http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 5ac6597..8adfda0 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -244,11 +244,11 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
 
       inferredSchema match {
         case Some(dataSchema) =>
-          val schema = StructType(dataSchema ++ 
relation.tableMeta.partitionSchema)
           if (inferenceMode == INFER_AND_SAVE) {
-            updateCatalogSchema(relation.tableMeta.identifier, schema)
+            updateDataSchema(relation.tableMeta.identifier, dataSchema)
           }
-          relation.tableMeta.copy(schema = schema)
+          val newSchema = StructType(dataSchema ++ 
relation.tableMeta.partitionSchema)
+          relation.tableMeta.copy(schema = newSchema)
         case None =>
           logWarning(s"Unable to infer schema for table $tableName from file 
format " +
             s"$fileFormat (inference mode: $inferenceMode). Using metastore 
schema.")
@@ -259,10 +259,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
     }
   }
 
-  private def updateCatalogSchema(identifier: TableIdentifier, schema: 
StructType): Unit = try {
-    val db = identifier.database.get
+  private def updateDataSchema(identifier: TableIdentifier, newDataSchema: 
StructType): Unit = try {
     logInfo(s"Saving case-sensitive schema for table 
${identifier.unquotedString}")
-    sparkSession.sharedState.externalCatalog.alterTableSchema(db, 
identifier.table, schema)
+    sparkSession.sessionState.catalog.alterTableDataSchema(identifier, 
newDataSchema)
   } catch {
     case NonFatal(ex) =>
       logWarning(s"Unable to save case-sensitive schema for table 
${identifier.unquotedString}", ex)

http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 3592b8f..ee1f6ee 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -152,11 +152,11 @@ object HiveAnalysis extends Rule[LogicalPlan] {
       InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite, 
ifPartitionNotExists)
 
     case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) 
=>
-      DDLUtils.checkDataSchemaFieldNames(tableDesc)
+      DDLUtils.checkDataColNames(tableDesc)
       CreateTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)
 
     case CreateTable(tableDesc, mode, Some(query)) if 
DDLUtils.isHiveTable(tableDesc) =>
-      DDLUtils.checkDataSchemaFieldNames(tableDesc)
+      DDLUtils.checkDataColNames(tableDesc)
       CreateHiveTableAsSelectCommand(tableDesc, query, mode)
 
     case InsertIntoDir(isLocal, storage, provider, child, overwrite)

http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index ee3eb2e..f697174 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.types.StructType
 
 
 /**
@@ -100,6 +101,16 @@ private[hive] trait HiveClient {
    */
   def alterTable(dbName: String, tableName: String, table: CatalogTable): Unit
 
+  /**
+   * Updates the given table with a new data schema and table properties, and 
keep everything else
+   * unchanged.
+   *
+   * TODO(cloud-fan): it's a little hacky to introduce the schema table 
properties here in
+   * `HiveClient`, but we don't have a cleaner solution now.
+   */
+  def alterTableDataSchema(
+    dbName: String, tableName: String, newDataSchema: StructType, schemaProps: 
Map[String, String])
+
   /** Creates a new database with the given name. */
   def createDatabase(database: CatalogDatabase, ignoreIfExists: Boolean): Unit
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 16c95c5..b5a5890 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -39,7 +39,6 @@ import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
 
 import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.AnalysisException
@@ -51,7 +50,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.hive.HiveExternalCatalog
+import org.apache.spark.sql.hive.HiveExternalCatalog.{DATASOURCE_SCHEMA, 
DATASOURCE_SCHEMA_NUMPARTS, DATASOURCE_SCHEMA_PART_PREFIX}
 import org.apache.spark.sql.hive.client.HiveClientImpl._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{CircularBuffer, Utils}
@@ -515,6 +514,33 @@ private[hive] class HiveClientImpl(
     shim.alterTable(client, qualifiedTableName, hiveTable)
   }
 
+  override def alterTableDataSchema(
+      dbName: String,
+      tableName: String,
+      newDataSchema: StructType,
+      schemaProps: Map[String, String]): Unit = withHiveState {
+    val oldTable = client.getTable(dbName, tableName)
+    val hiveCols = newDataSchema.map(toHiveColumn)
+    oldTable.setFields(hiveCols.asJava)
+
+    // remove old schema table properties
+    val it = oldTable.getParameters.entrySet.iterator
+    while (it.hasNext) {
+      val entry = it.next()
+      val isSchemaProp = 
entry.getKey.startsWith(DATASOURCE_SCHEMA_PART_PREFIX) ||
+        entry.getKey == DATASOURCE_SCHEMA || entry.getKey == 
DATASOURCE_SCHEMA_NUMPARTS
+      if (isSchemaProp) {
+        it.remove()
+      }
+    }
+
+    // set new schema table properties
+    schemaProps.foreach { case (k, v) => oldTable.setProperty(k, v) }
+
+    val qualifiedTableName = s"$dbName.$tableName"
+    shim.alterTable(client, qualifiedTableName, oldTable)
+  }
+
   override def createPartitions(
       db: String,
       table: String,
@@ -896,20 +922,7 @@ private[hive] object HiveClientImpl {
     val (partCols, schema) = table.schema.map(toHiveColumn).partition { c =>
       table.partitionColumnNames.contains(c.getName)
     }
-    // after SPARK-19279, it is not allowed to create a hive table with an 
empty schema,
-    // so here we should not add a default col schema
-    if (schema.isEmpty && HiveExternalCatalog.isDatasourceTable(table)) {
-      // This is a hack to preserve existing behavior. Before Spark 2.0, we do 
not
-      // set a default serde here (this was done in Hive), and so if the user 
provides
-      // an empty schema Hive would automatically populate the schema with a 
single
-      // field "col". However, after SPARK-14388, we set the default serde to
-      // LazySimpleSerde so this implicit behavior no longer happens. 
Therefore,
-      // we need to do it in Spark ourselves.
-      hiveTable.setFields(
-        Seq(new FieldSchema("col", "array<string>", "from 
deserializer")).asJava)
-    } else {
-      hiveTable.setFields(schema.asJava)
-    }
+    hiveTable.setFields(schema.asJava)
     hiveTable.setPartCols(partCols.asJava)
     userName.foreach(hiveTable.setOwner)
     hiveTable.setCreateTime((table.createTime / 1000).toInt)

http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index d43534d..2e35fde 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -89,4 +89,22 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
       assert(restoredTable.schema == newSchema)
     }
   }
+
+  test("SPARK-22306: alter table schema should not erase the bucketing 
metadata at hive side") {
+    val catalog = newBasicCatalog()
+    externalCatalog.client.runSqlHive(
+      """
+        |CREATE TABLE db1.t(a string, b string)
+        |CLUSTERED BY (a, b) SORTED BY (a, b) INTO 10 BUCKETS
+        |STORED AS PARQUET
+      """.stripMargin)
+
+    val newSchema = new StructType().add("a", "string").add("b", 
"string").add("c", "string")
+    catalog.alterTableDataSchema("db1", "t", newSchema)
+
+    assert(catalog.getTable("db1", "t").schema == newSchema)
+    val bucketString = externalCatalog.client.runSqlHive("DESC FORMATTED 
db1.t")
+      .filter(_.contains("Num Buckets")).head
+    assert(bucketString.contains("10"))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index f5d41c9..a106047 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -741,7 +741,7 @@ class MetastoreDataSourcesSuite extends QueryTest with 
SQLTestUtils with TestHiv
       val hiveTable = CatalogTable(
         identifier = TableIdentifier(tableName, Some("default")),
         tableType = CatalogTableType.MANAGED,
-        schema = new StructType,
+        schema = HiveExternalCatalog.EMPTY_DATA_SCHEMA,
         provider = Some("json"),
         storage = CatalogStorageFormat(
           locationUri = None,
@@ -1266,7 +1266,7 @@ class MetastoreDataSourcesSuite extends QueryTest with 
SQLTestUtils with TestHiv
       val hiveTable = CatalogTable(
         identifier = TableIdentifier("t", Some("default")),
         tableType = CatalogTableType.MANAGED,
-        schema = new StructType,
+        schema = HiveExternalCatalog.EMPTY_DATA_SCHEMA,
         provider = Some("json"),
         storage = CatalogStorageFormat.empty,
         properties = Map(

http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala
index 5c248b9..bc82887 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala
@@ -117,7 +117,7 @@ class Hive_2_1_DDLSuite extends SparkFunSuite with 
TestHiveSingleton with Before
     spark.sql(createTableStmt)
     val oldTable = 
spark.sessionState.catalog.externalCatalog.getTable("default", tableName)
     catalog.createTable(oldTable, true)
-    catalog.alterTableSchema("default", tableName, updatedSchema)
+    catalog.alterTableDataSchema("default", tableName, updatedSchema)
 
     val updatedTable = catalog.getTable("default", tableName)
     assert(updatedTable.schema.fieldNames === updatedSchema.fieldNames)

