Repository: spark
Updated Branches:
  refs/heads/branch-2.2 c311c5e79 -> 4074ed2e1


[SPARK-22306][SQL][2.2] alter table schema should not erase the bucketing 
metadata at hive side

## What changes were proposed in this pull request?

When we alter a table schema, we set the new schema on the Spark `CatalogTable`, 
convert it to a hive table, and finally call `hive.alterTable`. This causes a 
problem in Spark 2.2, because hive bucketing metadata is not recognized by 
Spark: a Spark `CatalogTable` representing a hive table is always non-bucketed, 
so when we convert it back to a hive table and call `hive.alterTable`, the 
original hive bucketing metadata gets erased.
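
For example, a hypothetical reproduction sketch (it mirrors the regression test 
added in this diff; `spark` is the usual Hive-enabled `SparkSession` from 
spark-shell, and the database/table names are made up):

```scala
// The bucketed table has to be created through Hive itself, because Spark 2.2
// cannot create bucketed Hive serde tables:
//
//   CREATE TABLE db1.t(a STRING, b STRING)
//   CLUSTERED BY (a, b) SORTED BY (a, b) INTO 10 BUCKETS
//   STORED AS PARQUET;
//
// Altering the schema from Spark goes through `alterTableSchema`:
spark.sql("ALTER TABLE db1.t ADD COLUMNS (c STRING)")

// Before this patch, `DESC FORMATTED db1.t` on the Hive side no longer reports
// "Num Buckets: 10"; with this patch the bucketing metadata is kept.
```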

To fix this bug, we should read out the raw hive table metadata, update only 
its schema, and call `hive.alterTable`. This guarantees that only the schema 
changes and everything else is preserved.
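
The core of the change is sketched below; it is a simplified view of the new 
`HiveClientImpl.alterTableDataSchema` further down in this diff (`client`, 
`shim` and `toHiveColumn` are existing members of `HiveClientImpl`, and the 
real method also drops the stale `spark.sql.sources.schema.*` properties 
before setting the new ones):

```scala
import scala.collection.JavaConverters._

// Mutate only the data columns and the Spark schema table properties on the
// *raw* Hive table, so bucketing, sorting and storage metadata stay untouched.
val hiveTable = client.getTable(dbName, tableName)            // raw Hive table
hiveTable.setFields(newDataSchema.map(toHiveColumn).asJava)   // data columns only
schemaProps.foreach { case (k, v) => hiveTable.setProperty(k, v) }
shim.alterTable(client, s"$dbName.$tableName", hiveTable)     // nothing else changes
```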

Note that this bug doesn't exist in the master branch, because hive bucketing 
support has been added there and the hive bucketing metadata can be recognized 
by Spark. I think we should merge this PR to master too, for code cleanup and 
to reduce the difference between the master and 2.2 branches for easier 
backporting.

## How was this patch tested?

A new regression test.

Author: Wenchen Fan <wenc...@databricks.com>

Closes #19622 from cloud-fan/infer.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4074ed2e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4074ed2e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4074ed2e

Branch: refs/heads/branch-2.2
Commit: 4074ed2e1363c886878bbf9483e21abd1745f482
Parents: c311c5e
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Thu Nov 2 12:37:52 2017 +0100
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Thu Nov 2 12:37:52 2017 +0100

----------------------------------------------------------------------
 .../sql/catalyst/catalog/ExternalCatalog.scala  | 12 ++---
 .../sql/catalyst/catalog/InMemoryCatalog.scala  |  7 +--
 .../sql/catalyst/catalog/SessionCatalog.scala   | 25 ++++-----
 .../catalyst/catalog/ExternalCatalogSuite.scala | 11 ++--
 .../catalyst/catalog/SessionCatalogSuite.scala  | 21 ++++++--
 .../spark/sql/execution/command/tables.scala    | 10 +---
 .../spark/sql/hive/HiveExternalCatalog.scala    | 57 +++++++++++++-------
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 11 ++--
 .../spark/sql/hive/client/HiveClient.scala      | 11 ++++
 .../spark/sql/hive/client/HiveClientImpl.scala  | 45 ++++++++++------
 .../sql/hive/HiveExternalCatalogSuite.scala     | 18 +++++++
 .../sql/hive/MetastoreDataSourcesSuite.scala    |  4 +-
 .../sql/hive/execution/Hive_2_1_DDLSuite.scala  |  2 +-
 13 files changed, 148 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 18644b0..8db6f79 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -148,17 +148,15 @@ abstract class ExternalCatalog
   def alterTable(tableDefinition: CatalogTable): Unit
 
   /**
-   * Alter the schema of a table identified by the provided database and table 
name. The new schema
-   * should still contain the existing bucket columns and partition columns 
used by the table. This
-   * method will also update any Spark SQL-related parameters stored as Hive 
table properties (such
-   * as the schema itself).
+   * Alter the data schema of a table identified by the provided database and 
table name. The new
+   * data schema should not have conflict column names with the existing 
partition columns, and
+   * should still contain all the existing data columns.
    *
    * @param db Database that table to alter schema for exists in
    * @param table Name of table to alter schema for
-   * @param schema Updated schema to be used for the table (must contain 
existing partition and
-   *               bucket columns)
+   * @param newDataSchema Updated data schema to be used for the table.
    */
-  def alterTableSchema(db: String, table: String, schema: StructType): Unit
+  def alterTableDataSchema(db: String, table: String, newDataSchema: 
StructType): Unit
 
   def getTable(db: String, table: String): CatalogTable
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index bf8542c..f83e28f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -301,13 +301,14 @@ class InMemoryCatalog(
     catalog(db).tables(tableDefinition.identifier.table).table = 
tableDefinition
   }
 
-  override def alterTableSchema(
+  override def alterTableDataSchema(
       db: String,
       table: String,
-      schema: StructType): Unit = synchronized {
+      newDataSchema: StructType): Unit = synchronized {
     requireTableExists(db, table)
     val origTable = catalog(db).tables(table).table
-    catalog(db).tables(table).table = origTable.copy(schema = schema)
+    val newSchema = StructType(newDataSchema ++ origTable.partitionSchema)
+    catalog(db).tables(table).table = origTable.copy(schema = newSchema)
   }
 
   override def getTable(db: String, table: String): CatalogTable = 
synchronized {

http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index df8f9aa..bbcfdac 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -336,30 +336,28 @@ class SessionCatalog(
   }
 
   /**
-   * Alter the schema of a table identified by the provided table identifier. 
The new schema
-   * should still contain the existing bucket columns and partition columns 
used by the table. This
-   * method will also update any Spark SQL-related parameters stored as Hive 
table properties (such
-   * as the schema itself).
+   * Alter the data schema of a table identified by the provided table 
identifier. The new data
+   * schema should not have conflict column names with the existing partition 
columns, and should
+   * still contain all the existing data columns.
    *
    * @param identifier TableIdentifier
-   * @param newSchema Updated schema to be used for the table (must contain 
existing partition and
-   *                  bucket columns, and partition columns need to be at the 
end)
+   * @param newDataSchema Updated data schema to be used for the table
    */
-  def alterTableSchema(
+  def alterTableDataSchema(
       identifier: TableIdentifier,
-      newSchema: StructType): Unit = {
+      newDataSchema: StructType): Unit = {
     val db = 
formatDatabaseName(identifier.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(identifier.table)
     val tableIdentifier = TableIdentifier(table, Some(db))
     requireDbExists(db)
     requireTableExists(tableIdentifier)
-    checkDuplication(newSchema)
 
     val catalogTable = externalCatalog.getTable(db, table)
-    val oldSchema = catalogTable.schema
-
+    checkDuplication(newDataSchema ++ catalogTable.partitionSchema)
+    val oldDataSchema = catalogTable.dataSchema
     // not supporting dropping columns yet
-    val nonExistentColumnNames = 
oldSchema.map(_.name).filterNot(columnNameResolved(newSchema, _))
+    val nonExistentColumnNames =
+      oldDataSchema.map(_.name).filterNot(columnNameResolved(newDataSchema, _))
     if (nonExistentColumnNames.nonEmpty) {
       throw new AnalysisException(
         s"""
@@ -368,8 +366,7 @@ class SessionCatalog(
          """.stripMargin)
     }
 
-    // assuming the newSchema has all partition columns at the end as required
-    externalCatalog.alterTableSchema(db, table, newSchema)
+    externalCatalog.alterTableDataSchema(db, table, newDataSchema)
   }
 
   private def columnNameResolved(schema: StructType, colName: String): Boolean 
= {

http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 54ecf44..014d0c0 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -245,15 +245,12 @@ abstract class ExternalCatalogSuite extends SparkFunSuite 
with BeforeAndAfterEac
 
   test("alter table schema") {
     val catalog = newBasicCatalog()
-    val tbl1 = catalog.getTable("db2", "tbl1")
-    val newSchema = StructType(Seq(
+    val newDataSchema = StructType(Seq(
       StructField("new_field_1", IntegerType),
-      StructField("new_field_2", StringType),
-      StructField("a", IntegerType),
-      StructField("b", StringType)))
-    catalog.alterTableSchema("db2", "tbl1", newSchema)
+      StructField("new_field_2", StringType)))
+    catalog.alterTableDataSchema("db2", "tbl1", newDataSchema)
     val newTbl1 = catalog.getTable("db2", "tbl1")
-    assert(newTbl1.schema == newSchema)
+    assert(newTbl1.dataSchema == newDataSchema)
   }
 
   test("get table") {

http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 9c1b638..a8c6c06 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -452,9 +452,9 @@ abstract class SessionCatalogSuite extends PlanTest {
     withBasicCatalog { sessionCatalog =>
       sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = 
false)
       val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
-      sessionCatalog.alterTableSchema(
+      sessionCatalog.alterTableDataSchema(
         TableIdentifier("t1", Some("default")),
-        StructType(oldTab.dataSchema.add("c3", IntegerType) ++ 
oldTab.partitionSchema))
+        StructType(oldTab.dataSchema.add("c3", IntegerType)))
 
       val newTab = sessionCatalog.externalCatalog.getTable("default", "t1")
       // construct the expected table schema
@@ -464,13 +464,26 @@ abstract class SessionCatalogSuite extends PlanTest {
     }
   }
 
+  test("alter table add columns which are conflicting with partition columns") 
{
+    withBasicCatalog { sessionCatalog =>
+      sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = 
false)
+      val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
+      val e = intercept[AnalysisException] {
+        sessionCatalog.alterTableDataSchema(
+          TableIdentifier("t1", Some("default")),
+          StructType(oldTab.dataSchema.add("a", IntegerType)))
+      }.getMessage
+      assert(e.contains("Found duplicate column(s): a"))
+    }
+  }
+
   test("alter table drop columns") {
     withBasicCatalog { sessionCatalog =>
       sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = 
false)
       val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
       val e = intercept[AnalysisException] {
-        sessionCatalog.alterTableSchema(
-          TableIdentifier("t1", Some("default")), 
StructType(oldTab.schema.drop(1)))
+        sessionCatalog.alterTableDataSchema(
+          TableIdentifier("t1", Some("default")), 
StructType(oldTab.dataSchema.drop(1)))
       }.getMessage
       assert(e.contains("We don't support dropping columns yet."))
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 6348638..8b61240 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -187,11 +187,10 @@ case class AlterTableRenameCommand(
 */
 case class AlterTableAddColumnsCommand(
     table: TableIdentifier,
-    columns: Seq[StructField]) extends RunnableCommand {
+    colsToAdd: Seq[StructField]) extends RunnableCommand {
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val catalogTable = verifyAlterTableAddColumn(catalog, table)
-
     try {
       sparkSession.catalog.uncacheTable(table.quotedString)
     } catch {
@@ -199,12 +198,7 @@ case class AlterTableAddColumnsCommand(
         log.warn(s"Exception when attempting to uncache table 
${table.quotedString}", e)
     }
     catalog.refreshTable(table)
-
-    // make sure any partition columns are at the end of the fields
-    val reorderedSchema = catalogTable.dataSchema ++ columns ++ 
catalogTable.partitionSchema
-    catalog.alterTableSchema(
-      table, catalogTable.schema.copy(fields = reorderedSchema.toArray))
-
+    catalog.alterTableDataSchema(table, StructType(catalogTable.dataSchema ++ 
colsToAdd))
     Seq.empty[Row]
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 2ea4e15..1c26d98 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -138,16 +138,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
   }
 
   /**
-   * Checks the validity of column names. Hive metastore disallows the table 
to use comma in
+   * Checks the validity of data column names. Hive metastore disallows the 
table to use comma in
    * data column names. Partition columns do not have such a restriction. 
Views do not have such
    * a restriction.
    */
-  private def verifyColumnNames(table: CatalogTable): Unit = {
-    if (table.tableType != VIEW) {
-      table.dataSchema.map(_.name).foreach { colName =>
+  private def verifyDataSchema(
+      tableName: TableIdentifier, tableType: CatalogTableType, dataSchema: 
StructType): Unit = {
+    if (tableType != VIEW) {
+      dataSchema.map(_.name).foreach { colName =>
         if (colName.contains(",")) {
           throw new AnalysisException("Cannot create a table having a column 
whose name contains " +
-            s"commas in Hive metastore. Table: ${table.identifier}; Column: 
$colName")
+            s"commas in Hive metastore. Table: $tableName; Column: $colName")
         }
       }
     }
@@ -218,7 +219,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
     val table = tableDefinition.identifier.table
     requireDbExists(db)
     verifyTableProperties(tableDefinition)
-    verifyColumnNames(tableDefinition)
+    verifyDataSchema(
+      tableDefinition.identifier, tableDefinition.tableType, 
tableDefinition.dataSchema)
 
     if (tableExists(db, table) && !ignoreIfExists) {
       throw new TableAlreadyExistsException(db = db, table = table)
@@ -295,7 +297,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
         storage = table.storage.copy(
           locationUri = None,
           properties = storagePropsWithLocation),
-        schema = table.partitionSchema,
+        schema = StructType(EMPTY_DATA_SCHEMA ++ table.partitionSchema),
         bucketSpec = None,
         properties = table.properties ++ tableProperties)
     }
@@ -312,6 +314,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
         None
       }
 
+      // TODO: empty data schema is not hive compatible, we only do it to keep 
behavior as it was
+      // because previously we generate the special empty schema in 
`HiveClient`. Remove this in
+      // Spark 2.3.
+      val schema = if (table.dataSchema.isEmpty) {
+        StructType(EMPTY_DATA_SCHEMA ++ table.partitionSchema)
+      } else {
+        table.schema
+      }
+
       table.copy(
         storage = table.storage.copy(
           locationUri = location,
@@ -320,6 +331,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
           serde = serde.serde,
           properties = storagePropsWithLocation
         ),
+        schema = schema,
         properties = table.properties ++ tableProperties)
     }
 
@@ -630,32 +642,32 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
     }
   }
 
-  override def alterTableSchema(db: String, table: String, schema: 
StructType): Unit = withClient {
+  override def alterTableDataSchema(
+      db: String, table: String, newDataSchema: StructType): Unit = withClient 
{
     requireTableExists(db, table)
-    val rawTable = getRawTable(db, table)
-    // Add table metadata such as table schema, partition columns, etc. to 
table properties.
-    val updatedProperties = rawTable.properties ++ 
tableMetaToTableProps(rawTable, schema)
-    val withNewSchema = rawTable.copy(properties = updatedProperties, schema = 
schema)
-    verifyColumnNames(withNewSchema)
+    val oldTable = getTable(db, table)
+    verifyDataSchema(oldTable.identifier, oldTable.tableType, newDataSchema)
+    val schemaProps =
+      tableMetaToTableProps(oldTable, StructType(newDataSchema ++ 
oldTable.partitionSchema)).toMap
 
-    if (isDatasourceTable(rawTable)) {
+    if (isDatasourceTable(oldTable)) {
       // For data source tables, first try to write it with the schema set; if 
that does not work,
       // try again with updated properties and the partition schema. This is a 
simplified version of
       // what createDataSourceTable() does, and may leave the table in a state 
unreadable by Hive
       // (for example, the schema does not match the data source schema, or 
does not match the
       // storage descriptor).
       try {
-        client.alterTable(withNewSchema)
+        client.alterTableDataSchema(db, table, newDataSchema, schemaProps)
       } catch {
         case NonFatal(e) =>
           val warningMessage =
-            s"Could not alter schema of table  
${rawTable.identifier.quotedString} in a Hive " +
+            s"Could not alter schema of table 
${oldTable.identifier.quotedString} in a Hive " +
               "compatible way. Updating Hive metastore in Spark SQL specific 
format."
           logWarning(warningMessage, e)
-          client.alterTable(withNewSchema.copy(schema = 
rawTable.partitionSchema))
+          client.alterTableDataSchema(db, table, EMPTY_DATA_SCHEMA, 
schemaProps)
       }
     } else {
-      client.alterTable(withNewSchema)
+      client.alterTableDataSchema(db, table, newDataSchema, schemaProps)
     }
   }
 
@@ -1191,6 +1203,15 @@ object HiveExternalCatalog {
   val TABLE_PARTITION_PROVIDER_CATALOG = "catalog"
   val TABLE_PARTITION_PROVIDER_FILESYSTEM = "filesystem"
 
+  // When storing data source tables in hive metastore, we need to set data 
schema to empty if the
+  // schema is hive-incompatible. However we need a hack to preserve existing 
behavior. Before
+  // Spark 2.0, we do not set a default serde here (this was done in Hive), 
and so if the user
+  // provides an empty schema Hive would automatically populate the schema 
with a single field
+  // "col". However, after SPARK-14388, we set the default serde to 
LazySimpleSerde so this
+  // implicit behavior no longer happens. Therefore, we need to do it in Spark 
ourselves.
+  val EMPTY_DATA_SCHEMA = new StructType()
+    .add("col", "array<string>", nullable = true, comment = "from 
deserializer")
+
   /**
    * Returns the fully qualified name used in table properties for a 
particular column stat.
    * For example, for column "mycol", and "min" stat, this should return

http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index f23b27c..f858dd9 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -246,11 +246,11 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
 
       inferredSchema match {
         case Some(dataSchema) =>
-          val schema = StructType(dataSchema ++ 
relation.tableMeta.partitionSchema)
           if (inferenceMode == INFER_AND_SAVE) {
-            updateCatalogSchema(relation.tableMeta.identifier, schema)
+            updateDataSchema(relation.tableMeta.identifier, dataSchema)
           }
-          relation.tableMeta.copy(schema = schema)
+          val newSchema = StructType(dataSchema ++ 
relation.tableMeta.partitionSchema)
+          relation.tableMeta.copy(schema = newSchema)
         case None =>
           logWarning(s"Unable to infer schema for table $tableName from file 
format " +
             s"$fileFormat (inference mode: $inferenceMode). Using metastore 
schema.")
@@ -261,10 +261,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
     }
   }
 
-  private def updateCatalogSchema(identifier: TableIdentifier, schema: 
StructType): Unit = try {
-    val db = identifier.database.get
+  private def updateDataSchema(identifier: TableIdentifier, newDataSchema: 
StructType): Unit = try {
     logInfo(s"Saving case-sensitive schema for table 
${identifier.unquotedString}")
-    sparkSession.sharedState.externalCatalog.alterTableSchema(db, 
identifier.table, schema)
+    sparkSession.sessionState.catalog.alterTableDataSchema(identifier, 
newDataSchema)
   } catch {
     case NonFatal(ex) =>
       logWarning(s"Unable to save case-sensitive schema for table 
${identifier.unquotedString}", ex)

http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 16a80f9..492a2ea 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.types.StructType
 
 
 /**
@@ -89,6 +90,16 @@ private[hive] trait HiveClient {
   /** Updates the given table with new metadata, optionally renaming the 
table. */
   def alterTable(tableName: String, table: CatalogTable): Unit
 
+  /**
+   * Updates the given table with a new data schema and table properties, and 
keep everything else
+   * unchanged.
+   *
+   * TODO(cloud-fan): it's a little hacky to introduce the schema table 
properties here in
+   * `HiveClient`, but we don't have a cleaner solution now.
+   */
+  def alterTableDataSchema(
+    dbName: String, tableName: String, newDataSchema: StructType, schemaProps: 
Map[String, String])
+
   /** Creates a new database with the given name. */
   def createDatabase(database: CatalogDatabase, ignoreIfExists: Boolean): Unit
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 2cf11f4..541797d 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -46,8 +46,7 @@ import 
org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.execution.QueryExecutionException
-import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.hive.HiveExternalCatalog
+import org.apache.spark.sql.hive.HiveExternalCatalog.{DATASOURCE_SCHEMA, 
DATASOURCE_SCHEMA_NUMPARTS, DATASOURCE_SCHEMA_PART_PREFIX}
 import org.apache.spark.sql.hive.client.HiveClientImpl._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{CircularBuffer, Utils}
@@ -462,6 +461,33 @@ private[hive] class HiveClientImpl(
     shim.alterTable(client, qualifiedTableName, hiveTable)
   }
 
+  override def alterTableDataSchema(
+      dbName: String,
+      tableName: String,
+      newDataSchema: StructType,
+      schemaProps: Map[String, String]): Unit = withHiveState {
+    val oldTable = client.getTable(dbName, tableName)
+    val hiveCols = newDataSchema.map(toHiveColumn)
+    oldTable.setFields(hiveCols.asJava)
+
+    // remove old schema table properties
+    val it = oldTable.getParameters.entrySet.iterator
+    while (it.hasNext) {
+      val entry = it.next()
+      val isSchemaProp = 
entry.getKey.startsWith(DATASOURCE_SCHEMA_PART_PREFIX) ||
+        entry.getKey == DATASOURCE_SCHEMA || entry.getKey == 
DATASOURCE_SCHEMA_NUMPARTS
+      if (isSchemaProp) {
+        it.remove()
+      }
+    }
+
+    // set new schema table properties
+    schemaProps.foreach { case (k, v) => oldTable.setProperty(k, v) }
+
+    val qualifiedTableName = s"$dbName.$tableName"
+    shim.alterTable(client, qualifiedTableName, oldTable)
+  }
+
   override def createPartitions(
       db: String,
       table: String,
@@ -837,20 +863,7 @@ private[hive] object HiveClientImpl {
     val (partCols, schema) = table.schema.map(toHiveColumn).partition { c =>
       table.partitionColumnNames.contains(c.getName)
     }
-    // after SPARK-19279, it is not allowed to create a hive table with an 
empty schema,
-    // so here we should not add a default col schema
-    if (schema.isEmpty && HiveExternalCatalog.isDatasourceTable(table)) {
-      // This is a hack to preserve existing behavior. Before Spark 2.0, we do 
not
-      // set a default serde here (this was done in Hive), and so if the user 
provides
-      // an empty schema Hive would automatically populate the schema with a 
single
-      // field "col". However, after SPARK-14388, we set the default serde to
-      // LazySimpleSerde so this implicit behavior no longer happens. 
Therefore,
-      // we need to do it in Spark ourselves.
-      hiveTable.setFields(
-        Seq(new FieldSchema("col", "array<string>", "from 
deserializer")).asJava)
-    } else {
-      hiveTable.setFields(schema.asJava)
-    }
+    hiveTable.setFields(schema.asJava)
     hiveTable.setPartCols(partCols.asJava)
     userName.foreach(hiveTable.setOwner)
     hiveTable.setCreateTime((table.createTime / 1000).toInt)

http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index d43534d..2e35fde 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -89,4 +89,22 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
       assert(restoredTable.schema == newSchema)
     }
   }
+
+  test("SPARK-22306: alter table schema should not erase the bucketing 
metadata at hive side") {
+    val catalog = newBasicCatalog()
+    externalCatalog.client.runSqlHive(
+      """
+        |CREATE TABLE db1.t(a string, b string)
+        |CLUSTERED BY (a, b) SORTED BY (a, b) INTO 10 BUCKETS
+        |STORED AS PARQUET
+      """.stripMargin)
+
+    val newSchema = new StructType().add("a", "string").add("b", 
"string").add("c", "string")
+    catalog.alterTableDataSchema("db1", "t", newSchema)
+
+    assert(catalog.getTable("db1", "t").schema == newSchema)
+    val bucketString = externalCatalog.client.runSqlHive("DESC FORMATTED 
db1.t")
+      .filter(_.contains("Num Buckets")).head
+    assert(bucketString.contains("10"))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 32e97eb..c0acffb 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -746,7 +746,7 @@ class MetastoreDataSourcesSuite extends QueryTest with 
SQLTestUtils with TestHiv
       val hiveTable = CatalogTable(
         identifier = TableIdentifier(tableName, Some("default")),
         tableType = CatalogTableType.MANAGED,
-        schema = new StructType,
+        schema = HiveExternalCatalog.EMPTY_DATA_SCHEMA,
         provider = Some("json"),
         storage = CatalogStorageFormat(
           locationUri = None,
@@ -1271,7 +1271,7 @@ class MetastoreDataSourcesSuite extends QueryTest with 
SQLTestUtils with TestHiv
       val hiveTable = CatalogTable(
         identifier = TableIdentifier("t", Some("default")),
         tableType = CatalogTableType.MANAGED,
-        schema = new StructType,
+        schema = HiveExternalCatalog.EMPTY_DATA_SCHEMA,
         provider = Some("json"),
         storage = CatalogStorageFormat.empty,
         properties = Map(

http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala
index 5c248b9..bc82887 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala
@@ -117,7 +117,7 @@ class Hive_2_1_DDLSuite extends SparkFunSuite with 
TestHiveSingleton with Before
     spark.sql(createTableStmt)
     val oldTable = 
spark.sessionState.catalog.externalCatalog.getTable("default", tableName)
     catalog.createTable(oldTable, true)
-    catalog.alterTableSchema("default", tableName, updatedSchema)
+    catalog.alterTableDataSchema("default", tableName, updatedSchema)
 
     val updatedTable = catalog.getTable("default", tableName)
     assert(updatedTable.schema.fieldNames === updatedSchema.fieldNames)

