This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 2e4a8cb [SPARK-31147][SQL] Forbid CHAR type in non-Hive-Serde tables 2e4a8cb is described below commit 2e4a8cb1d53f6338789d554e04dd1a22d60d5405 Author: Wenchen Fan <wenc...@databricks.com> AuthorDate: Wed Mar 25 09:25:55 2020 -0700 [SPARK-31147][SQL] Forbid CHAR type in non-Hive-Serde tables Spark introduced CHAR type for hive compatibility but it only works for hive tables. CHAR type is never documented and is treated as STRING type for non-Hive tables. However, this leads to confusing behaviors **Apache Spark 3.0.0-preview2** ``` spark-sql> CREATE TABLE t(a CHAR(3)); spark-sql> INSERT INTO TABLE t SELECT 'a '; spark-sql> SELECT a, length(a) FROM t; a 2 ``` **Apache Spark 2.4.5** ``` spark-sql> CREATE TABLE t(a CHAR(3)); spark-sql> INSERT INTO TABLE t SELECT 'a '; spark-sql> SELECT a, length(a) FROM t; a 3 ``` According to the SQL standard, `CHAR(3)` should guarantee all the values are of length 3. Since `CHAR(3)` is treated as STRING, Spark doesn't guarantee it. This PR forbids CHAR type in non-Hive tables as it's not supported correctly. avoid confusing/wrong behavior yes, now users can't create/alter non-Hive tables with CHAR type. new tests Closes #27902 from cloud-fan/char. 
Authored-by: Wenchen Fan <wenc...@databricks.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- docs/sql-migration-guide.md | 18 +++--- .../sql/catalyst/analysis/ResolveCatalogs.scala | 4 ++ .../spark/sql/catalyst/parser/AstBuilder.scala | 4 ++ .../spark/sql/catalyst/parser/ParseDriver.scala | 5 ++ .../sql/catalyst/parser/ParserInterface.scala | 6 ++ .../sql/connector/catalog/CatalogV2Util.scala | 19 +++++- .../apache/spark/sql/types/HiveStringType.scala | 7 +++ .../catalyst/analysis/ResolveSessionCatalog.scala | 14 +++++ .../spark/sql/SparkSessionExtensionSuite.scala | 3 + .../execution/command/PlanResolutionSuite.scala | 67 ++++++++++++++++++---- .../spark/sql/hive/HiveMetastoreCatalogSuite.scala | 2 +- 11 files changed, 129 insertions(+), 20 deletions(-) diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md index b4e51c2..d2773d8 100644 --- a/docs/sql-migration-guide.md +++ b/docs/sql-migration-guide.md @@ -108,6 +108,8 @@ license: | - Since Spark 3.0, `SHOW CREATE TABLE` will always return Spark DDL, even when the given table is a Hive serde table. For generating Hive DDL, please use `SHOW CREATE TABLE AS SERDE` command instead. + - Since Spark 3.0, column of CHAR type is not allowed in non-Hive-Serde tables, and CREATE/ALTER TABLE commands will fail if CHAR type is detected. Please use STRING type instead. In Spark version 2.4 and earlier, CHAR type is treated as STRING type and the length parameter is simply ignored. + ### UDFs and Built-in Functions - Since Spark 3.0, the `date_add` and `date_sub` functions only accept int, smallint, tinyint as the 2nd argument, fractional and non-literal string are not valid anymore, e.g. `date_add(cast('1964-05-23' as date), 12.34)` will cause `AnalysisException`. Note that, string literals are still allowed, but Spark will throw Analysis Exception if the string content is not a valid integer. 
In Spark version 2.4 and earlier, if the 2nd argument is fractional or string value, it will be coerced [...] @@ -964,29 +966,29 @@ Spark SQL supports the vast majority of Hive features, such as: * Unions * Sub-queries * Sub-queries in the FROM Clause - + ```SELECT col FROM (SELECT a + b AS col FROM t1) t2``` * Sub-queries in WHERE Clause * Correlated or non-correlated IN and NOT IN statement in WHERE Clause - + ``` SELECT col FROM t1 WHERE col IN (SELECT a FROM t2 WHERE t1.a = t2.a) SELECT col FROM t1 WHERE col IN (SELECT a FROM t2) ``` * Correlated or non-correlated EXISTS and NOT EXISTS statement in WHERE Clause - + ``` SELECT col FROM t1 WHERE EXISTS (SELECT t2.a FROM t2 WHERE t1.a = t2.a AND t2.a > 10) SELECT col FROM t1 WHERE EXISTS (SELECT t2.a FROM t2 WHERE t2.a > 10) ``` * Non-correlated IN and NOT IN statement in JOIN Condition - + ```SELECT t1.col FROM t1 JOIN t2 ON t1.a = t2.a AND t1.a IN (SELECT a FROM t3)``` - + * Non-correlated EXISTS and NOT EXISTS statement in JOIN Condition - - ```SELECT t1.col FROM t1 JOIN t2 ON t1.a = t2.a AND EXISTS (SELECT * FROM t3 WHERE t3.a > 10)``` - + + ```SELECT t1.col FROM t1 JOIN t2 ON t1.a = t2.a AND EXISTS (SELECT * FROM t3 WHERE t3.a > 10)``` + * Sampling * Explain * Partitioned tables including dynamic partition insertion diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala index 88a3c0a..895dfbb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala @@ -34,6 +34,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager) override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case AlterTableAddColumnsStatement( nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) => + cols.foreach(c => 
failCharType(c.dataType)) val changes = cols.map { col => TableChange.addColumn( col.name.toArray, @@ -46,6 +47,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager) case a @ AlterTableAlterColumnStatement( nameParts @ NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _) => + a.dataType.foreach(failCharType) val colName = a.column.toArray val typeChange = a.dataType.map { newDataType => TableChange.updateColumnType(colName, newDataType) @@ -121,6 +123,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager) case c @ CreateTableStatement( NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) => + assertNoCharTypeInSchema(c.tableSchema) CreateV2Table( catalog.asTableCatalog, tbl.asIdentifier, @@ -147,6 +150,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager) case c @ ReplaceTableStatement( NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) => + assertNoCharTypeInSchema(c.tableSchema) ReplaceTable( catalog.asTableCatalog, tbl.asIdentifier, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index b94fd1d..09d316b6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -104,6 +104,10 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging withOrigin(ctx)(StructType(visitColTypeList(ctx.colTypeList))) } + def parseRawDataType(ctx: SingleDataTypeContext): DataType = withOrigin(ctx) { + typedVisit[DataType](ctx.dataType()) + } + /* ******************************************************************************************** * Plan parsing * ******************************************************************************************** */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala index c96f2da..590193b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala @@ -40,6 +40,11 @@ abstract class AbstractSqlParser(conf: SQLConf) extends ParserInterface with Log astBuilder.visitSingleDataType(parser.singleDataType()) } + /** Similar to `parseDataType`, but without CHAR/VARCHAR replacement. */ + override def parseRawDataType(sqlText: String): DataType = parse(sqlText) { parser => + astBuilder.parseRawDataType(parser.singleDataType()) + } + /** Creates Expression for a given SQL string. */ override def parseExpression(sqlText: String): Expression = parse(sqlText) { parser => astBuilder.visitSingleExpression(parser.singleExpression()) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserInterface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserInterface.scala index 77e357a..d724933 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserInterface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserInterface.scala @@ -70,4 +70,10 @@ trait ParserInterface { */ @throws[ParseException]("Text cannot be parsed to a DataType") def parseDataType(sqlText: String): DataType + + /** + * Parse a string to a raw [[DataType]] without CHAR/VARCHAR replacement. 
+ */ + @throws[ParseException]("Text cannot be parsed to a DataType") + def parseRawDataType(sqlText: String): DataType } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index ff63201..e1f3293 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -22,11 +22,13 @@ import java.util.Collections import scala.collection.JavaConverters._ +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, UnresolvedV2Relation} +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.plans.logical.AlterTable import org.apache.spark.sql.connector.catalog.TableChange._ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation -import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType} +import org.apache.spark.sql.types.{ArrayType, DataType, HIVE_TYPE_STRING, HiveStringType, MapType, StructField, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.Utils @@ -329,4 +331,19 @@ private[sql] object CatalogV2Util { .getOrElse(catalogManager.v2SessionCatalog) .asTableCatalog } + + def failCharType(dt: DataType): Unit = { + if (HiveStringType.containsCharType(dt)) { + throw new AnalysisException( + "Cannot use CHAR type in non-Hive-Serde tables, please use STRING type instead.") + } + } + + def assertNoCharTypeInSchema(schema: StructType): Unit = { + schema.foreach { f => + if (f.metadata.contains(HIVE_TYPE_STRING)) { + failCharType(CatalystSqlParser.parseRawDataType(f.metadata.getString(HIVE_TYPE_STRING))) + } + } + } } diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/HiveStringType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/HiveStringType.scala index 4eb3226..a29f49a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/HiveStringType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/HiveStringType.scala @@ -53,6 +53,13 @@ object HiveStringType { case _: HiveStringType => StringType case _ => dt } + + def containsCharType(dt: DataType): Boolean = dt match { + case ArrayType(et, _) => containsCharType(et) + case MapType(kt, vt, _) => containsCharType(kt) || containsCharType(vt) + case StructType(fields) => fields.exists(f => containsCharType(f.dataType)) + case _ => dt.isInstanceOf[CharType] + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 5330612..7b88fc6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -50,6 +50,9 @@ class ResolveSessionCatalog( nameParts @ SessionCatalogAndTable(catalog, tbl), cols) => loadTable(catalog, tbl.asIdentifier).collect { case v1Table: V1Table => + if (!DDLUtils.isHiveTable(v1Table.v1Table)) { + cols.foreach(c => failCharType(c.dataType)) + } cols.foreach { c => assertTopLevelColumn(c.name, "AlterTableAddColumnsCommand") if (!c.nullable) { @@ -59,6 +62,7 @@ class ResolveSessionCatalog( } AlterTableAddColumnsCommand(tbl.asTableIdentifier, cols.map(convertToStructField)) }.getOrElse { + cols.foreach(c => failCharType(c.dataType)) val changes = cols.map { col => TableChange.addColumn( col.name.toArray, @@ -74,6 +78,10 @@ class ResolveSessionCatalog( nameParts @ SessionCatalogAndTable(catalog, tbl), _, _, _, _, _) => loadTable(catalog, tbl.asIdentifier).collect { case v1Table: V1Table 
=> + if (!DDLUtils.isHiveTable(v1Table.v1Table)) { + a.dataType.foreach(failCharType) + } + if (a.column.length > 1) { throw new AnalysisException( "ALTER COLUMN with qualified column is only supported with v2 tables.") @@ -111,6 +119,7 @@ class ResolveSessionCatalog( builder.build()) AlterTableChangeColumnCommand(tbl.asTableIdentifier, colName, newColumn) }.getOrElse { + a.dataType.foreach(failCharType) val colName = a.column.toArray val typeChange = a.dataType.map { newDataType => TableChange.updateColumnType(colName, newDataType) @@ -237,12 +246,16 @@ class ResolveSessionCatalog( SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) => val provider = c.provider.getOrElse(conf.defaultDataSourceName) if (!isV2Provider(provider)) { + if (!DDLUtils.isHiveTable(Some(provider))) { + assertNoCharTypeInSchema(c.tableSchema) + } val tableDesc = buildCatalogTable(tbl.asTableIdentifier, c.tableSchema, c.partitioning, c.bucketSpec, c.properties, provider, c.options, c.location, c.comment, c.ifNotExists) val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists CreateTable(tableDesc, mode, None) } else { + assertNoCharTypeInSchema(c.tableSchema) CreateV2Table( catalog.asTableCatalog, tbl.asIdentifier, @@ -286,6 +299,7 @@ class ResolveSessionCatalog( if (!isV2Provider(provider)) { throw new AnalysisException("REPLACE TABLE is only supported with v2 tables.") } else { + assertNoCharTypeInSchema(c.tableSchema) ReplaceTable( catalog.asTableCatalog, tbl.asIdentifier, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala index 99ea950..2d48f8d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala @@ -327,6 +327,9 @@ case class MyParser(spark: SparkSession, delegate: ParserInterface) extends Pars override def 
parseDataType(sqlText: String): DataType = delegate.parseDataType(sqlText) + + override def parseRawDataType(sqlText: String): DataType = + delegate.parseRawDataType(sqlText) } object MyExtensions { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index 9df3c33..2e88ea9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -37,11 +37,13 @@ import org.apache.spark.sql.connector.catalog.TableChange.{UpdateColumnComment, import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.SimpleScanSource import org.apache.spark.sql.types.{CharType, DoubleType, HIVE_TYPE_STRING, IntegerType, LongType, MetadataBuilder, StringType, StructField, StructType} class PlanResolutionSuite extends AnalysisTest { import CatalystSqlParser._ + private val v1Format = classOf[SimpleScanSource].getName private val v2Format = classOf[FakeV2Provider].getName private val table: Table = { @@ -61,6 +63,15 @@ class PlanResolutionSuite extends AnalysisTest { val t = mock(classOf[CatalogTable]) when(t.schema).thenReturn(new StructType().add("i", "int").add("s", "string")) when(t.tableType).thenReturn(CatalogTableType.MANAGED) + when(t.provider).thenReturn(Some(v1Format)) + V1Table(t) + } + + private val v1HiveTable: V1Table = { + val t = mock(classOf[CatalogTable]) + when(t.schema).thenReturn(new StructType().add("i", "int").add("s", "string")) + when(t.tableType).thenReturn(CatalogTableType.MANAGED) + when(t.provider).thenReturn(Some("hive")) V1Table(t) } @@ -83,6 +94,7 @@ class PlanResolutionSuite extends AnalysisTest { 
invocation.getArgument[Identifier](0).name match { case "v1Table" => v1Table case "v1Table1" => v1Table + case "v1HiveTable" => v1HiveTable case "v2Table" => table case "v2Table1" => table case "v2TableWithAcceptAnySchemaCapability" => tableWithAcceptAnySchemaCapability @@ -1046,15 +1058,6 @@ class PlanResolutionSuite extends AnalysisTest { } assert(e2.getMessage.contains( "ALTER COLUMN with qualified column is only supported with v2 tables")) - - val sql5 = s"ALTER TABLE $tblName ALTER COLUMN i TYPE char(1)" - val builder = new MetadataBuilder - builder.putString(HIVE_TYPE_STRING, CharType(1).catalogString) - val newColumnWithCleanedType = StructField("i", StringType, true, builder.build()) - val expected5 = AlterTableChangeColumnCommand( - tableIdent, "i", newColumnWithCleanedType) - val parsed5 = parseAndResolve(sql5) - comparePlans(parsed5, expected5) } else { parsed1 match { case AlterTable(_, _, _: DataSourceV2Relation, changes) => @@ -1071,6 +1074,15 @@ class PlanResolutionSuite extends AnalysisTest { } } } + + val sql = s"ALTER TABLE v1HiveTable ALTER COLUMN i TYPE char(1)" + val builder = new MetadataBuilder + builder.putString(HIVE_TYPE_STRING, CharType(1).catalogString) + val newColumnWithCleanedType = StructField("i", StringType, true, builder.build()) + val expected = AlterTableChangeColumnCommand( + TableIdentifier("v1HiveTable", Some("default")), "i", newColumnWithCleanedType) + val parsed = parseAndResolve(sql) + comparePlans(parsed, expected) } test("alter table: alter column action is not specified") { @@ -1506,6 +1518,42 @@ class PlanResolutionSuite extends AnalysisTest { case l => fail("Expected unresolved MergeIntoTable, but got:\n" + l.treeString) } } + + test("SPARK-31147: forbid CHAR type in non-Hive tables") { + def checkFailure(t: String, provider: String): Unit = { + val types = Seq( + "CHAR(2)", + "ARRAY<CHAR(2)>", + "MAP<INT, CHAR(2)>", + "MAP<CHAR(2), INT>", + "STRUCT<s: CHAR(2)>") + types.foreach { tpe => + 
intercept[AnalysisException] { + parseAndResolve(s"CREATE TABLE $t(col $tpe) USING $provider") + } + intercept[AnalysisException] { + parseAndResolve(s"REPLACE TABLE $t(col $tpe) USING $provider") + } + intercept[AnalysisException] { + parseAndResolve(s"CREATE OR REPLACE TABLE $t(col $tpe) USING $provider") + } + intercept[AnalysisException] { + parseAndResolve(s"ALTER TABLE $t ADD COLUMN col $tpe") + } + intercept[AnalysisException] { + parseAndResolve(s"ALTER TABLE $t ADD COLUMN col $tpe") + } + intercept[AnalysisException] { + parseAndResolve(s"ALTER TABLE $t ALTER COLUMN col TYPE $tpe") + } + } + } + + checkFailure("v1Table", v1Format) + checkFailure("v2Table", v2Format) + checkFailure("testcat.tab", "foo") + } + // TODO: add tests for more commands. } @@ -1515,4 +1563,3 @@ object AsDataSourceV2Relation { case _ => None } } - diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala index 9cd56f1..95e99c6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala @@ -97,7 +97,7 @@ class HiveMetastoreCatalogSuite extends TestHiveSingleton with SQLTestUtils { |c22 map<int,char(10)>, |c23 struct<a:int,b:int>, |c24 struct<c:varchar(10),d:int> - |) + |) USING hive """.stripMargin) val schema = hiveClient.getTable("default", "t").schema --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org