This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new d1ea322  [SPARK-37452][SQL] Char and Varchar break backward compatibility between v3.1 and v2
d1ea322 is described below

commit d1ea322b012eb7d097b8ccff6e30943f9db4be2b
Author: Kent Yao <y...@apache.org>
AuthorDate: Mon Nov 29 17:05:56 2021 +0800

    [SPARK-37452][SQL] Char and Varchar break backward compatibility between v3.1 and v2

    ### What changes were proposed in this pull request?

    We store the table schema in table properties so that the read side can restore it.
    Spark 3.1 added native char/varchar support, so commands such as `CREATE TABLE` and
    `ALTER TABLE` that use these types store `char(x)` or `varchar(x)` directly in those
    properties. If a user then reads such a table with Spark 2, it fails to parse the
    schema. FYI,
    https://github.com/apache/spark/blob/branch-2.4/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala#L136

    The affected table can be one newly created by Spark 3.1 or later, or an existing
    one altered by Spark 3.1 or later.

    ### Why are the changes needed?

    Backward compatibility.

    ### Does this PR introduce _any_ user-facing change?

    No. This is a bugfix that only touches internal table properties.

    ### How was this patch tested?

    Manually.

    Closes #34697 from yaooqinn/SPARK-37452.

    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Kent Yao <y...@apache.org>
    (cherry picked from commit 0c3c4e2fd06629b77b86eeb36490ecf07d5283fc)
    Signed-off-by: Kent Yao <y...@apache.org>
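[Editor's note] To make the failure mode concrete, here is a minimal sketch that can be
pasted into a Spark 2.x spark-shell. The schema JSON literal is illustrative, and the
exact exception message may vary across Spark 2.x releases:

    import org.apache.spark.sql.types.DataType

    // Schema JSON as Spark 3.1 stored it in the table properties before this fix.
    // Spark 2.x has no entry for "char(10)" in DataType.nameToType, so parsing fails.
    val storedJson =
      """{"type":"struct","fields":[
        |  {"name":"c","type":"char(10)","nullable":true,"metadata":{}}
        |]}""".stripMargin

    // On Spark 2.x this throws an IllegalArgumentException along the lines of
    // "Failed to convert the JSON string 'char(10)' to a data type."
    DataType.fromJson(storedJson)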
---
 .../spark/sql/hive/HiveExternalCatalog.scala       | 35 ++++++++++++++++------
 1 file changed, 26 insertions(+), 9 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 568e814..8551780 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{PartitioningUtils, SourceOptions}
 import org.apache.spark.sql.hive.client.HiveClient
@@ -429,8 +429,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val properties = new mutable.HashMap[String, String]
     properties.put(CREATED_SPARK_VERSION, table.createVersion)

+    // This is for backward compatibility to Spark 2 to read tables with char/varchar created by
+    // Spark 3.1. At read side, we will restore a table schema from its properties. So, we need to
+    // clear the `varchar(n)` and `char(n)` and replace them with `string` as Spark 2 does not have
+    // a type mapping for them in `DataType.nameToType`. See `restoreHiveSerdeTable` for example.
+    val newSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(schema)
     CatalogTable.splitLargeTableProp(
-      DATASOURCE_SCHEMA, schema.json, properties.put, conf.get(SCHEMA_STRING_LENGTH_THRESHOLD))
+      DATASOURCE_SCHEMA,
+      newSchema.json,
+      properties.put,
+      conf.get(SCHEMA_STRING_LENGTH_THRESHOLD))

     if (partitionColumns.nonEmpty) {
       properties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString)
@@ -735,8 +744,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       case None if table.tableType == VIEW =>
         // If this is a view created by Spark 2.2 or higher versions, we should restore its schema
         // from table properties.
-        CatalogTable.readLargeTableProp(table.properties, DATASOURCE_SCHEMA).foreach { schemaJson =>
-          table = table.copy(schema = DataType.fromJson(schemaJson).asInstanceOf[StructType])
+        getSchemaFromTableProperties(table.properties).foreach { schemaFromTableProps =>
+          table = table.copy(schema = schemaFromTableProps)
         }

       // No provider in table properties, which means this is a Hive serde table.
@@ -786,9 +795,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat

     // If this is a Hive serde table created by Spark 2.1 or higher versions, we should restore its
     // schema from table properties.
-    val schemaJson = CatalogTable.readLargeTableProp(table.properties, DATASOURCE_SCHEMA)
-    if (schemaJson.isDefined) {
-      val schemaFromTableProps = DataType.fromJson(schemaJson.get).asInstanceOf[StructType]
+    val maybeSchemaFromTableProps = getSchemaFromTableProperties(table.properties)
+    if (maybeSchemaFromTableProps.isDefined) {
+      val schemaFromTableProps = maybeSchemaFromTableProps.get
       val partColumnNames = getPartitionColumnsFromTableProperties(table)
       val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
@@ -814,6 +823,14 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
   }

+  private def getSchemaFromTableProperties(
+      tableProperties: Map[String, String]): Option[StructType] = {
+    CatalogTable.readLargeTableProp(tableProperties, DATASOURCE_SCHEMA).map { schemaJson =>
+      val parsed = DataType.fromJson(schemaJson).asInstanceOf[StructType]
+      CharVarcharUtils.getRawSchema(parsed)
+    }
+  }
+
   private def restoreDataSourceTable(table: CatalogTable, provider: String): CatalogTable = {
     // Internally we store the table location in storage properties with key "path" for data
     // source tables. Here we set the table location to `locationUri` field and filter out the
@@ -828,8 +845,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       storageWithLocation.properties.filterKeys(!HIVE_GENERATED_STORAGE_PROPERTIES(_)).toMap)
     val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)

-    val schemaFromTableProps = CatalogTable.readLargeTableProp(table.properties, DATASOURCE_SCHEMA)
-      .map(json => DataType.fromJson(json).asInstanceOf[StructType]).getOrElse(new StructType())
+    val schemaFromTableProps =
+      getSchemaFromTableProperties(table.properties).getOrElse(new StructType())
     val partColumnNames = getPartitionColumnsFromTableProperties(table)
     val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
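[Editor's note] For reference, a sketch of the round trip this patch sets up, runnable in a
Spark 3.2 spark-shell. `replaceCharVarcharWithStringInSchema` and `getRawSchema` are the
internal `CharVarcharUtils` helpers used in the diff above; the detail that the original
type travels in the column metadata is an implementation assumption based on those helpers:

    import org.apache.spark.sql.catalyst.util.CharVarcharUtils
    import org.apache.spark.sql.types._

    // A schema as the user declared it, e.g. CREATE TABLE t (c CHAR(10), v VARCHAR(20)).
    val declared = StructType(Seq(
      StructField("c", CharType(10)),
      StructField("v", VarcharType(20))))

    // Write side (first hunk above): char/varchar become plain string, so the JSON
    // stored in the table properties is parseable by Spark 2. The original type is
    // kept in the column metadata, so Spark 3.x loses no information.
    val stored = CharVarcharUtils.replaceCharVarcharWithStringInSchema(declared)
    assert(stored.forall(_.dataType == StringType))

    // Read side (getSchemaFromTableProperties above): parse the JSON back and
    // restore the raw char/varchar types from the column metadata.
    val parsed = DataType.fromJson(stored.json).asInstanceOf[StructType]
    val restored = CharVarcharUtils.getRawSchema(parsed)
    assert(restored.map(_.dataType) == Seq(CharType(10), VarcharType(20)))

Spark 2 thus reads such a table with plain string columns, which is lossy but parseable,
while Spark 3.x reconstructs the declared char/varchar types on load.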