Repository: spark Updated Branches: refs/heads/master 882079f5c -> 2fd12af43
[SPARK-22306][SQL] alter table schema should not erase the bucketing metadata at hive side forward-port https://github.com/apache/spark/pull/19622 to master branch. This bug doesn't exist in master because we've added hive bucketing support and the hive bucketing metadata can be recognized by Spark, but we should still port it to master: 1) there may be other unsupported hive metadata removed by Spark. 2) reduce code difference between master and 2.2 to ease the backport in the future. *** When we alter table schema, we set the new schema to spark `CatalogTable`, convert it to hive table, and finally call `hive.alterTable`. This causes a problem in Spark 2.2, because hive bucketing metadata is not recognized by Spark, which means a Spark `CatalogTable` representing a hive table is always non-bucketed, and when we convert it to hive table and call `hive.alterTable`, the original hive bucketing metadata will be removed. To fix this bug, we should read out the raw hive table metadata, update its schema, and call `hive.alterTable`. By doing this we can guarantee only the schema is changed, and nothing else. Author: Wenchen Fan <wenc...@databricks.com> Closes #19644 from cloud-fan/infer. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2fd12af4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2fd12af4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2fd12af4 Branch: refs/heads/master Commit: 2fd12af4372a1e2c3faf0eb5d0a1cf530abc0016 Parents: 882079f Author: Wenchen Fan <wenc...@databricks.com> Authored: Thu Nov 2 23:41:16 2017 +0100 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Thu Nov 2 23:41:16 2017 +0100 ---------------------------------------------------------------------- .../sql/catalyst/catalog/ExternalCatalog.scala | 12 +++-- .../sql/catalyst/catalog/InMemoryCatalog.scala | 7 +-- .../sql/catalyst/catalog/SessionCatalog.scala | 23 +++++----- .../catalyst/catalog/ExternalCatalogSuite.scala | 10 ++--- .../catalyst/catalog/SessionCatalogSuite.scala | 8 ++-- .../spark/sql/execution/command/ddl.scala | 14 +++--- .../spark/sql/execution/command/tables.scala | 14 +++--- .../datasources/DataSourceStrategy.scala | 4 +- .../datasources/orc/OrcFileFormat.scala | 5 +-- .../datasources/parquet/ParquetFileFormat.scala | 5 +-- .../parquet/ParquetSchemaConverter.scala | 5 +-- .../spark/sql/hive/HiveExternalCatalog.scala | 47 ++++++++++++-------- .../spark/sql/hive/HiveMetastoreCatalog.scala | 11 +++-- .../apache/spark/sql/hive/HiveStrategies.scala | 4 +- .../spark/sql/hive/client/HiveClient.scala | 11 +++++ .../spark/sql/hive/client/HiveClientImpl.scala | 45 ++++++++++++------- .../sql/hive/HiveExternalCatalogSuite.scala | 18 ++++++++ .../sql/hive/MetastoreDataSourcesSuite.scala | 4 +- .../sql/hive/execution/Hive_2_1_DDLSuite.scala | 2 +- 19 files changed, 146 insertions(+), 103 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala 
---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala index d4c58db..223094d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala @@ -150,17 +150,15 @@ abstract class ExternalCatalog def alterTable(tableDefinition: CatalogTable): Unit /** - * Alter the schema of a table identified by the provided database and table name. The new schema - * should still contain the existing bucket columns and partition columns used by the table. This - * method will also update any Spark SQL-related parameters stored as Hive table properties (such - * as the schema itself). + * Alter the data schema of a table identified by the provided database and table name. The new + * data schema should not have conflict column names with the existing partition columns, and + * should still contain all the existing data columns. * * @param db Database that table to alter schema for exists in * @param table Name of table to alter schema for - * @param schema Updated schema to be used for the table (must contain existing partition and - * bucket columns) + * @param newDataSchema Updated data schema to be used for the table. */ - def alterTableSchema(db: String, table: String, schema: StructType): Unit + def alterTableDataSchema(db: String, table: String, newDataSchema: StructType): Unit /** Alter the statistics of a table. If `stats` is None, then remove all existing statistics. 
*/ def alterTableStats(db: String, table: String, stats: Option[CatalogStatistics]): Unit http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index 98370c1..9504140 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -303,13 +303,14 @@ class InMemoryCatalog( catalog(db).tables(tableDefinition.identifier.table).table = newTableDefinition } - override def alterTableSchema( + override def alterTableDataSchema( db: String, table: String, - schema: StructType): Unit = synchronized { + newDataSchema: StructType): Unit = synchronized { requireTableExists(db, table) val origTable = catalog(db).tables(table).table - catalog(db).tables(table).table = origTable.copy(schema = schema) + val newSchema = StructType(newDataSchema ++ origTable.partitionSchema) + catalog(db).tables(table).table = origTable.copy(schema = newSchema) } override def alterTableStats( http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 95bc3d6..a129896 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -324,18 
+324,16 @@ class SessionCatalog( } /** - * Alter the schema of a table identified by the provided table identifier. The new schema - * should still contain the existing bucket columns and partition columns used by the table. This - * method will also update any Spark SQL-related parameters stored as Hive table properties (such - * as the schema itself). + * Alter the data schema of a table identified by the provided table identifier. The new data + * schema should not have conflict column names with the existing partition columns, and should + * still contain all the existing data columns. * * @param identifier TableIdentifier - * @param newSchema Updated schema to be used for the table (must contain existing partition and - * bucket columns, and partition columns need to be at the end) + * @param newDataSchema Updated data schema to be used for the table */ - def alterTableSchema( + def alterTableDataSchema( identifier: TableIdentifier, - newSchema: StructType): Unit = { + newDataSchema: StructType): Unit = { val db = formatDatabaseName(identifier.database.getOrElse(getCurrentDatabase)) val table = formatTableName(identifier.table) val tableIdentifier = TableIdentifier(table, Some(db)) @@ -343,10 +341,10 @@ class SessionCatalog( requireTableExists(tableIdentifier) val catalogTable = externalCatalog.getTable(db, table) - val oldSchema = catalogTable.schema - + val oldDataSchema = catalogTable.dataSchema // not supporting dropping columns yet - val nonExistentColumnNames = oldSchema.map(_.name).filterNot(columnNameResolved(newSchema, _)) + val nonExistentColumnNames = + oldDataSchema.map(_.name).filterNot(columnNameResolved(newDataSchema, _)) if (nonExistentColumnNames.nonEmpty) { throw new AnalysisException( s""" @@ -355,8 +353,7 @@ class SessionCatalog( """.stripMargin) } - // assuming the newSchema has all partition columns at the end as required - externalCatalog.alterTableSchema(db, table, newSchema) + externalCatalog.alterTableDataSchema(db, table, 
newDataSchema) } private def columnNameResolved(schema: StructType, colName: String): Boolean = { http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala index 94593ef..b376108 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala @@ -245,14 +245,12 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac test("alter table schema") { val catalog = newBasicCatalog() - val newSchema = StructType(Seq( + val newDataSchema = StructType(Seq( StructField("col1", IntegerType), - StructField("new_field_2", StringType), - StructField("a", IntegerType), - StructField("b", StringType))) - catalog.alterTableSchema("db2", "tbl1", newSchema) + StructField("new_field_2", StringType))) + catalog.alterTableDataSchema("db2", "tbl1", newDataSchema) val newTbl1 = catalog.getTable("db2", "tbl1") - assert(newTbl1.schema == newSchema) + assert(newTbl1.dataSchema == newDataSchema) } test("alter table stats") { http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index 1cce199..95c87ff 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -463,9 +463,9 @@ abstract class SessionCatalogSuite extends AnalysisTest { withBasicCatalog { sessionCatalog => sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false) val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1") - sessionCatalog.alterTableSchema( + sessionCatalog.alterTableDataSchema( TableIdentifier("t1", Some("default")), - StructType(oldTab.dataSchema.add("c3", IntegerType) ++ oldTab.partitionSchema)) + StructType(oldTab.dataSchema.add("c3", IntegerType))) val newTab = sessionCatalog.externalCatalog.getTable("default", "t1") // construct the expected table schema @@ -480,8 +480,8 @@ abstract class SessionCatalogSuite extends AnalysisTest { sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false) val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1") val e = intercept[AnalysisException] { - sessionCatalog.alterTableSchema( - TableIdentifier("t1", Some("default")), StructType(oldTab.schema.drop(1))) + sessionCatalog.alterTableDataSchema( + TableIdentifier("t1", Some("default")), StructType(oldTab.dataSchema.drop(1))) }.getMessage assert(e.contains("We don't support dropping columns yet.")) } http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 162e1d5..a9cd65e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -857,19 +857,23 @@ object DDLUtils { } } - private[sql] def checkDataSchemaFieldNames(table: CatalogTable): Unit = { + private[sql] def 
checkDataColNames(table: CatalogTable): Unit = { + checkDataColNames(table, table.dataSchema.fieldNames) + } + + private[sql] def checkDataColNames(table: CatalogTable, colNames: Seq[String]): Unit = { table.provider.foreach { _.toLowerCase(Locale.ROOT) match { case HIVE_PROVIDER => val serde = table.storage.serde if (serde == HiveSerDe.sourceToSerDe("orc").get.serde) { - OrcFileFormat.checkFieldNames(table.dataSchema) + OrcFileFormat.checkFieldNames(colNames) } else if (serde == HiveSerDe.sourceToSerDe("parquet").get.serde || serde == Some("parquet.hive.serde.ParquetHiveSerDe")) { - ParquetSchemaConverter.checkFieldNames(table.dataSchema) + ParquetSchemaConverter.checkFieldNames(colNames) } - case "parquet" => ParquetSchemaConverter.checkFieldNames(table.dataSchema) - case "orc" => OrcFileFormat.checkFieldNames(table.dataSchema) + case "parquet" => ParquetSchemaConverter.checkFieldNames(colNames) + case "orc" => OrcFileFormat.checkFieldNames(colNames) case _ => } } http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 38f9163..95f16b0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -186,7 +186,7 @@ case class AlterTableRenameCommand( */ case class AlterTableAddColumnsCommand( table: TableIdentifier, - columns: Seq[StructField]) extends RunnableCommand { + colsToAdd: Seq[StructField]) extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val catalogTable = verifyAlterTableAddColumn(catalog, table) @@ -199,17 +199,13 @@ case class 
AlterTableAddColumnsCommand( } catalog.refreshTable(table) - // make sure any partition columns are at the end of the fields - val reorderedSchema = catalogTable.dataSchema ++ columns ++ catalogTable.partitionSchema - val newSchema = catalogTable.schema.copy(fields = reorderedSchema.toArray) - SchemaUtils.checkColumnNameDuplication( - reorderedSchema.map(_.name), "in the table definition of " + table.identifier, + (colsToAdd ++ catalogTable.schema).map(_.name), + "in the table definition of " + table.identifier, conf.caseSensitiveAnalysis) - DDLUtils.checkDataSchemaFieldNames(catalogTable.copy(schema = newSchema)) - - catalog.alterTableSchema(table, newSchema) + DDLUtils.checkDataColNames(catalogTable, colsToAdd.map(_.name)) + catalog.alterTableDataSchema(table, StructType(catalogTable.dataSchema ++ colsToAdd)) Seq.empty[Row] } http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 018f24e..04d6f3f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -133,12 +133,12 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast override def apply(plan: LogicalPlan): LogicalPlan = plan transform { case CreateTable(tableDesc, mode, None) if DDLUtils.isDatasourceTable(tableDesc) => - DDLUtils.checkDataSchemaFieldNames(tableDesc) + DDLUtils.checkDataColNames(tableDesc) CreateDataSourceTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore) case CreateTable(tableDesc, mode, Some(query)) if query.resolved && 
DDLUtils.isDatasourceTable(tableDesc) => - DDLUtils.checkDataSchemaFieldNames(tableDesc.copy(schema = query.schema)) + DDLUtils.checkDataColNames(tableDesc.copy(schema = query.schema)) CreateDataSourceTableAsSelectCommand(tableDesc, mode, query) case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation, _, _, _), http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 2eeb006..215740e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -35,8 +35,7 @@ private[sql] object OrcFileFormat { } } - def checkFieldNames(schema: StructType): StructType = { - schema.fieldNames.foreach(checkFieldName) - schema + def checkFieldNames(names: Seq[String]): Unit = { + names.foreach(checkFieldName) } } http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index c1535ba..61bd65d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -23,7 +23,6 @@ import java.net.URI import scala.collection.JavaConverters._ import 
scala.collection.mutable import scala.collection.parallel.ForkJoinTaskSupport -import scala.concurrent.forkjoin.ForkJoinPool import scala.util.{Failure, Try} import org.apache.hadoop.conf.Configuration @@ -306,10 +305,10 @@ class ParquetFileFormat hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName) hadoopConf.set( ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, - ParquetSchemaConverter.checkFieldNames(requiredSchema).json) + requiredSchema.json) hadoopConf.set( ParquetWriteSupport.SPARK_ROW_SCHEMA, - ParquetSchemaConverter.checkFieldNames(requiredSchema).json) + requiredSchema.json) ParquetWriteSupport.setSchema(requiredSchema, hadoopConf) http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala index b3781cf..cd384d1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala @@ -571,9 +571,8 @@ private[sql] object ParquetSchemaConverter { """.stripMargin.split("\n").mkString(" ").trim) } - def checkFieldNames(schema: StructType): StructType = { - schema.fieldNames.foreach(checkFieldName) - schema + def checkFieldNames(names: Seq[String]): Unit = { + names.foreach(checkFieldName) } def checkConversionRequirement(f: => Boolean, message: String): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---------------------------------------------------------------------- diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 96dc983..f8a947b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -138,16 +138,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } /** - * Checks the validity of column names. Hive metastore disallows the table to use comma in + * Checks the validity of data column names. Hive metastore disallows the table to use comma in * data column names. Partition columns do not have such a restriction. Views do not have such * a restriction. */ - private def verifyColumnNames(table: CatalogTable): Unit = { - if (table.tableType != VIEW) { - table.dataSchema.map(_.name).foreach { colName => + private def verifyDataSchema( + tableName: TableIdentifier, tableType: CatalogTableType, dataSchema: StructType): Unit = { + if (tableType != VIEW) { + dataSchema.map(_.name).foreach { colName => if (colName.contains(",")) { throw new AnalysisException("Cannot create a table having a column whose name contains " + - s"commas in Hive metastore. Table: ${table.identifier}; Column: $colName") + s"commas in Hive metastore. 
Table: $tableName; Column: $colName") } } } @@ -218,7 +219,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat val table = tableDefinition.identifier.table requireDbExists(db) verifyTableProperties(tableDefinition) - verifyColumnNames(tableDefinition) + verifyDataSchema( + tableDefinition.identifier, tableDefinition.tableType, tableDefinition.dataSchema) if (tableExists(db, table) && !ignoreIfExists) { throw new TableAlreadyExistsException(db = db, table = table) @@ -296,7 +298,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat storage = table.storage.copy( locationUri = None, properties = storagePropsWithLocation), - schema = table.partitionSchema, + schema = StructType(EMPTY_DATA_SCHEMA ++ table.partitionSchema), bucketSpec = None, properties = table.properties ++ tableProperties) } @@ -617,32 +619,32 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } } - override def alterTableSchema(db: String, table: String, schema: StructType): Unit = withClient { + override def alterTableDataSchema( + db: String, table: String, newDataSchema: StructType): Unit = withClient { requireTableExists(db, table) - val rawTable = getRawTable(db, table) - // Add table metadata such as table schema, partition columns, etc. to table properties. 
- val updatedProperties = rawTable.properties ++ tableMetaToTableProps(rawTable, schema) - val withNewSchema = rawTable.copy(properties = updatedProperties, schema = schema) - verifyColumnNames(withNewSchema) + val oldTable = getTable(db, table) + verifyDataSchema(oldTable.identifier, oldTable.tableType, newDataSchema) + val schemaProps = + tableMetaToTableProps(oldTable, StructType(newDataSchema ++ oldTable.partitionSchema)).toMap - if (isDatasourceTable(rawTable)) { + if (isDatasourceTable(oldTable)) { // For data source tables, first try to write it with the schema set; if that does not work, // try again with updated properties and the partition schema. This is a simplified version of // what createDataSourceTable() does, and may leave the table in a state unreadable by Hive // (for example, the schema does not match the data source schema, or does not match the // storage descriptor). try { - client.alterTable(withNewSchema) + client.alterTableDataSchema(db, table, newDataSchema, schemaProps) } catch { case NonFatal(e) => val warningMessage = - s"Could not alter schema of table ${rawTable.identifier.quotedString} in a Hive " + + s"Could not alter schema of table ${oldTable.identifier.quotedString} in a Hive " + "compatible way. Updating Hive metastore in Spark SQL specific format." logWarning(warningMessage, e) - client.alterTable(withNewSchema.copy(schema = rawTable.partitionSchema)) + client.alterTableDataSchema(db, table, EMPTY_DATA_SCHEMA, schemaProps) } } else { - client.alterTable(withNewSchema) + client.alterTableDataSchema(db, table, newDataSchema, schemaProps) } } @@ -1297,6 +1299,15 @@ object HiveExternalCatalog { val CREATED_SPARK_VERSION = SPARK_SQL_PREFIX + "create.version" + // When storing data source tables in hive metastore, we need to set data schema to empty if the + // schema is hive-incompatible. However we need a hack to preserve existing behavior. 
Before + // Spark 2.0, we do not set a default serde here (this was done in Hive), and so if the user + // provides an empty schema Hive would automatically populate the schema with a single field + // "col". However, after SPARK-14388, we set the default serde to LazySimpleSerde so this + // implicit behavior no longer happens. Therefore, we need to do it in Spark ourselves. + val EMPTY_DATA_SCHEMA = new StructType() + .add("col", "array<string>", nullable = true, comment = "from deserializer") + /** * Returns the fully qualified name used in table properties for a particular column stat. * For example, for column "mycol", and "min" stat, this should return http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 5ac6597..8adfda0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -244,11 +244,11 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log inferredSchema match { case Some(dataSchema) => - val schema = StructType(dataSchema ++ relation.tableMeta.partitionSchema) if (inferenceMode == INFER_AND_SAVE) { - updateCatalogSchema(relation.tableMeta.identifier, schema) + updateDataSchema(relation.tableMeta.identifier, dataSchema) } - relation.tableMeta.copy(schema = schema) + val newSchema = StructType(dataSchema ++ relation.tableMeta.partitionSchema) + relation.tableMeta.copy(schema = newSchema) case None => logWarning(s"Unable to infer schema for table $tableName from file format " + s"$fileFormat (inference mode: $inferenceMode). 
Using metastore schema.") @@ -259,10 +259,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log } } - private def updateCatalogSchema(identifier: TableIdentifier, schema: StructType): Unit = try { - val db = identifier.database.get + private def updateDataSchema(identifier: TableIdentifier, newDataSchema: StructType): Unit = try { logInfo(s"Saving case-sensitive schema for table ${identifier.unquotedString}") - sparkSession.sharedState.externalCatalog.alterTableSchema(db, identifier.table, schema) + sparkSession.sessionState.catalog.alterTableDataSchema(identifier, newDataSchema) } catch { case NonFatal(ex) => logWarning(s"Unable to save case-sensitive schema for table ${identifier.unquotedString}", ex) http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 3592b8f..ee1f6ee 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -152,11 +152,11 @@ object HiveAnalysis extends Rule[LogicalPlan] { InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite, ifPartitionNotExists) case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) => - DDLUtils.checkDataSchemaFieldNames(tableDesc) + DDLUtils.checkDataColNames(tableDesc) CreateTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore) case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) => - DDLUtils.checkDataSchemaFieldNames(tableDesc) + DDLUtils.checkDataColNames(tableDesc) CreateHiveTableAsSelectCommand(tableDesc, query, mode) case InsertIntoDir(isLocal, storage, provider, child, overwrite) 
http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala index ee3eb2e..f697174 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.types.StructType /** @@ -100,6 +101,16 @@ private[hive] trait HiveClient { */ def alterTable(dbName: String, tableName: String, table: CatalogTable): Unit + /** + * Updates the given table with a new data schema and table properties, and keep everything else + * unchanged. + * + * TODO(cloud-fan): it's a little hacky to introduce the schema table properties here in + * `HiveClient`, but we don't have a cleaner solution now. + */ + def alterTableDataSchema( + dbName: String, tableName: String, newDataSchema: StructType, schemaProps: Map[String, String]) + /** Creates a new database with the given name. 
*/ def createDatabase(database: CatalogDatabase, ignoreIfExists: Boolean): Unit http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 16c95c5..b5a5890 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -39,7 +39,6 @@ import org.apache.hadoop.hive.ql.processors._ import org.apache.hadoop.hive.ql.session.SessionState import org.apache.spark.{SparkConf, SparkException} -import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.metrics.source.HiveCatalogMetrics import org.apache.spark.sql.AnalysisException @@ -51,7 +50,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.execution.command.DDLUtils -import org.apache.spark.sql.hive.HiveExternalCatalog +import org.apache.spark.sql.hive.HiveExternalCatalog.{DATASOURCE_SCHEMA, DATASOURCE_SCHEMA_NUMPARTS, DATASOURCE_SCHEMA_PART_PREFIX} import org.apache.spark.sql.hive.client.HiveClientImpl._ import org.apache.spark.sql.types._ import org.apache.spark.util.{CircularBuffer, Utils} @@ -515,6 +514,33 @@ private[hive] class HiveClientImpl( shim.alterTable(client, qualifiedTableName, hiveTable) } + override def alterTableDataSchema( + dbName: String, + tableName: String, + newDataSchema: StructType, + schemaProps: Map[String, String]): Unit = withHiveState { + val oldTable = client.getTable(dbName, tableName) + val hiveCols = 
newDataSchema.map(toHiveColumn) + oldTable.setFields(hiveCols.asJava) + + // remove old schema table properties + val it = oldTable.getParameters.entrySet.iterator + while (it.hasNext) { + val entry = it.next() + val isSchemaProp = entry.getKey.startsWith(DATASOURCE_SCHEMA_PART_PREFIX) || + entry.getKey == DATASOURCE_SCHEMA || entry.getKey == DATASOURCE_SCHEMA_NUMPARTS + if (isSchemaProp) { + it.remove() + } + } + + // set new schema table properties + schemaProps.foreach { case (k, v) => oldTable.setProperty(k, v) } + + val qualifiedTableName = s"$dbName.$tableName" + shim.alterTable(client, qualifiedTableName, oldTable) + } + override def createPartitions( db: String, table: String, @@ -896,20 +922,7 @@ private[hive] object HiveClientImpl { val (partCols, schema) = table.schema.map(toHiveColumn).partition { c => table.partitionColumnNames.contains(c.getName) } - // after SPARK-19279, it is not allowed to create a hive table with an empty schema, - // so here we should not add a default col schema - if (schema.isEmpty && HiveExternalCatalog.isDatasourceTable(table)) { - // This is a hack to preserve existing behavior. Before Spark 2.0, we do not - // set a default serde here (this was done in Hive), and so if the user provides - // an empty schema Hive would automatically populate the schema with a single - // field "col". However, after SPARK-14388, we set the default serde to - // LazySimpleSerde so this implicit behavior no longer happens. Therefore, - // we need to do it in Spark ourselves. 
- hiveTable.setFields( - Seq(new FieldSchema("col", "array<string>", "from deserializer")).asJava) - } else { - hiveTable.setFields(schema.asJava) - } + hiveTable.setFields(schema.asJava) hiveTable.setPartCols(partCols.asJava) userName.foreach(hiveTable.setOwner) hiveTable.setCreateTime((table.createTime / 1000).toInt) http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala index d43534d..2e35fde 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala @@ -89,4 +89,22 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite { assert(restoredTable.schema == newSchema) } } + + test("SPARK-22306: alter table schema should not erase the bucketing metadata at hive side") { + val catalog = newBasicCatalog() + externalCatalog.client.runSqlHive( + """ + |CREATE TABLE db1.t(a string, b string) + |CLUSTERED BY (a, b) SORTED BY (a, b) INTO 10 BUCKETS + |STORED AS PARQUET + """.stripMargin) + + val newSchema = new StructType().add("a", "string").add("b", "string").add("c", "string") + catalog.alterTableDataSchema("db1", "t", newSchema) + + assert(catalog.getTable("db1", "t").schema == newSchema) + val bucketString = externalCatalog.client.runSqlHive("DESC FORMATTED db1.t") + .filter(_.contains("Num Buckets")).head + assert(bucketString.contains("10")) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala ---------------------------------------------------------------------- diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index f5d41c9..a106047 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -741,7 +741,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv val hiveTable = CatalogTable( identifier = TableIdentifier(tableName, Some("default")), tableType = CatalogTableType.MANAGED, - schema = new StructType, + schema = HiveExternalCatalog.EMPTY_DATA_SCHEMA, provider = Some("json"), storage = CatalogStorageFormat( locationUri = None, @@ -1266,7 +1266,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv val hiveTable = CatalogTable( identifier = TableIdentifier("t", Some("default")), tableType = CatalogTableType.MANAGED, - schema = new StructType, + schema = HiveExternalCatalog.EMPTY_DATA_SCHEMA, provider = Some("json"), storage = CatalogStorageFormat.empty, properties = Map( http://git-wip-us.apache.org/repos/asf/spark/blob/2fd12af4/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala index 5c248b9..bc82887 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala @@ -117,7 +117,7 @@ class Hive_2_1_DDLSuite extends SparkFunSuite with TestHiveSingleton with Before spark.sql(createTableStmt) val oldTable = spark.sessionState.catalog.externalCatalog.getTable("default", tableName) catalog.createTable(oldTable, true) - 
catalog.alterTableSchema("default", tableName, updatedSchema) + catalog.alterTableDataSchema("default", tableName, updatedSchema) val updatedTable = catalog.getTable("default", tableName) assert(updatedTable.schema.fieldNames === updatedSchema.fieldNames) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org