This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 9d9f292ac941 [SPARK-47873][SQL] Write collated strings to Hive metastore using the regular string type 9d9f292ac941 is described below commit 9d9f292ac9415269f604f14cb87dc8129b0bfb0c Author: Stefan Kandic <stefan.kan...@databricks.com> AuthorDate: Tue Apr 23 21:40:47 2024 +0800 [SPARK-47873][SQL] Write collated strings to Hive metastore using the regular string type ### What changes were proposed in this pull request? When writing the table schema to Hive, stop writing collated strings as `string COLLATE name` and instead write them as the regular `string` type, since Hive doesn't support collations. Because we also write the original schema as JSON to the table properties in Hive, we will still be able to read the collation back. Also, when reading the table back from the catalog, aside from ignoring case and nullability, we should now also ignore any differences in string types. ### Why are the changes needed? To avoid breaking Hive compatibility with external engines that use Hive and would otherwise fail to parse this new type. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46083 from stefankandic/writeCollatedStringsHive. 
Authored-by: Stefan Kandic <stefan.kan...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../spark/sql/catalyst/types/DataTypeUtils.scala | 27 +++++++- .../org/apache/spark/sql/util/SchemaUtils.scala | 17 +++++ .../spark/sql/hive/HiveExternalCatalog.scala | 74 ++++++++++++++-------- .../spark/sql/hive/HiveExternalCatalogSuite.scala | 43 ++++++++++++- 4 files changed, 134 insertions(+), 27 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/DataTypeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/DataTypeUtils.scala index cf8e903f03a3..f8bb1077a080 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/DataTypeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/DataTypeUtils.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy.{ANSI, STRICT} -import org.apache.spark.sql.types.{ArrayType, AtomicType, DataType, Decimal, DecimalType, MapType, NullType, StructField, StructType, UserDefinedType} +import org.apache.spark.sql.types.{ArrayType, AtomicType, DataType, Decimal, DecimalType, MapType, NullType, StringType, StructField, StructType, UserDefinedType} import org.apache.spark.sql.types.DecimalType.{forType, fromDecimal} object DataTypeUtils { @@ -47,6 +47,31 @@ object DataTypeUtils { DataType.equalsIgnoreCaseAndNullability(from, to) } + /** + * Compares two types, ignoring nullability of ArrayType, MapType, StructType, ignoring case + * sensitivity of field names in StructType as well as differences in collation for String types. 
+ */ + def equalsIgnoreCaseNullabilityAndCollation(from: DataType, to: DataType): Boolean = { + (from, to) match { + case (ArrayType(fromElement, _), ArrayType(toElement, _)) => + equalsIgnoreCaseNullabilityAndCollation(fromElement, toElement) + + case (MapType(fromKey, fromValue, _), MapType(toKey, toValue, _)) => + equalsIgnoreCaseNullabilityAndCollation(fromKey, toKey) && + equalsIgnoreCaseNullabilityAndCollation(fromValue, toValue) + + case (StructType(fromFields), StructType(toFields)) => + fromFields.length == toFields.length && + fromFields.zip(toFields).forall { case (l, r) => + l.name.equalsIgnoreCase(r.name) && + equalsIgnoreCaseNullabilityAndCollation(l.dataType, r.dataType) + } + + case (_: StringType, _: StringType) => true + case (fromDataType, toDataType) => fromDataType == toDataType + } + } + private val SparkGeneratedName = """col\d+""".r private def isSparkGeneratedName(name: String): Boolean = name match { case SparkGeneratedName(_*) => true diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala index 9c1e78190448..1e0bac331dc7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala @@ -303,4 +303,21 @@ private[spark] object SchemaUtils { case _ => false } } + + /** + * Replaces any collated string type with non collated StringType + * recursively in the given data type. 
+ */ + def replaceCollatedStringWithString(dt: DataType): DataType = dt match { + case ArrayType(et, nullable) => + ArrayType(replaceCollatedStringWithString(et), nullable) + case MapType(kt, vt, nullable) => + MapType(replaceCollatedStringWithString(kt), replaceCollatedStringWithString(vt), nullable) + case StructType(fields) => + StructType(fields.map { field => + field.copy(dataType = replaceCollatedStringWithString(field.dataType)) + }) + case _: StringType => StringType + case _ => dt + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 60f2d2f3e5fe..1808986ff2e6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -49,6 +49,7 @@ import org.apache.spark.sql.hive.client.HiveClient import org.apache.spark.sql.internal.HiveSerDe import org.apache.spark.sql.internal.StaticSQLConf._ import org.apache.spark.sql.types._ +import org.apache.spark.sql.util.SchemaUtils /** * A persistent implementation of the system catalog using Hive. @@ -233,12 +234,39 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat tableDefinition.storage.locationUri } + val hiveCompatibleSchema = tryGetHiveCompatibleSchema(tableDefinition.schema) + if (DDLUtils.isDatasourceTable(tableDefinition)) { + // To work around some hive metastore issues, e.g. not case-preserving, bad decimal type + // support, no column nullability, etc., we should do some extra works before saving table + // metadata into Hive metastore: + // 1. Put table metadata like table schema, partition columns, etc. in table properties. + // 2. Check if this table is hive compatible. + // 2.1 If it's not hive compatible, set location URI, schema, partition columns and bucket + // spec to empty and save table metadata to Hive. 
+ // 2.2 If it's hive compatible, set serde information in table metadata and try to save + // it to Hive. If it fails, treat it as not hive compatible and go back to 2.1 + val tableProperties = tableMetaToTableProps(tableDefinition) + + // put table provider and partition provider in table properties. + tableProperties.put(DATASOURCE_PROVIDER, tableDefinition.provider.get) + if (tableDefinition.tracksPartitionsInCatalog) { + tableProperties.put(TABLE_PARTITION_PROVIDER, TABLE_PARTITION_PROVIDER_CATALOG) + } + + // we have to set the table schema here so that the table schema JSON + // string in the table properties still uses the original schema + val hiveTable = tableDefinition.copy( + schema = hiveCompatibleSchema, + properties = tableDefinition.properties ++ tableProperties + ) + createDataSourceTable( - tableDefinition.withNewStorage(locationUri = tableLocation), + hiveTable.withNewStorage(locationUri = tableLocation), ignoreIfExists) } else { val tableWithDataSourceProps = tableDefinition.copy( + schema = hiveCompatibleSchema, // We can't leave `locationUri` empty and count on Hive metastore to set a default table // location, because Hive metastore uses hive.metastore.warehouse.dir to generate default // table location for tables in default database, while we expect to use the location of @@ -268,23 +296,6 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat val provider = table.provider.get val options = new SourceOptions(table.storage.properties) - // To work around some hive metastore issues, e.g. not case-preserving, bad decimal type - // support, no column nullability, etc., we should do some extra works before saving table - // metadata into Hive metastore: - // 1. Put table metadata like table schema, partition columns, etc. in table properties. - // 2. Check if this table is hive compatible. 
- // 2.1 If it's not hive compatible, set location URI, schema, partition columns and bucket - // spec to empty and save table metadata to Hive. - // 2.2 If it's hive compatible, set serde information in table metadata and try to save - // it to Hive. If it fails, treat it as not hive compatible and go back to 2.1 - val tableProperties = tableMetaToTableProps(table) - - // put table provider and partition provider in table properties. - tableProperties.put(DATASOURCE_PROVIDER, provider) - if (table.tracksPartitionsInCatalog) { - tableProperties.put(TABLE_PARTITION_PROVIDER, TABLE_PARTITION_PROVIDER_CATALOG) - } - // Ideally we should also put `locationUri` in table properties like provider, schema, etc. // However, in older version of Spark we already store table location in storage properties // with key "path". Here we keep this behaviour for backward compatibility. @@ -303,8 +314,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat locationUri = None, properties = storagePropsWithLocation), schema = StructType(EMPTY_DATA_SCHEMA ++ table.partitionSchema), - bucketSpec = None, - properties = table.properties ++ tableProperties) + bucketSpec = None) } // converts the table metadata to Hive compatible format, i.e. set the serde information. 
@@ -326,8 +336,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat outputFormat = serde.outputFormat, serde = serde.serde, properties = storagePropsWithLocation - ), - properties = table.properties ++ tableProperties) + ) + ) } val qualifiedTableName = table.identifier.quotedString @@ -669,6 +679,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat val schemaProps = tableMetaToTableProps(oldTable, StructType(newDataSchema ++ oldTable.partitionSchema)).toMap + val hiveSchema = tryGetHiveCompatibleSchema(newDataSchema) + if (isDatasourceTable(oldTable)) { // For data source tables, first try to write it with the schema set; if that does not work, // try again with updated properties and the partition schema. This is a simplified version of @@ -676,7 +688,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // (for example, the schema does not match the data source schema, or does not match the // storage descriptor). try { - client.alterTableDataSchema(db, table, newDataSchema, schemaProps) + client.alterTableDataSchema(db, table, hiveSchema, schemaProps) } catch { case NonFatal(e) => val warningMessage = log"Could not alter schema of table " + @@ -686,10 +698,21 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat client.alterTableDataSchema(db, table, EMPTY_DATA_SCHEMA, schemaProps) } } else { - client.alterTableDataSchema(db, table, newDataSchema, schemaProps) + client.alterTableDataSchema(db, table, hiveSchema, schemaProps) } } + /** + * Tries to fix the schema so that all column data types are Hive-compatible + * ie. the types are converted to the types that Hive supports. + */ + private def tryGetHiveCompatibleSchema(schema: StructType): StructType = { + // Since collated strings do not exist in Hive as a type we need to replace them with + // the the regular string type. 
However, as we save the original schema in the table + // properties we will be able to restore the original schema when reading back the table. + SchemaUtils.replaceCollatedStringWithString(schema).asInstanceOf[StructType] + } + /** Alter the statistics of a table. If `stats` is None, then remove all existing statistics. */ override def alterTableStats( db: String, @@ -792,7 +815,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat val partColumnNames = getPartitionColumnsFromTableProperties(table) val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames) - if (DataTypeUtils.equalsIgnoreCaseAndNullability(reorderedSchema, table.schema) || + if (DataTypeUtils.equalsIgnoreCaseNullabilityAndCollation(reorderedSchema, table.schema) || options.respectSparkSchema) { hiveTable.copy( schema = reorderedSchema, @@ -1425,6 +1448,7 @@ object HiveExternalCatalog { case a: ArrayType => isHiveCompatibleDataType(a.elementType) case m: MapType => isHiveCompatibleDataType(m.keyType) && isHiveCompatibleDataType(m.valueType) + case st: StringType => st.isUTF8BinaryCollation case _ => true } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala index e413e0ee73cb..2c42eaebd701 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala @@ -22,9 +22,10 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.SparkConf import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.catalyst.types.DataTypeUtils import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.execution.command.DDLUtils -import org.apache.spark.sql.types.StructType +import 
org.apache.spark.sql.types.{StringType, StructField, StructType} /** * Test suite for the [[HiveExternalCatalog]]. @@ -200,4 +201,44 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite { assert(alteredTable.provider === Some("foo")) }) } + + test("write collated strings as regular strings in hive - but read them back as collated") { + val catalog = newBasicCatalog() + val tableName = "collation_tbl" + val columnName = "col1" + + val collationsSchema = StructType(Seq( + StructField(columnName, StringType("UNICODE")) + )) + val noCollationsSchema = StructType(Seq( + StructField(columnName, StringType) + )) + + val tableDDL = CatalogTable( + identifier = TableIdentifier(tableName, Some("db1")), + tableType = CatalogTableType.MANAGED, + storage = storageFormat, + schema = collationsSchema, + provider = Some("hive")) + + catalog.createTable(tableDDL, ignoreIfExists = false) + + val rawTable = externalCatalog.getRawTable("db1", tableName) + assert(DataTypeUtils.sameType(rawTable.schema, noCollationsSchema)) + + val readBackTable = externalCatalog.getTable("db1", tableName) + assert(DataTypeUtils.sameType(readBackTable.schema, collationsSchema)) + + // perform alter table + val newSchema = StructType(Seq( + StructField("col1", StringType("UTF8_BINARY_LCASE")) + )) + catalog.alterTableDataSchema("db1", tableName, newSchema) + + val alteredRawTable = externalCatalog.getRawTable("db1", tableName) + assert(DataTypeUtils.sameType(alteredRawTable.schema, noCollationsSchema)) + + val alteredTable = externalCatalog.getTable("db1", tableName) + assert(DataTypeUtils.sameType(alteredTable.schema, newSchema)) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org