This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d9f292ac941 [SPARK-47873][SQL] Write collated strings to Hive metastore using the regular string type
9d9f292ac941 is described below

commit 9d9f292ac9415269f604f14cb87dc8129b0bfb0c
Author: Stefan Kandic <stefan.kan...@databricks.com>
AuthorDate: Tue Apr 23 21:40:47 2024 +0800

    [SPARK-47873][SQL] Write collated strings to Hive metastore using the regular string type
    
    ### What changes were proposed in this pull request?
    
    When writing a table schema to Hive, stop writing collated strings as `string COLLATE name` and instead write them as the regular `string` type, since Hive does not support collations.
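
    As a rough illustration of the write path, here is a minimal sketch of the mapping using the `SchemaUtils.replaceCollatedStringWithString` helper added in this patch (note that `SchemaUtils` is `private[spark]`, so this shows internal behaviour rather than a public API):

    ```scala
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import org.apache.spark.sql.util.SchemaUtils

    // Original Spark schema: one column declared with a non-default collation.
    val sparkSchema = StructType(Seq(StructField("name", StringType("UNICODE"))))

    // Schema handed to the Hive metastore: the collation is dropped and the
    // column is stored as a plain `string`.
    val hiveSchema = SchemaUtils.replaceCollatedStringWithString(sparkSchema)
    // hiveSchema == StructType(Seq(StructField("name", StringType)))
    ```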
    
    Since we already write the original schema as JSON to the table properties in Hive, we will still be able to read the collation back.
    
    Also, when reading the table back from the catalog, we should now ignore any differences in string types, in addition to ignoring case and nullability.
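
    For example, the new `DataTypeUtils.equalsIgnoreCaseNullabilityAndCollation` check added in this patch treats two schemas that differ only in field-name case, nullability or string collation as equal (again a sketch against Spark-internal helpers, not a public API):

    ```scala
    import org.apache.spark.sql.catalyst.types.DataTypeUtils
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    // Schema as stored by Hive (plain string) vs. the original schema restored
    // from the table properties (collated string, different field-name case).
    val fromHive = StructType(Seq(StructField("name", StringType)))
    val fromTableProps = StructType(Seq(StructField("NAME", StringType("UNICODE"))))

    // Differences in case, nullability and collation are all ignored.
    val matches = DataTypeUtils.equalsIgnoreCaseNullabilityAndCollation(fromHive, fromTableProps)
    // matches == true
    ```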
    
    ### Why are the changes needed?
    
    To avoid breaking Hive compatibility for external engines that use the Hive metastore and would otherwise fail to parse this new type.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #46083 from stefankandic/writeCollatedStringsHive.
    
    Authored-by: Stefan Kandic <stefan.kan...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/types/DataTypeUtils.scala   | 27 +++++++-
 .../org/apache/spark/sql/util/SchemaUtils.scala    | 17 +++++
 .../spark/sql/hive/HiveExternalCatalog.scala       | 74 ++++++++++++++--------
 .../spark/sql/hive/HiveExternalCatalogSuite.scala  | 43 ++++++++++++-
 4 files changed, 134 insertions(+), 27 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/DataTypeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/DataTypeUtils.scala
index cf8e903f03a3..f8bb1077a080 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/DataTypeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/DataTypeUtils.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
 import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy.{ANSI, STRICT}
-import org.apache.spark.sql.types.{ArrayType, AtomicType, DataType, Decimal, DecimalType, MapType, NullType, StructField, StructType, UserDefinedType}
+import org.apache.spark.sql.types.{ArrayType, AtomicType, DataType, Decimal, DecimalType, MapType, NullType, StringType, StructField, StructType, UserDefinedType}
 import org.apache.spark.sql.types.DecimalType.{forType, fromDecimal}
 
 object DataTypeUtils {
@@ -47,6 +47,31 @@ object DataTypeUtils {
     DataType.equalsIgnoreCaseAndNullability(from, to)
   }
 
+  /**
+   * Compares two types, ignoring nullability of ArrayType, MapType, StructType, ignoring case
+   * sensitivity of field names in StructType as well as differences in collation for String types.
+   */
+  def equalsIgnoreCaseNullabilityAndCollation(from: DataType, to: DataType): Boolean = {
+    (from, to) match {
+      case (ArrayType(fromElement, _), ArrayType(toElement, _)) =>
+        equalsIgnoreCaseNullabilityAndCollation(fromElement, toElement)
+
+      case (MapType(fromKey, fromValue, _), MapType(toKey, toValue, _)) =>
+        equalsIgnoreCaseNullabilityAndCollation(fromKey, toKey) &&
+          equalsIgnoreCaseNullabilityAndCollation(fromValue, toValue)
+
+      case (StructType(fromFields), StructType(toFields)) =>
+        fromFields.length == toFields.length &&
+          fromFields.zip(toFields).forall { case (l, r) =>
+            l.name.equalsIgnoreCase(r.name) &&
+              equalsIgnoreCaseNullabilityAndCollation(l.dataType, r.dataType)
+          }
+
+      case (_: StringType, _: StringType) => true
+      case (fromDataType, toDataType) => fromDataType == toDataType
+    }
+  }
+
   private val SparkGeneratedName = """col\d+""".r
   private def isSparkGeneratedName(name: String): Boolean = name match {
     case SparkGeneratedName(_*) => true
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala
index 9c1e78190448..1e0bac331dc7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala
@@ -303,4 +303,21 @@ private[spark] object SchemaUtils {
       case _ => false
     }
   }
+
+  /**
+   * Replaces any collated string type with the non-collated StringType
+   * recursively in the given data type.
+   */
+  def replaceCollatedStringWithString(dt: DataType): DataType = dt match {
+    case ArrayType(et, nullable) =>
+      ArrayType(replaceCollatedStringWithString(et), nullable)
+    case MapType(kt, vt, nullable) =>
+      MapType(replaceCollatedStringWithString(kt), replaceCollatedStringWithString(vt), nullable)
+    case StructType(fields) =>
+      StructType(fields.map { field =>
+        field.copy(dataType = replaceCollatedStringWithString(field.dataType))
+      })
+    case _: StringType => StringType
+    case _ => dt
+  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 60f2d2f3e5fe..1808986ff2e6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -49,6 +49,7 @@ import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.sql.internal.StaticSQLConf._
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.SchemaUtils
 
 /**
  * A persistent implementation of the system catalog using Hive.
@@ -233,12 +234,39 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       tableDefinition.storage.locationUri
     }
 
+    val hiveCompatibleSchema = tryGetHiveCompatibleSchema(tableDefinition.schema)
+
     if (DDLUtils.isDatasourceTable(tableDefinition)) {
+      // To work around some hive metastore issues, e.g. not case-preserving, bad decimal type
+      // support, no column nullability, etc., we should do some extra works before saving table
+      // metadata into Hive metastore:
+      //  1. Put table metadata like table schema, partition columns, etc. in table properties.
+      //  2. Check if this table is hive compatible.
+      //    2.1  If it's not hive compatible, set location URI, schema, partition columns and bucket
+      //         spec to empty and save table metadata to Hive.
+      //    2.2  If it's hive compatible, set serde information in table metadata and try to save
+      //         it to Hive. If it fails, treat it as not hive compatible and go back to 2.1
+      val tableProperties = tableMetaToTableProps(tableDefinition)
+
+      // put table provider and partition provider in table properties.
+      tableProperties.put(DATASOURCE_PROVIDER, tableDefinition.provider.get)
+      if (tableDefinition.tracksPartitionsInCatalog) {
+        tableProperties.put(TABLE_PARTITION_PROVIDER, TABLE_PARTITION_PROVIDER_CATALOG)
+      }
+
+      // we have to set the table schema here so that the table schema JSON
+      // string in the table properties still uses the original schema
+      val hiveTable = tableDefinition.copy(
+        schema = hiveCompatibleSchema,
+        properties = tableDefinition.properties ++ tableProperties
+      )
+
       createDataSourceTable(
-        tableDefinition.withNewStorage(locationUri = tableLocation),
+        hiveTable.withNewStorage(locationUri = tableLocation),
         ignoreIfExists)
     } else {
       val tableWithDataSourceProps = tableDefinition.copy(
+        schema = hiveCompatibleSchema,
         // We can't leave `locationUri` empty and count on Hive metastore to set a default table
         // location, because Hive metastore uses hive.metastore.warehouse.dir to generate default
         // table location for tables in default database, while we expect to use the location of
@@ -268,23 +296,6 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val provider = table.provider.get
     val options = new SourceOptions(table.storage.properties)
 
-    // To work around some hive metastore issues, e.g. not case-preserving, bad decimal type
-    // support, no column nullability, etc., we should do some extra works before saving table
-    // metadata into Hive metastore:
-    //  1. Put table metadata like table schema, partition columns, etc. in table properties.
-    //  2. Check if this table is hive compatible.
-    //    2.1  If it's not hive compatible, set location URI, schema, partition columns and bucket
-    //         spec to empty and save table metadata to Hive.
-    //    2.2  If it's hive compatible, set serde information in table metadata and try to save
-    //         it to Hive. If it fails, treat it as not hive compatible and go back to 2.1
-    val tableProperties = tableMetaToTableProps(table)
-
-    // put table provider and partition provider in table properties.
-    tableProperties.put(DATASOURCE_PROVIDER, provider)
-    if (table.tracksPartitionsInCatalog) {
-      tableProperties.put(TABLE_PARTITION_PROVIDER, TABLE_PARTITION_PROVIDER_CATALOG)
-    }
-
     // Ideally we should also put `locationUri` in table properties like provider, schema, etc.
     // However, in older version of Spark we already store table location in storage properties
     // with key "path". Here we keep this behaviour for backward compatibility.
@@ -303,8 +314,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
           locationUri = None,
           properties = storagePropsWithLocation),
         schema = StructType(EMPTY_DATA_SCHEMA ++ table.partitionSchema),
-        bucketSpec = None,
-        properties = table.properties ++ tableProperties)
+        bucketSpec = None)
     }
 
     // converts the table metadata to Hive compatible format, i.e. set the serde information.
@@ -326,8 +336,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
           outputFormat = serde.outputFormat,
           serde = serde.serde,
           properties = storagePropsWithLocation
-        ),
-        properties = table.properties ++ tableProperties)
+        )
+      )
     }
 
     val qualifiedTableName = table.identifier.quotedString
@@ -669,6 +679,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val schemaProps =
       tableMetaToTableProps(oldTable, StructType(newDataSchema ++ oldTable.partitionSchema)).toMap
 
+    val hiveSchema = tryGetHiveCompatibleSchema(newDataSchema)
+
     if (isDatasourceTable(oldTable)) {
       // For data source tables, first try to write it with the schema set; if that does not work,
       // try again with updated properties and the partition schema. This is a simplified version of
@@ -676,7 +688,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       // (for example, the schema does not match the data source schema, or does not match the
       // storage descriptor).
       try {
-        client.alterTableDataSchema(db, table, newDataSchema, schemaProps)
+        client.alterTableDataSchema(db, table, hiveSchema, schemaProps)
       } catch {
         case NonFatal(e) =>
           val warningMessage = log"Could not alter schema of table " +
@@ -686,10 +698,21 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
           client.alterTableDataSchema(db, table, EMPTY_DATA_SCHEMA, schemaProps)
       }
     } else {
-      client.alterTableDataSchema(db, table, newDataSchema, schemaProps)
+      client.alterTableDataSchema(db, table, hiveSchema, schemaProps)
     }
   }
 
+  /**
+   * Tries to fix the schema so that all column data types are Hive-compatible
+   * i.e. the types are converted to the types that Hive supports.
+   */
+  private def tryGetHiveCompatibleSchema(schema: StructType): StructType = {
+    // Since collated strings do not exist in Hive as a type we need to replace them with
+    // the regular string type. However, as we save the original schema in the table
+    // properties we will be able to restore the original schema when reading back the table.
+    SchemaUtils.replaceCollatedStringWithString(schema).asInstanceOf[StructType]
+  }
+
   /** Alter the statistics of a table. If `stats` is None, then remove all existing statistics. */
   override def alterTableStats(
       db: String,
@@ -792,7 +815,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       val partColumnNames = getPartitionColumnsFromTableProperties(table)
       val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
 
-      if (DataTypeUtils.equalsIgnoreCaseAndNullability(reorderedSchema, table.schema) ||
+      if (DataTypeUtils.equalsIgnoreCaseNullabilityAndCollation(reorderedSchema, table.schema) ||
           options.respectSparkSchema) {
         hiveTable.copy(
           schema = reorderedSchema,
@@ -1425,6 +1448,7 @@ object HiveExternalCatalog {
     case a: ArrayType => isHiveCompatibleDataType(a.elementType)
     case m: MapType =>
       isHiveCompatibleDataType(m.keyType) && isHiveCompatibleDataType(m.valueType)
+    case st: StringType => st.isUTF8BinaryCollation
     case _ => true
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index e413e0ee73cb..2c42eaebd701 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -22,9 +22,10 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
 
 /**
  * Test suite for the [[HiveExternalCatalog]].
@@ -200,4 +201,44 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
       assert(alteredTable.provider === Some("foo"))
     })
   }
+
+  test("write collated strings as regular strings in hive - but read them back 
as collated") {
+    val catalog = newBasicCatalog()
+    val tableName = "collation_tbl"
+    val columnName = "col1"
+
+    val collationsSchema = StructType(Seq(
+      StructField(columnName, StringType("UNICODE"))
+    ))
+    val noCollationsSchema = StructType(Seq(
+      StructField(columnName, StringType)
+    ))
+
+    val tableDDL = CatalogTable(
+      identifier = TableIdentifier(tableName, Some("db1")),
+      tableType = CatalogTableType.MANAGED,
+      storage = storageFormat,
+      schema = collationsSchema,
+      provider = Some("hive"))
+
+    catalog.createTable(tableDDL, ignoreIfExists = false)
+
+    val rawTable = externalCatalog.getRawTable("db1", tableName)
+    assert(DataTypeUtils.sameType(rawTable.schema, noCollationsSchema))
+
+    val readBackTable = externalCatalog.getTable("db1", tableName)
+    assert(DataTypeUtils.sameType(readBackTable.schema, collationsSchema))
+
+    // perform alter table
+    val newSchema = StructType(Seq(
+      StructField("col1", StringType("UTF8_BINARY_LCASE"))
+    ))
+    catalog.alterTableDataSchema("db1", tableName, newSchema)
+
+    val alteredRawTable = externalCatalog.getRawTable("db1", tableName)
+    assert(DataTypeUtils.sameType(alteredRawTable.schema, noCollationsSchema))
+
+    val alteredTable = externalCatalog.getTable("db1", tableName)
+    assert(DataTypeUtils.sameType(alteredTable.schema, newSchema))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
