This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new b5297c4  [SPARK-20680][SQL] Spark-sql do not support for creating table with void column datatype
b5297c4 is described below

commit b5297c43b0bd5a62a20fb047fdee24ebd63f939d
Author: LantaoJin <jinlan...@gmail.com>
AuthorDate: Tue Jul 7 18:58:01 2020 -0700

[SPARK-20680][SQL] Spark-sql do not support for creating table with void column datatype

### What changes were proposed in this pull request?

This is a new PR to address the closed one, #17953:
1. Support the "void" primitive data type in the `AstBuilder` and map it to `NullType`.
2. Forbid creating tables with a VOID/NULL column type.

### Why are the changes needed?

1. Spark is incompatible with Hive's void type. When a Hive table schema contains a void type, DESC table throws an exception in Spark.

> hive> create table bad as select 1 x, null z from dual;
> hive> describe bad;
> OK
> x       int
> z       void

In Spark 2.0.x, reading this table is normal:

> spark-sql> describe bad;
> x       int     NULL
> z       void    NULL
> Time taken: 4.431 seconds, Fetched 2 row(s)

But in the latest Spark version, it fails with a SparkException: Cannot recognize hive type string: void

> spark-sql> describe bad;
> 17/05/09 03:12:08 ERROR thriftserver.SparkSQLDriver: Failed in [describe bad]
> org.apache.spark.SparkException: Cannot recognize hive type string: void
> Caused by: org.apache.spark.sql.catalyst.parser.ParseException:
> DataType void() is not supported.(line 1, pos 0)
>
> == SQL ==
> void
> ^^^
> ... 61 more
> org.apache.spark.SparkException: Cannot recognize hive type string: void

2. Hive CTAS statements throw an error when the select clause has a NULL/VOID type column, since HIVE-11217.

In Spark, creating a table with a VOID/NULL column should throw a readable exception message, including:
- create data source table (using parquet, json, ...)
- create hive table (with or without stored as)
- CTAS

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Add unit tests

Closes #28833 from LantaoJin/SPARK-20680_COPY.
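For quick reference, the user-visible effect of the patch can be sketched as follows: `void` now parses to `NullType` (whose `simpleString` is reported as `unknown`), and creating a table with such a column is rejected with "Cannot create tables with unknown type." This is a minimal illustrative sketch, not part of the patch; the table names, the local `SparkSession` setup, and the use of the internal `CatalystSqlParser` are assumptions for demonstration only.

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.types.NullType

object VoidTypeDemo {
  def main(args: Array[String]): Unit = {
    // 1. "void" is now a recognized primitive type and maps to NullType,
    //    which reports itself as "unknown" instead of "null".
    val parsed = CatalystSqlParser.parseDataType("void")
    assert(parsed == NullType)
    println(parsed.simpleString)   // prints: unknown

    val spark = SparkSession.builder()
      .appName("void-column-check")   // hypothetical app name for a local demo
      .master("local[*]")
      .getOrCreate()

    // 2. Creating a table with a VOID column is rejected ...
    try {
      spark.sql("CREATE TABLE t1 (v VOID) USING PARQUET")
    } catch {
      case e: AnalysisException =>
        println(e.getMessage)      // contains: Cannot create tables with unknown type.
    }

    // ... and so is CTAS whose select clause produces a NULL-typed column.
    try {
      spark.sql("CREATE TABLE t2 USING PARQUET AS SELECT null AS null_col")
    } catch {
      case e: AnalysisException =>
        println(e.getMessage)      // contains: Cannot create tables with unknown type.
    }

    spark.stop()
  }
}
```

Note that existing Hive tables that already contain void columns (like the `bad` table above) remain readable and describable after this change; only the creation of new tables with VOID/NULL columns is blocked.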
Authored-by: LantaoJin <jinlan...@gmail.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- python/pyspark/sql/types.py | 3 + .../sql/catalyst/analysis/ResolveCatalogs.scala | 11 ++ .../spark/sql/catalyst/parser/AstBuilder.scala | 1 + .../sql/connector/catalog/CatalogV2Util.scala | 21 +++- .../org/apache/spark/sql/types/NullType.scala | 4 + .../sql/catalyst/parser/DataTypeParserSuite.scala | 1 + .../catalyst/analysis/ResolveSessionCatalog.scala | 11 ++ .../spark/sql/execution/datasources/rules.scala | 3 + .../sql-functions/sql-expression-schema.md | 2 +- .../sql-tests/results/ansi/literals.sql.out | 2 +- .../sql-tests/results/inline-table.sql.out | 2 +- .../resources/sql-tests/results/literals.sql.out | 2 +- .../sql-tests/results/misc-functions.sql.out | 2 +- .../sql-tests/results/postgreSQL/select.sql.out | 4 +- .../results/sql-compatibility-functions.sql.out | 6 +- .../sql-tests/results/udf/udf-inline-table.sql.out | 2 +- .../spark/sql/FileBasedDataSourceSuite.scala | 2 +- .../org/apache/spark/sql/hive/HiveStrategies.scala | 3 + .../spark/sql/hive/execution/HiveDDLSuite.scala | 121 +++++++++++++++++++++ .../spark/sql/hive/orc/HiveOrcSourceSuite.scala | 2 +- 20 files changed, 191 insertions(+), 14 deletions(-) diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index 320a68d..ddd13ca3 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -116,6 +116,9 @@ class NullType(DataType): __metaclass__ = DataTypeSingleton + def simpleString(self): + return 'unknown' + class AtomicType(DataType): """An internal type used to represent everything that is not diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala index 2a0a944..a406040 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala @@ -34,6 +34,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager) override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case AlterTableAddColumnsStatement( nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) => + cols.foreach(c => failNullType(c.dataType)) cols.foreach(c => failCharType(c.dataType)) val changes = cols.map { col => TableChange.addColumn( @@ -47,6 +48,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager) case AlterTableReplaceColumnsStatement( nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) => + cols.foreach(c => failNullType(c.dataType)) cols.foreach(c => failCharType(c.dataType)) val changes: Seq[TableChange] = loadTable(catalog, tbl.asIdentifier) match { case Some(table) => @@ -69,6 +71,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager) case a @ AlterTableAlterColumnStatement( nameParts @ NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _) => + a.dataType.foreach(failNullType) a.dataType.foreach(failCharType) val colName = a.column.toArray val typeChange = a.dataType.map { newDataType => @@ -145,6 +148,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager) case c @ CreateTableStatement( NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) => + assertNoNullTypeInSchema(c.tableSchema) assertNoCharTypeInSchema(c.tableSchema) CreateV2Table( catalog.asTableCatalog, @@ -157,6 +161,9 @@ class ResolveCatalogs(val catalogManager: CatalogManager) case c @ CreateTableAsSelectStatement( 
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) => + if (c.asSelect.resolved) { + assertNoNullTypeInSchema(c.asSelect.schema) + } CreateTableAsSelect( catalog.asTableCatalog, tbl.asIdentifier, @@ -172,6 +179,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager) case c @ ReplaceTableStatement( NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) => + assertNoNullTypeInSchema(c.tableSchema) assertNoCharTypeInSchema(c.tableSchema) ReplaceTable( catalog.asTableCatalog, @@ -184,6 +192,9 @@ class ResolveCatalogs(val catalogManager: CatalogManager) case c @ ReplaceTableAsSelectStatement( NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) => + if (c.asSelect.resolved) { + assertNoNullTypeInSchema(c.asSelect.schema) + } ReplaceTableAsSelect( catalog.asTableCatalog, tbl.asIdentifier, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index d08bcb1..6b41a8b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -2203,6 +2203,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging DecimalType(precision.getText.toInt, 0) case ("decimal" | "dec" | "numeric", precision :: scale :: Nil) => DecimalType(precision.getText.toInt, scale.getText.toInt) + case ("void", Nil) => NullType case ("interval", Nil) => CalendarIntervalType case (dt, params) => val dtStr = if (params.nonEmpty) s"$dt(${params.mkString(",")})" else dt diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index e1f3293..d130a13 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.plans.logical.AlterTable import org.apache.spark.sql.connector.catalog.TableChange._ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation -import org.apache.spark.sql.types.{ArrayType, DataType, HIVE_TYPE_STRING, HiveStringType, MapType, StructField, StructType} +import org.apache.spark.sql.types.{ArrayType, DataType, HIVE_TYPE_STRING, HiveStringType, MapType, NullType, StructField, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.Utils @@ -346,4 +346,23 @@ private[sql] object CatalogV2Util { } } } + + def failNullType(dt: DataType): Unit = { + def containsNullType(dt: DataType): Boolean = dt match { + case ArrayType(et, _) => containsNullType(et) + case MapType(kt, vt, _) => containsNullType(kt) || containsNullType(vt) + case StructType(fields) => fields.exists(f => containsNullType(f.dataType)) + case _ => dt.isInstanceOf[NullType] + } + if (containsNullType(dt)) { + throw new AnalysisException( + "Cannot create tables with unknown type.") + } + } + + def assertNoNullTypeInSchema(schema: StructType): Unit = { + schema.foreach { f => + failNullType(f.dataType) + } + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/NullType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/NullType.scala index 14097a5..6c9a1d6 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/NullType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/NullType.scala @@ -32,6 +32,10 @@ class NullType private() extends DataType { override def defaultSize: Int = 1 private[spark] override def asNullable: NullType = this + + // "null" is mainly used to represent a literal in Spark, + // it's better to avoid using it for data types. + override def simpleString: String = "unknown" } /** diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DataTypeParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DataTypeParserSuite.scala index d519fdf..655b1d2 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DataTypeParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DataTypeParserSuite.scala @@ -61,6 +61,7 @@ class DataTypeParserSuite extends SparkFunSuite { checkDataType("varchAr(20)", StringType) checkDataType("cHaR(27)", StringType) checkDataType("BINARY", BinaryType) + checkDataType("void", NullType) checkDataType("interval", CalendarIntervalType) checkDataType("array<doublE>", ArrayType(DoubleType, true)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index bf90875..bc3f38a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -48,6 +48,7 @@ class ResolveSessionCatalog( override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { case AlterTableAddColumnsStatement( nameParts @ SessionCatalogAndTable(catalog, tbl), cols) => + cols.foreach(c => failNullType(c.dataType)) loadTable(catalog, tbl.asIdentifier).collect { case v1Table: V1Table => if (!DDLUtils.isHiveTable(v1Table.v1Table)) { @@ -76,6 +77,7 @@ class ResolveSessionCatalog( case AlterTableReplaceColumnsStatement( nameParts @ SessionCatalogAndTable(catalog, tbl), cols) => + cols.foreach(c => failNullType(c.dataType)) val changes: Seq[TableChange] = loadTable(catalog, tbl.asIdentifier) match { case Some(_: V1Table) => throw new AnalysisException("REPLACE COLUMNS is only supported with v2 tables.") @@ -100,6 +102,7 @@ class ResolveSessionCatalog( case a @ AlterTableAlterColumnStatement( nameParts @ SessionCatalogAndTable(catalog, tbl), _, _, _, _, _) => + a.dataType.foreach(failNullType) loadTable(catalog, tbl.asIdentifier).collect { case v1Table: V1Table => if (!DDLUtils.isHiveTable(v1Table.v1Table)) { @@ -268,6 +271,7 @@ class ResolveSessionCatalog( // session catalog and the table provider is not v2. 
case c @ CreateTableStatement( SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) => + assertNoNullTypeInSchema(c.tableSchema) val provider = c.provider.getOrElse(conf.defaultDataSourceName) if (!isV2Provider(provider)) { if (!DDLUtils.isHiveTable(Some(provider))) { @@ -292,6 +296,9 @@ class ResolveSessionCatalog( case c @ CreateTableAsSelectStatement( SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) => + if (c.asSelect.resolved) { + assertNoNullTypeInSchema(c.asSelect.schema) + } val provider = c.provider.getOrElse(conf.defaultDataSourceName) if (!isV2Provider(provider)) { val tableDesc = buildCatalogTable(tbl.asTableIdentifier, new StructType, @@ -319,6 +326,7 @@ class ResolveSessionCatalog( // session catalog and the table provider is not v2. case c @ ReplaceTableStatement( SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) => + assertNoNullTypeInSchema(c.tableSchema) val provider = c.provider.getOrElse(conf.defaultDataSourceName) if (!isV2Provider(provider)) { throw new AnalysisException("REPLACE TABLE is only supported with v2 tables.") @@ -336,6 +344,9 @@ class ResolveSessionCatalog( case c @ ReplaceTableAsSelectStatement( SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) => + if (c.asSelect.resolved) { + assertNoNullTypeInSchema(c.asSelect.schema) + } val provider = c.provider.getOrElse(conf.defaultDataSourceName) if (!isV2Provider(provider)) { throw new AnalysisException("REPLACE TABLE AS SELECT is only supported with v2 tables.") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 95343e2..60cacda 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.{Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, RowOrdering} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.connector.catalog.CatalogV2Util.assertNoNullTypeInSchema import org.apache.spark.sql.connector.expressions.{FieldReference, RewritableTransform} import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2 @@ -292,6 +293,8 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi "in the table definition of " + table.identifier, sparkSession.sessionState.conf.caseSensitiveAnalysis) + assertNoNullTypeInSchema(schema) + val normalizedPartCols = normalizePartitionColumns(schema, table) val normalizedBucketSpec = normalizeBucketSpec(schema, table) diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md index 8898a11..c39adac 100644 --- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md +++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md @@ -34,7 +34,7 @@ | org.apache.spark.sql.catalyst.expressions.Ascii | ascii | SELECT ascii('222') | struct<ascii(222):int> | | org.apache.spark.sql.catalyst.expressions.Asin | asin | SELECT asin(0) | struct<ASIN(CAST(0 AS DOUBLE)):double> | | org.apache.spark.sql.catalyst.expressions.Asinh | asinh | SELECT asinh(0) | struct<ASINH(CAST(0 AS DOUBLE)):double> | 
-| org.apache.spark.sql.catalyst.expressions.AssertTrue | assert_true | SELECT assert_true(0 < 1) | struct<assert_true((0 < 1)):null> | +| org.apache.spark.sql.catalyst.expressions.AssertTrue | assert_true | SELECT assert_true(0 < 1) | struct<assert_true((0 < 1)):unknown> | | org.apache.spark.sql.catalyst.expressions.Atan | atan | SELECT atan(0) | struct<ATAN(CAST(0 AS DOUBLE)):double> | | org.apache.spark.sql.catalyst.expressions.Atan2 | atan2 | SELECT atan2(0, 0) | struct<ATAN2(CAST(0 AS DOUBLE), CAST(0 AS DOUBLE)):double> | | org.apache.spark.sql.catalyst.expressions.Atanh | atanh | SELECT atanh(0) | struct<ATANH(CAST(0 AS DOUBLE)):double> | diff --git a/sql/core/src/test/resources/sql-tests/results/ansi/literals.sql.out b/sql/core/src/test/resources/sql-tests/results/ansi/literals.sql.out index f6720f6..0274771 100644 --- a/sql/core/src/test/resources/sql-tests/results/ansi/literals.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/ansi/literals.sql.out @@ -5,7 +5,7 @@ -- !query select null, Null, nUll -- !query schema -struct<NULL:null,NULL:null,NULL:null> +struct<NULL:unknown,NULL:unknown,NULL:unknown> -- !query output NULL NULL NULL diff --git a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out index 9943b93..2dd6960 100644 --- a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out @@ -49,7 +49,7 @@ two 2 -- !query select * from values ("one", null), ("two", null) as data(a, b) -- !query schema -struct<a:string,b:null> +struct<a:string,b:unknown> -- !query output one NULL two NULL diff --git a/sql/core/src/test/resources/sql-tests/results/literals.sql.out b/sql/core/src/test/resources/sql-tests/results/literals.sql.out index f6720f6..0274771 100644 --- a/sql/core/src/test/resources/sql-tests/results/literals.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/literals.sql.out @@ -5,7 +5,7 @@ -- !query select null, Null, nUll -- !query schema -struct<NULL:null,NULL:null,NULL:null> +struct<NULL:unknown,NULL:unknown,NULL:unknown> -- !query output NULL NULL NULL diff --git a/sql/core/src/test/resources/sql-tests/results/misc-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/misc-functions.sql.out index bd8ffb8..8d34bf2 100644 --- a/sql/core/src/test/resources/sql-tests/results/misc-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/misc-functions.sql.out @@ -7,7 +7,7 @@ select typeof(null) -- !query schema struct<typeof(NULL):string> -- !query output -null +unknown -- !query diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/select.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/select.sql.out index 1e59036..8b32bd6 100644 --- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/select.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/select.sql.out @@ -308,7 +308,7 @@ struct<1:int> -- !query select foo.* from (select null) as foo -- !query schema -struct<NULL:null> +struct<NULL:unknown> -- !query output NULL @@ -316,7 +316,7 @@ NULL -- !query select foo.* from (select 'xyzzy',1,null) as foo -- !query schema -struct<xyzzy:string,1:int,NULL:null> +struct<xyzzy:string,1:int,NULL:unknown> -- !query output xyzzy 1 NULL diff --git a/sql/core/src/test/resources/sql-tests/results/sql-compatibility-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/sql-compatibility-functions.sql.out index 
26a44a8..b905f9e 100644 --- a/sql/core/src/test/resources/sql-tests/results/sql-compatibility-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/sql-compatibility-functions.sql.out @@ -5,7 +5,7 @@ -- !query SELECT ifnull(null, 'x'), ifnull('y', 'x'), ifnull(null, null) -- !query schema -struct<ifnull(NULL, x):string,ifnull(y, x):string,ifnull(NULL, NULL):null> +struct<ifnull(NULL, x):string,ifnull(y, x):string,ifnull(NULL, NULL):unknown> -- !query output x y NULL @@ -21,7 +21,7 @@ NULL x -- !query SELECT nvl(null, 'x'), nvl('y', 'x'), nvl(null, null) -- !query schema -struct<nvl(NULL, x):string,nvl(y, x):string,nvl(NULL, NULL):null> +struct<nvl(NULL, x):string,nvl(y, x):string,nvl(NULL, NULL):unknown> -- !query output x y NULL @@ -29,7 +29,7 @@ x y NULL -- !query SELECT nvl2(null, 'x', 'y'), nvl2('n', 'x', 'y'), nvl2(null, null, null) -- !query schema -struct<nvl2(NULL, x, y):string,nvl2(n, x, y):string,nvl2(NULL, NULL, NULL):null> +struct<nvl2(NULL, x, y):string,nvl2(n, x, y):string,nvl2(NULL, NULL, NULL):unknown> -- !query output y x NULL diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-inline-table.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-inline-table.sql.out index d78d347..0680a87 100644 --- a/sql/core/src/test/resources/sql-tests/results/udf/udf-inline-table.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-inline-table.sql.out @@ -49,7 +49,7 @@ two 2 -- !query select udf(a), b from values ("one", null), ("two", null) as data(a, b) -- !query schema -struct<CAST(udf(cast(a as string)) AS STRING):string,b:null> +struct<CAST(udf(cast(a as string)) AS STRING):string,b:unknown> -- !query output one NULL two NULL diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index 231a8f2..daa262d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -405,7 +405,7 @@ class FileBasedDataSourceSuite extends QueryTest "" } def errorMessage(format: String): String = { - s"$format data source does not support null data type." + s"$format data source does not support unknown data type." } withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1List) { withTempDir { dir => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index b9c98f4..2b1eb05 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics} import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.connector.catalog.CatalogV2Util.assertNoNullTypeInSchema import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils} import org.apache.spark.sql.execution.datasources.CreateTable @@ -225,6 +226,8 @@ case class RelationConversions( isConvertible(tableDesc) && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_CTAS) => // validation is required to be done here before relation conversion. 
DDLUtils.checkDataColNames(tableDesc.copy(schema = query.schema)) + // This is for CREATE TABLE .. STORED AS PARQUET/ORC AS SELECT null + assertNoNullTypeInSchema(query.schema) OptimizedCreateHiveTableAsSelectCommand( tableDesc, query, query.output.map(_.name), mode) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index e8cf4ad..774fb5b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.connector.FakeV2Provider import org.apache.spark.sql.connector.catalog.CatalogManager import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils} @@ -2309,6 +2310,126 @@ class HiveDDLSuite } } + test("SPARK-20680: Spark-sql do not support for unknown column datatype") { + withTable("t") { + withView("tabUnknownType") { + hiveClient.runSqlHive("CREATE TABLE t (t1 int)") + hiveClient.runSqlHive("INSERT INTO t VALUES (3)") + hiveClient.runSqlHive("CREATE VIEW tabUnknownType AS SELECT NULL AS col FROM t") + checkAnswer(spark.table("tabUnknownType"), Row(null)) + // No exception shows + val desc = spark.sql("DESC tabUnknownType").collect().toSeq + assert(desc.contains(Row("col", NullType.simpleString, null))) + } + } + + // Forbid CTAS with unknown type + withTable("t1", "t2", "t3") { + val e1 = intercept[AnalysisException] { + spark.sql("CREATE TABLE t1 USING PARQUET AS SELECT null as null_col") + }.getMessage + assert(e1.contains("Cannot create tables with unknown type")) + + val e2 = intercept[AnalysisException] { + spark.sql("CREATE TABLE t2 AS SELECT null as null_col") + }.getMessage + assert(e2.contains("Cannot create tables with unknown type")) + + val e3 = intercept[AnalysisException] { + spark.sql("CREATE TABLE t3 STORED AS PARQUET AS SELECT null as null_col") + }.getMessage + assert(e3.contains("Cannot create tables with unknown type")) + } + + // Forbid Replace table AS SELECT with unknown type + withTable("t") { + val v2Source = classOf[FakeV2Provider].getName + val e = intercept[AnalysisException] { + spark.sql(s"CREATE OR REPLACE TABLE t USING $v2Source AS SELECT null as null_col") + }.getMessage + assert(e.contains("Cannot create tables with unknown type")) + } + + // Forbid creating table with VOID type in Spark + withTable("t1", "t2", "t3", "t4") { + val e1 = intercept[AnalysisException] { + spark.sql(s"CREATE TABLE t1 (v VOID) USING PARQUET") + }.getMessage + assert(e1.contains("Cannot create tables with unknown type")) + val e2 = intercept[AnalysisException] { + spark.sql(s"CREATE TABLE t2 (v VOID) USING hive") + }.getMessage + assert(e2.contains("Cannot create tables with unknown type")) + val e3 = intercept[AnalysisException] { + spark.sql(s"CREATE TABLE t3 (v VOID)") + }.getMessage + assert(e3.contains("Cannot create tables with unknown type")) + val e4 = intercept[AnalysisException] { + spark.sql(s"CREATE TABLE t4 (v VOID) STORED AS PARQUET") + }.getMessage + assert(e4.contains("Cannot create tables with unknown type")) + } + + // Forbid Replace table with 
VOID type + withTable("t") { + val v2Source = classOf[FakeV2Provider].getName + val e = intercept[AnalysisException] { + spark.sql(s"CREATE OR REPLACE TABLE t (v VOID) USING $v2Source") + }.getMessage + assert(e.contains("Cannot create tables with unknown type")) + } + + // Make sure spark.catalog.createTable with null type will fail + val schema1 = new StructType().add("c", NullType) + assertHiveTableNullType(schema1) + assertDSTableNullType(schema1) + + val schema2 = new StructType() + .add("c", StructType(Seq(StructField.apply("c1", NullType)))) + assertHiveTableNullType(schema2) + assertDSTableNullType(schema2) + + val schema3 = new StructType().add("c", ArrayType(NullType)) + assertHiveTableNullType(schema3) + assertDSTableNullType(schema3) + + val schema4 = new StructType() + .add("c", MapType(StringType, NullType)) + assertHiveTableNullType(schema4) + assertDSTableNullType(schema4) + + val schema5 = new StructType() + .add("c", MapType(NullType, StringType)) + assertHiveTableNullType(schema5) + assertDSTableNullType(schema5) + } + + private def assertHiveTableNullType(schema: StructType): Unit = { + withTable("t") { + val e = intercept[AnalysisException] { + spark.catalog.createTable( + tableName = "t", + source = "hive", + schema = schema, + options = Map("fileFormat" -> "parquet")) + }.getMessage + assert(e.contains("Cannot create tables with unknown type")) + } + } + + private def assertDSTableNullType(schema: StructType): Unit = { + withTable("t") { + val e = intercept[AnalysisException] { + spark.catalog.createTable( + tableName = "t", + source = "json", + schema = schema, + options = Map.empty[String, String]) + }.getMessage + assert(e.contains("Cannot create tables with unknown type")) + } + } + test("SPARK-21216: join with a streaming DataFrame") { import org.apache.spark.sql.execution.streaming.MemoryStream import testImplicits._ diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala index 91fd8a4..61c48c6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala @@ -121,7 +121,7 @@ class HiveOrcSourceSuite extends OrcSuite with TestHiveSingleton { msg = intercept[AnalysisException] { sql("select null").write.mode("overwrite").orc(orcDir) }.getMessage - assert(msg.contains("ORC data source does not support null data type.")) + assert(msg.contains("ORC data source does not support unknown data type.")) msg = intercept[AnalysisException] { spark.udf.register("testType", () => new IntervalData()) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org