This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 50c163578cf Revert "[SPARK-40000][SQL] Update INSERTs without user-specified fields to not automatically add default values"
50c163578cf is described below

commit 50c163578cfef79002fbdbc54b3b8fc10cfbcf65
Author: Daniel Tenedorio <daniel.tenedo...@databricks.com>
AuthorDate: Thu Aug 18 14:37:38 2022 -0700

    Revert "[SPARK-40000][SQL] Update INSERTs without user-specified fields to not automatically add default values"

    ### What changes were proposed in this pull request?

    Revert PR 37430 in commit 13c1b594a8ee6b99544572864b378b3616ffdb58 ("Update INSERTs without user-specified fields to not automatically add default values"). This is a clean revert that undoes the original changes from that PR exactly.

    ### Why are the changes needed?

    Upon further review, we find that the ability to switch between the new configuration and the old one is not needed and adds complexity.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Unit test coverage.

    Closes #37572 from dtenedor/revert-pr37430.

    Authored-by: Daniel Tenedorio <daniel.tenedo...@databricks.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
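
For readers skimming the revert, here is a minimal sketch of the semantics this patch restores, distilled from the test changes in the diff below. The table names and values are illustrative only (and assume DEFAULT column support is enabled for the data source); they are not part of the patch:

    -- Illustrative only: a two-column table where only the second column declares an explicit DEFAULT.
    CREATE TABLE t(i BOOLEAN, s BIGINT DEFAULT 42) USING PARQUET;

    -- With the restored config at its default (false), an INSERT that supplies fewer values than
    -- the table has columns is padded only for trailing columns that declare an explicit DEFAULT:
    INSERT INTO t VALUES (true);   -- succeeds; s receives its DEFAULT, 42

    -- With the config enabled, trailing columns without an explicit DEFAULT behave as if they
    -- had been declared DEFAULT NULL:
    SET spark.sql.defaultColumn.useNullsForMissingDefaultValues=true;
    CREATE TABLE u(i BOOLEAN, s BIGINT) USING PARQUET;
    INSERT INTO u VALUES (true);   -- succeeds; s receives NULL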
---
 docs/sql-migration-guide.md                        |   1 -
 .../catalyst/analysis/ResolveDefaultColumns.scala  |  42 ++--
 .../org/apache/spark/sql/internal/SQLConf.scala    |  29 ++-
 .../org/apache/spark/sql/SQLInsertTestSuite.scala  |  57 +++--
 .../org/apache/spark/sql/sources/InsertSuite.scala | 231 ++++++++++-----------
 .../org/apache/spark/sql/hive/InsertSuite.scala    |  12 +-
 6 files changed, 178 insertions(+), 194 deletions(-)

diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index c19069cfdba..42df05f7f70 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -26,7 +26,6 @@ license: |
 - Since Spark 3.4, Number or Number(\*) from Teradata will be treated as Decimal(38,18). In Spark 3.3 or earlier, Number or Number(\*) from Teradata will be treated as Decimal(38, 0), in which case the fractional part will be removed.
 - Since Spark 3.4, v1 database, table, permanent view and function identifier will include 'spark_catalog' as the catalog name if database is defined, e.g. a table identifier will be: `spark_catalog.default.t`. To restore the legacy behavior, set `spark.sql.legacy.v1IdentifierNoCatalog` to `true`.
-- Since Spark 3.4, `INSERT INTO` commands will now support user-specified column lists comprising fewer columns than present in the target table (for example, `INSERT INTO t (a, b) VALUES (1, 2)` where table `t` has three columns). In this case, Spark will insert `NULL` into the remaining columns in the row, or the explicit `DEFAULT` value if assigned to the column. To revert to the previous behavior, please set `spark.sql.defaultColumn.addMissingValuesForInsertsWithExplicitColumns` to false.
 - Since Spark 3.4, when ANSI SQL mode(configuration `spark.sql.ansi.enabled`) is on, Spark SQL always returns NULL result on getting a map value with a non-existing key. In Spark 3.3 or earlier, there will be an error.
 
 ## Upgrading from Spark SQL 3.2 to 3.3
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
index 20ca3c9532a..b7c7f0d3772 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
@@ -108,7 +108,7 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
       val regenerated: InsertIntoStatement = regenerateUserSpecifiedCols(i, schema)
       val expanded: LogicalPlan =
-        addMissingDefaultValuesForInsertFromInlineTable(node, schema, i.userSpecifiedCols.length)
+        addMissingDefaultValuesForInsertFromInlineTable(node, schema)
       val replaced: Option[LogicalPlan] =
         replaceExplicitDefaultValuesForInputOfInsertInto(schema, expanded)
       replaced.map { r: LogicalPlan =>
@@ -132,7 +132,7 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
       val regenerated: InsertIntoStatement = regenerateUserSpecifiedCols(i, schema)
       val project: Project = i.query.asInstanceOf[Project]
       val expanded: Project =
-        addMissingDefaultValuesForInsertFromProject(project, schema, i.userSpecifiedCols.length)
+        addMissingDefaultValuesForInsertFromProject(project, schema)
       val replaced: Option[LogicalPlan] =
         replaceExplicitDefaultValuesForInputOfInsertInto(schema, expanded)
       replaced.map { r =>
@@ -265,15 +265,14 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
    */
   private def addMissingDefaultValuesForInsertFromInlineTable(
       node: LogicalPlan,
-      insertTableSchemaWithoutPartitionColumns: StructType,
-      numUserSpecifiedFields: Int): LogicalPlan = {
+      insertTableSchemaWithoutPartitionColumns: StructType): LogicalPlan = {
     val numQueryOutputs: Int = node match {
       case table: UnresolvedInlineTable => table.rows(0).size
       case local: LocalRelation => local.data(0).numFields
     }
     val schema = insertTableSchemaWithoutPartitionColumns
     val newDefaultExpressions: Seq[Expression] =
-      getDefaultExpressionsForInsert(numQueryOutputs, schema, numUserSpecifiedFields, node)
+      getDefaultExpressionsForInsert(numQueryOutputs, schema)
     val newNames: Seq[String] = schema.fields.drop(numQueryOutputs).map { _.name }
     node match {
       case _ if newDefaultExpressions.isEmpty => node
@@ -299,12 +298,11 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
    */
   private def addMissingDefaultValuesForInsertFromProject(
       project: Project,
-      insertTableSchemaWithoutPartitionColumns: StructType,
-      numUserSpecifiedFields: Int): Project = {
+      insertTableSchemaWithoutPartitionColumns: StructType): Project = {
     val numQueryOutputs: Int = project.projectList.size
     val schema = insertTableSchemaWithoutPartitionColumns
     val newDefaultExpressions: Seq[Expression] =
-      getDefaultExpressionsForInsert(numQueryOutputs, schema, numUserSpecifiedFields, project)
+      getDefaultExpressionsForInsert(numQueryOutputs, schema)
     val newAliases: Seq[NamedExpression] = newDefaultExpressions.zip(schema.fields).map {
       case (expr, field) => Alias(expr, field.name)()
     }
@@ -317,19 +315,20 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
    */
   private def getDefaultExpressionsForInsert(
       numQueryOutputs: Int,
-      schema: StructType,
-      numUserSpecifiedFields: Int,
-      treeNode: LogicalPlan): Seq[Expression] = {
-    if (numUserSpecifiedFields > 0 && numUserSpecifiedFields != numQueryOutputs) {
-      throw QueryCompilationErrors.writeTableWithMismatchedColumnsError(
-        numUserSpecifiedFields, numQueryOutputs, treeNode)
-    }
-    if (numUserSpecifiedFields > 0 && SQLConf.get.addMissingValuesForInsertsWithExplicitColumns) {
-      val remainingFields: Seq[StructField] = schema.fields.drop(numQueryOutputs)
-      val numDefaultExpressionsToAdd = remainingFields.size
-      Seq.fill(numDefaultExpressionsToAdd)(UnresolvedAttribute(CURRENT_DEFAULT_COLUMN_NAME))
+      schema: StructType): Seq[Expression] = {
+    val remainingFields: Seq[StructField] = schema.fields.drop(numQueryOutputs)
+    val numDefaultExpressionsToAdd = getStructFieldsForDefaultExpressions(remainingFields).size
+    Seq.fill(numDefaultExpressionsToAdd)(UnresolvedAttribute(CURRENT_DEFAULT_COLUMN_NAME))
+  }
+
+  /**
+   * This is a helper for the getDefaultExpressionsForInsert methods above.
+   */
+  private def getStructFieldsForDefaultExpressions(fields: Seq[StructField]): Seq[StructField] = {
+    if (SQLConf.get.useNullsForMissingDefaultColumnValues) {
+      fields
     } else {
-      Seq.empty[Expression]
+      fields.takeWhile(_.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY))
     }
   }
 
@@ -488,7 +487,8 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
       schema.fields.filter {
         field => !userSpecifiedColNames.contains(field.name)
       }
-    Some(StructType(userSpecifiedFields ++ nonUserSpecifiedFields))
+    Some(StructType(userSpecifiedFields ++
+      getStructFieldsForDefaultExpressions(nonUserSpecifiedFields)))
   }
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index cd0c0d25053..3ce6ee47958 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2924,18 +2924,6 @@ object SQLConf {
     .stringConf
     .createWithDefault("csv,json,orc,parquet")
 
-  val ADD_MISSING_DEFAULT_COLUMN_VALUES_FOR_INSERTS_WITH_EXPLICIT_COLUMNS =
-    buildConf("spark.sql.defaultColumn.addMissingValuesForInsertsWithExplicitColumns")
-      .internal()
-      .doc("When true, allow INSERT INTO commands with explicit columns (such as " +
-        "INSERT INTO t(a, b)) to specify fewer columns than the target table; the analyzer will " +
-        "assign default values for remaining columns (either NULL, or otherwise the explicit " +
-        "DEFAULT value associated with the column from a previous command). Otherwise, if " +
-        "false, return an error.")
-      .version("3.4.0")
-      .booleanConf
-      .createWithDefault(true)
-
   val JSON_GENERATOR_WRITE_NULL_IF_WITH_DEFAULT_VALUE =
     buildConf("spark.sql.jsonGenerator.writeNullIfWithDefaultValue")
       .internal()
@@ -2948,6 +2936,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES =
+    buildConf("spark.sql.defaultColumn.useNullsForMissingDefaultValues")
+      .internal()
+      .doc("When true, and DEFAULT columns are enabled, allow column definitions lacking " +
+        "explicit default values to behave as if they had specified DEFAULT NULL instead. " +
" + + "For example, this allows most INSERT INTO statements to specify only a prefix of the " + + "columns in the target table, and the remaining columns will receive NULL values.") + .version("3.4.0") + .booleanConf + .createWithDefault(false) + val ENFORCE_RESERVED_KEYWORDS = buildConf("spark.sql.ansi.enforceReservedKeywords") .doc(s"When true and '${ANSI_ENABLED.key}' is true, the Spark SQL parser enforces the ANSI " + "reserved keywords and forbids SQL queries that use reserved keywords as alias names " + @@ -4531,12 +4530,12 @@ class SQLConf extends Serializable with Logging { def defaultColumnAllowedProviders: String = getConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS) - def addMissingValuesForInsertsWithExplicitColumns: Boolean = - getConf(SQLConf.ADD_MISSING_DEFAULT_COLUMN_VALUES_FOR_INSERTS_WITH_EXPLICIT_COLUMNS) - def jsonWriteNullIfWithDefaultValue: Boolean = getConf(JSON_GENERATOR_WRITE_NULL_IF_WITH_DEFAULT_VALUE) + def useNullsForMissingDefaultColumnValues: Boolean = + getConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES) + def enforceReservedKeywords: Boolean = ansiEnabled && getConf(ENFORCE_RESERVED_KEYWORDS) def timestampType: AtomicType = getConf(TIMESTAMP_TYPE) match { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala index 9d5f75bad38..7fd6a5dbea0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala @@ -175,48 +175,45 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils { test("insert with column list - mismatched column list size") { val msgs = Seq("Cannot write to table due to mismatched user specified column size", "expected 3 columns but found") - withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false", - SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "true") { + def test: Unit = { withTable("t1") { val cols = Seq("c1", "c2", "c3") createTable("t1", cols, Seq("int", "long", "string")) - Seq( - "INSERT INTO t1 (c1, c2) values(1, 2, 3)", - "INSERT INTO t1 (c1, c2) select 1, 2, 3", - "INSERT INTO t1 (c1, c2, c3) values(1, 2)", - "INSERT INTO t1 (c1, c2, c3) select 1, 2" - ).foreach { query => - val e = intercept[AnalysisException](sql(query)) - assert(e.getMessage.contains(msgs(0)) || e.getMessage.contains(msgs(1))) - } + val e1 = intercept[AnalysisException](sql(s"INSERT INTO t1 (c1, c2) values(1, 2, 3)")) + assert(e1.getMessage.contains(msgs(0)) || e1.getMessage.contains(msgs(1))) + val e2 = intercept[AnalysisException](sql(s"INSERT INTO t1 (c1, c2, c3) values(1, 2)")) + assert(e2.getMessage.contains(msgs(0)) || e2.getMessage.contains(msgs(1))) } } + withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") { + test + } + withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "true") { + test + } } test("insert with column list - mismatched target table out size after rewritten query") { - val v2Msg = "Cannot write to table due to mismatched user specified column size" + val v2Msg = "expected 2 columns but found" val cols = Seq("c1", "c2", "c3", "c4") - withSQLConf( - SQLConf.ADD_MISSING_DEFAULT_COLUMN_VALUES_FOR_INSERTS_WITH_EXPLICIT_COLUMNS.key -> "false") { - withTable("t1") { - createTable("t1", cols, Seq.fill(4)("int")) - val e1 = intercept[AnalysisException](sql(s"INSERT INTO t1 (c1) values(1)")) - assert(e1.getMessage.contains("target table has 4 column(s) but the inserted data has 1") || - e1.getMessage.contains("expected 4 columns but found 1") 
-          e1.getMessage.contains("not enough data columns") ||
-          e1.getMessage.contains(v2Msg))
-      }
+    withTable("t1") {
+      createTable("t1", cols, Seq.fill(4)("int"))
+      val e1 = intercept[AnalysisException](sql(s"INSERT INTO t1 (c1) values(1)"))
+      assert(e1.getMessage.contains("target table has 4 column(s) but the inserted data has 1") ||
+        e1.getMessage.contains("expected 4 columns but found 1") ||
+        e1.getMessage.contains("not enough data columns") ||
+        e1.getMessage.contains(v2Msg))
+    }
 
-      withTable("t1") {
-        createTable("t1", cols, Seq.fill(4)("int"), cols.takeRight(2))
-        val e1 = intercept[AnalysisException] {
-          sql(s"INSERT INTO t1 partition(c3=3, c4=4) (c1) values(1)")
-        }
-        assert(e1.getMessage.contains("target table has 4 column(s) but the inserted data has 3") ||
-          e1.getMessage.contains("not enough data columns") ||
-          e1.getMessage.contains(v2Msg))
+    withTable("t1") {
+      createTable("t1", cols, Seq.fill(4)("int"), cols.takeRight(2))
+      val e1 = intercept[AnalysisException] {
+        sql(s"INSERT INTO t1 partition(c3=3, c4=4) (c1) values(1)")
       }
+      assert(e1.getMessage.contains("target table has 4 column(s) but the inserted data has 3") ||
+        e1.getMessage.contains("not enough data columns") ||
+        e1.getMessage.contains(v2Msg))
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index afe3ac6facc..3936f2b995c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -27,7 +27,6 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
-import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.DataSourceUtils
@@ -864,8 +863,13 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
   }
 
   test("Allow user to insert specified columns into insertable view") {
-    withSQLConf(
-      SQLConf.ADD_MISSING_DEFAULT_COLUMN_VALUES_FOR_INSERTS_WITH_EXPLICIT_COLUMNS.key -> "true") {
+    withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "true") {
+      sql("INSERT OVERWRITE TABLE jsonTable SELECT a FROM jt")
+      checkAnswer(
+        sql("SELECT a, b FROM jsonTable"),
+        (1 to 10).map(i => Row(i, null))
+      )
+
       sql("INSERT OVERWRITE TABLE jsonTable(a) SELECT a FROM jt")
       checkAnswer(
         sql("SELECT a, b FROM jsonTable"),
@@ -878,9 +882,24 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
         (1 to 10).map(i => Row(null, s"str$i"))
       )
     }
+
+    val message = intercept[AnalysisException] {
+      sql("INSERT OVERWRITE TABLE jsonTable(a) SELECT a FROM jt")
+    }.getMessage
+    assert(message.contains("target table has 2 column(s) but the inserted data has 1 column(s)"))
   }
 
   test("SPARK-38336 INSERT INTO statements with tables with default columns: positive tests") {
+    // When the USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES configuration is enabled, and no
+    // explicit DEFAULT value is available when the INSERT INTO statement provides fewer
+    // values than expected, NULL values are appended in their place.
+    withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "true") {
+      withTable("t") {
+        sql("create table t(i boolean, s bigint) using parquet")
+        sql("insert into t values(true)")
+        checkAnswer(spark.table("t"), Row(true, null))
+      }
+    }
     // The default value for the DEFAULT keyword is the NULL literal.
     withTable("t") {
       sql("create table t(i boolean, s bigint) using parquet")
@@ -896,13 +915,13 @@
     // The default value parses correctly and the provided value type is different but coercible.
     withTable("t") {
       sql("create table t(i boolean, s bigint default 42) using parquet")
-      sql("insert into t (i) values(false)")
+      sql("insert into t values(false)")
       checkAnswer(spark.table("t"), Row(false, 42L))
     }
     // There are two trailing default values referenced implicitly by the INSERT INTO statement.
     withTable("t") {
       sql("create table t(i int, s bigint default 42, x bigint default 43) using parquet")
-      sql("insert into t(i) values(1)")
+      sql("insert into t values(1)")
       checkAnswer(sql("select s + x from t where i = 1"), Seq(85L).map(i => Row(i)))
     }
     // The table has a partitioning column and a default value is injected.
@@ -977,47 +996,37 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       checkAnswer(spark.table("t"), Row(false, 42L))
     }
     // There are three column types exercising various combinations of implicit and explicit
-    // default column value references in the 'insert into' statements.
-    withSQLConf(
-      SQLConf.ADD_MISSING_DEFAULT_COLUMN_VALUES_FOR_INSERTS_WITH_EXPLICIT_COLUMNS.key -> "true") {
+    // default column value references in the 'insert into' statements. Note these tests depend on
+    // enabling the configuration to use NULLs for missing DEFAULT column values.
+    withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "true") {
       for (useDataFrames <- Seq(false, true)) {
         withTable("t1", "t2") {
           sql("create table t1(j int, s bigint default 42, x bigint default 43) using parquet")
           if (useDataFrames) {
-            Seq((1, 42, 43)).toDF.write.insertInto("t1")
-            Seq((2, 42, 43)).toDF.write.insertInto("t1")
-            Seq((3, 42, 43)).toDF.write.insertInto("t1")
-            Seq((4, 44, 43)).toDF.write.insertInto("t1")
+            Seq((1)).toDF.write.insertInto("t1")
+            Seq((2)).toDF.write.insertInto("t1")
+            Seq((3)).toDF.write.insertInto("t1")
+            Seq((4, 44)).toDF.write.insertInto("t1")
             Seq((5, 44, 45)).toDF.write.insertInto("t1")
           } else {
-            sql("insert into t1(j) values(1)")
-            sql("insert into t1(j, s) values(2, default)")
+            sql("insert into t1 values(1)")
+            sql("insert into t1 values(2, default)")
             sql("insert into t1 values(3, default, default)")
-            sql("insert into t1(j, s) values(4, 44)")
+            sql("insert into t1 values(4, 44)")
             sql("insert into t1 values(5, 44, 45)")
           }
           sql("create table t2(j int, s bigint default 42, x bigint default 43) using parquet")
           if (useDataFrames) {
-            spark.table("t1").where("j = 1").select("j")
-              .withColumns(Seq("s", "x"), Seq(Column(Literal(42)), Column(Literal(43))))
-              .write.insertInto("t2")
-            spark.table("t1").where("j = 2").select("j")
-              .withColumns(Seq("s", "x"), Seq(Column(Literal(42)), Column(Literal(43))))
-              .write.insertInto("t2")
-            spark.table("t1").where("j = 3").select("j")
-              .withColumns(Seq("s", "x"), Seq(Column(Literal(42)), Column(Literal(43))))
-              .write.insertInto("t2")
-            spark.table("t1").where("j = 4").select("j", "s")
-              .withColumn("x", Column(Literal(43)))
-              .write.insertInto("t2")
-            spark.table("t1").where("j = 5").select("j", "s")
-              .withColumn("x", Column(Literal(43)))
-              .write.insertInto("t2")
+            spark.table("t1").where("j = 1").select("j").write.insertInto("t2")
+            spark.table("t1").where("j = 2").select("j").write.insertInto("t2")
+            spark.table("t1").where("j = 3").select("j").write.insertInto("t2")
+            spark.table("t1").where("j = 4").select("j", "s").write.insertInto("t2")
+            spark.table("t1").where("j = 5").select("j", "s").write.insertInto("t2")
           } else {
-            sql("insert into t2(j) select j from t1 where j = 1")
-            sql("insert into t2(j, s) select j, default from t1 where j = 2")
+            sql("insert into t2 select j from t1 where j = 1")
+            sql("insert into t2 select j, default from t1 where j = 2")
             sql("insert into t2 select j, default, default from t1 where j = 3")
-            sql("insert into t2(j, s) select j, s from t1 where j = 4")
+            sql("insert into t2 select j, s from t1 where j = 4")
             sql("insert into t2 select j, s, default from t1 where j = 5")
           }
           checkAnswer(
@@ -1119,21 +1128,20 @@
       sql("create table t(i int, s bigint default 42, x bigint) using parquet")
       assert(intercept[AnalysisException] {
         sql("insert into t values(1)")
-      }.getMessage.contains("target table has 3 column(s) but the inserted data has 1 column(s)"))
+      }.getMessage.contains("expected 3 columns but found"))
     }
     // The table has a partitioning column with a default value; this is not allowed.
withTable("t") { sql("create table t(i boolean default true, s bigint, q int default 42) " + "using parquet partitioned by (i)") assert(intercept[ParseException] { - sql("insert into t partition(i=default) (s, q) values(5, default)") + sql("insert into t partition(i=default) values(5, default)") }.getMessage.contains( "References to DEFAULT column values are not allowed within the PARTITION clause")) } - // The INSERT INTO statement has no user specified columns and fewer values than the number of - // columns in the target table. - withSQLConf( - SQLConf.ADD_MISSING_DEFAULT_COLUMN_VALUES_FOR_INSERTS_WITH_EXPLICIT_COLUMNS.key -> "true") { + // The configuration option to append missing NULL values to the end of the INSERT INTO + // statement is not enabled. + withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") { withTable("t") { sql("create table t(i boolean, s bigint) using parquet") assert(intercept[AnalysisException] { @@ -1171,16 +1179,24 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { Row(4, 43, false), Row(4, 42, false))) } - // When the INSERT INTO statement has user specified columns, and no explicit DEFAULT value is - // available when the INSERT INTO statement provides fewer values than expected, NULL values are - // appended in their place. - withTable("t") { - sql("create table t(i boolean, s bigint) using parquet") - sql("insert into t (i) values (true)") - checkAnswer(spark.table("t"), Row(true, null)) + // When the CASE_SENSITIVE configuration is disabled, then using different cases for the + // required and provided column names is successful. + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + withTable("t") { + sql("create table t(i boolean, s bigint default 42, q int default 43) using parquet") + sql("insert into t (I, Q) select true from (select 1)") + checkAnswer(spark.table("t"), Row(true, 42L, 43)) + } } - withSQLConf( - SQLConf.ADD_MISSING_DEFAULT_COLUMN_VALUES_FOR_INSERTS_WITH_EXPLICIT_COLUMNS.key -> "true") { + // When the USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES configuration is enabled, and no + // explicit DEFAULT value is available when the INSERT INTO statement provides fewer + // values than expected, NULL values are appended in their place. 
+    withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "true") {
+      withTable("t") {
+        sql("create table t(i boolean, s bigint) using parquet")
+        sql("insert into t (i) values (true)")
+        checkAnswer(spark.table("t"), Row(true, null))
+      }
       withTable("t") {
         sql("create table t(i boolean default true, s bigint) using parquet")
         sql("insert into t (i) values (default)")
@@ -1211,71 +1227,65 @@
     withTable("t") {
       sql("create table t(i boolean, s bigint) using parquet")
       assert(intercept[AnalysisException] {
-        sql("insert into t values (true)")
+        sql("insert into t (i) values (true)")
      }.getMessage.contains(addOneColButExpectedTwo))
     }
     withTable("t") {
       sql("create table t(i boolean default true, s bigint) using parquet")
       assert(intercept[AnalysisException] {
-        sql("insert into t values (default)")
+        sql("insert into t (i) values (default)")
       }.getMessage.contains(addOneColButExpectedTwo))
     }
     withTable("t") {
       sql("create table t(i boolean, s bigint default 42) using parquet")
       assert(intercept[AnalysisException] {
-        sql("insert into t values (default)")
+        sql("insert into t (s) values (default)")
       }.getMessage.contains(addOneColButExpectedTwo))
     }
     withTable("t") {
       sql("create table t(i boolean, s bigint, q int default 43) using parquet")
       assert(intercept[AnalysisException] {
-        sql("insert into t select true from (select 1)")
-      }.getMessage.contains("target table has 3 column(s) but the inserted data has 1 column(s)"))
-    }
-    withTable("t") {
-      sql("create table t(i boolean default true, s bigint default 42) using parquet")
-      assert(intercept[AnalysisException] {
-        sql("insert into t values (default)")
-      }.getMessage.contains("target table has 2 column(s) but the inserted data has 1 column(s)"))
+        sql("insert into t (i, q) select true from (select 1)")
+      }.getMessage.contains(addTwoColButExpectedThree))
     }
-    // When no explicit DEFAULT value is available when the INSERT INTO statement provides fewer
+    // When the USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES configuration is disabled, and no
+    // explicit DEFAULT value is available when the INSERT INTO statement provides fewer
     // values than expected, the INSERT INTO command fails to execute.
-    withSQLConf(
-      SQLConf.ADD_MISSING_DEFAULT_COLUMN_VALUES_FOR_INSERTS_WITH_EXPLICIT_COLUMNS.key -> "true") {
+    withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
       withTable("t") {
         sql("create table t(i boolean, s bigint) using parquet")
         assert(intercept[AnalysisException] {
-          sql("insert into t values (true)")
+          sql("insert into t (i) values (true)")
         }.getMessage.contains(addOneColButExpectedTwo))
       }
       withTable("t") {
         sql("create table t(i boolean default true, s bigint) using parquet")
         assert(intercept[AnalysisException] {
-          sql("insert into t values (default)")
+          sql("insert into t (i) values (default)")
        }.getMessage.contains(addOneColButExpectedTwo))
       }
       withTable("t") {
         sql("create table t(i boolean, s bigint default 42) using parquet")
         assert(intercept[AnalysisException] {
-          sql("insert into t values (default)")
+          sql("insert into t (s) values (default)")
         }.getMessage.contains(addOneColButExpectedTwo))
       }
       withTable("t") {
         sql("create table t(i boolean, s bigint, q int) using parquet partitioned by (i)")
         assert(intercept[AnalysisException] {
-          sql("insert into t partition(i='true') values(5)")
+          sql("insert into t partition(i='true') (s) values(5)")
         }.getMessage.contains(addTwoColButExpectedThree))
       }
       withTable("t") {
         sql("create table t(i boolean, s bigint, q int) using parquet partitioned by (i)")
         assert(intercept[AnalysisException] {
-          sql("insert into t partition(i='false') select 43")
+          sql("insert into t partition(i='false') (q) select 43")
         }.getMessage.contains(addTwoColButExpectedThree))
      }
       withTable("t") {
         sql("create table t(i boolean, s bigint, q int) using parquet partitioned by (i)")
         assert(intercept[AnalysisException] {
-          sql("insert into t partition(i='false') select default")
+          sql("insert into t partition(i='false') (q) select default")
         }.getMessage.contains(addTwoColButExpectedThree))
       }
     }
@@ -1286,18 +1296,9 @@
         sql("create table t(i boolean default true, s bigint default 42) using parquet")
         assert(intercept[AnalysisException] {
           sql("insert into t (I) select true from (select 1)")
-        }.getMessage.contains("A column or function parameter with name `I` cannot be resolved"))
-      }
-    }
-    withSQLConf(
-      SQLConf.ADD_MISSING_DEFAULT_COLUMN_VALUES_FOR_INSERTS_WITH_EXPLICIT_COLUMNS.key -> "false") {
-      withTable("t") {
-        sql("create table t(i boolean, s bigint default 42) using parquet")
-        assert(intercept[AnalysisException] {
-          sql("insert into t(i) values (default)")
         }.getMessage.contains(
-          "Cannot write to table due to mismatched user specified column size(2) " +
-          "and data column size(1)"))
+          "[UNRESOLVED_COLUMN] A column or function parameter with name `I` cannot be resolved. " +
+          "Did you mean one of the following? [`i`, `s`]"))
       }
     }
   }
@@ -1317,14 +1318,14 @@
       sql(createTableIntCol)
       sql("alter table t add column s bigint default 42")
       sql("alter table t add column x bigint default 43")
-      sql("insert into t (i) values(1)")
+      sql("insert into t values(1)")
       checkAnswer(spark.table("t"), Row(1, 42, 43))
     }
     // There are two trailing default values referenced implicitly by the INSERT INTO statement.
     withTable("t") {
       sql(createTableIntCol)
       sql("alter table t add columns s bigint default 42, x bigint default 43")
-      sql("insert into t (i) values(1)")
+      sql("insert into t values(1)")
       checkAnswer(spark.table("t"), Row(1, 42, 43))
     }
     // The table has a partitioning column and a default value is injected.
@@ -1382,24 +1383,24 @@
       checkAnswer(spark.table("t"), Row(false, 1))
     }
     // There are three column types exercising various combinations of implicit and explicit
-    // default column value references in the 'insert into' statements.
-    withSQLConf(
-      SQLConf.ADD_MISSING_DEFAULT_COLUMN_VALUES_FOR_INSERTS_WITH_EXPLICIT_COLUMNS.key -> "true") {
+    // default column value references in the 'insert into' statements. Note these tests depend on
+    // enabling the configuration to use NULLs for missing DEFAULT column values.
+    withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "true") {
       withTable("t1", "t2") {
         sql("create table t1(j int) using parquet")
         sql("alter table t1 add column s bigint default 42")
         sql("alter table t1 add column x bigint default 43")
-        sql("insert into t1(j) values(1)")
-        sql("insert into t1(j, s) values(2, default)")
+        sql("insert into t1 values(1)")
+        sql("insert into t1 values(2, default)")
         sql("insert into t1 values(3, default, default)")
-        sql("insert into t1(j, s) values(4, 44)")
+        sql("insert into t1 values(4, 44)")
         sql("insert into t1 values(5, 44, 45)")
         sql("create table t2(j int) using parquet")
         sql("alter table t2 add columns s bigint default 42, x bigint default 43")
-        sql("insert into t2(j) select j from t1 where j = 1")
-        sql("insert into t2(j, s) select j, default from t1 where j = 2")
+        sql("insert into t2 select j from t1 where j = 1")
+        sql("insert into t2 select j, default from t1 where j = 2")
         sql("insert into t2 select j, default, default from t1 where j = 3")
-        sql("insert into t2(j, s) select j, s from t1 where j = 4")
+        sql("insert into t2 select j, s from t1 where j = 4")
         sql("insert into t2 select j, s, default from t1 where j = 5")
         checkAnswer(
           spark.table("t2"),
@@ -1462,7 +1463,7 @@
       sql("alter table t add column x bigint")
       assert(intercept[AnalysisException] {
         sql("insert into t values(1)")
-      }.getMessage.contains("target table has 3 column(s) but the inserted data has 1 column(s)"))
+      }.getMessage.contains("expected 3 columns but found"))
     }
   }
 
@@ -1538,13 +1539,16 @@
       insertNullsToStorage: Boolean = true,
       useDataFrames: Boolean = false)
   def runTest(dataSource: String, config: Config): Unit = {
-    def withTableT(f: => Unit): Unit = {
-      sql(s"create table t(a string, i int) using $dataSource")
+    def insertIntoT(): Unit = {
       if (config.useDataFrames) {
         Seq(("xyz", 42)).toDF.write.insertInto("t")
       } else {
-        sql("insert into t (a, i) values('xyz', 42)")
+        sql("insert into t values('xyz', 42)")
      }
+    }
+    def withTableT(f: => Unit): Unit = {
+      sql(s"create table t(a string, i int) using $dataSource")
+      insertIntoT
       withTable("t") { f }
     }
     // Positive tests:
@@ -1568,7 +1572,7 @@
         if (config.useDataFrames) {
           Seq((null, null, null)).toDF.write.insertInto("t")
         } else {
-          sql("insert into t (a, i, s) values(null, null, null)")
+          sql("insert into t values(null, null, null)")
         }
         sql("alter table t add column (x boolean default true)")
         // By default, INSERT commands into some tables (such as JSON) do not store NULL values.
@@ -1609,17 +1613,7 @@
         "a3 bigint default 43," +
         "a4 smallint default cast(5 as smallint)," +
         "a5 tinyint default cast(6 as tinyint))")
-      if (config.useDataFrames) {
-        sql("select 'xyz', 42, true, cast(null as byte), cast(42 as short), 0, 0, " +
-          "cast('2021-01-02' as date), " +
-          "cast('2021-01-02 01:01:01' as timestamp), " +
-          "cast('2021-01-02 01:01:01' as timestamp_ntz), " +
-          "cast('2021-01-02 01:01:01' as timestamp_ltz), " +
-          "cast(123.45 as decimal(5, 2)), 43, cast(5 as smallint), cast(6 as tinyint)")
-          .write.insertInto("t")
-      } else {
-        sql("insert into t (a, i) values('xyz', 42)")
-      }
+      insertIntoT()
       // Manually inspect the result row values rather than using the 'checkAnswer' helper method
       // in order to ensure the values' correctness while avoiding minor type incompatibilities.
       val result: Array[Row] =
@@ -1891,15 +1885,8 @@
             array(
               map(false, 'def', true, 'jkl'))))
             using ${config.dataSource}""")
-      def namedStructSql(colA: Integer, colB: Integer, key1: String, key2: String): String = {
-        "named_struct(" +
-          "'x', array(" +
-          s"named_struct('a', $colA, 'b', $colB)), " +
-          "'y', array(" +
-          s"map(false, '$key1', true, '$key2')))"
-      }
       if (config.useDataFrames) {
-        sql("select 1, " + namedStructSql(1, 2, "def", "jkl")).write.insertInto("t")
+        Seq((1)).toDF.write.insertInto("t")
       } else {
         sql("insert into t select 1, default")
       }
@@ -1918,7 +1905,7 @@
             array(
               map(false, 'mno', true, 'pqr')))""")
       if (config.useDataFrames) {
-        sql("select 3, " + namedStructSql(3, 4, "mno", "pqr")).write.insertInto("t")
+        Seq((3)).toDF.write.insertInto("t")
       } else {
         sql("insert into t select 3, default")
       }
@@ -1930,12 +1917,9 @@
             default array(
               map(true, 'xyz'))""")
       if (config.useDataFrames) {
-        sql("select 4, " + namedStructSql(3, 4, "mno", "pqr") + "," +
-          "array(" +
-          "map(true, 'xyz'))")
-          .write.insertInto("t")
+        Seq((4)).toDF.write.insertInto("t")
       } else {
-        sql("insert into t(i, s) select 4, default")
+        sql("insert into t select 4, default")
       }
       checkAnswer(spark.table("t"), Seq(
@@ -2256,13 +2240,12 @@
       checkAnswer(spark.table("t1"), Row(1, "str1"))
     }
 
-    withSQLConf(
-      SQLConf.ADD_MISSING_DEFAULT_COLUMN_VALUES_FOR_INSERTS_WITH_EXPLICIT_COLUMNS.key -> "true") {
+    withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "true") {
       withTable("t1") {
         sql("CREATE TABLE t1(c1 int, c2 string, c3 int) using parquet")
-        sql("INSERT INTO TABLE t1 (c1, c2) select * from jt where a=1")
+        sql("INSERT INTO TABLE t1 select * from jt where a=1")
         checkAnswer(spark.table("t1"), Row(1, "str1", null))
-        sql("INSERT INTO TABLE t1 (c1, c2, c3) select *, 2 from jt where a=2")
+        sql("INSERT INTO TABLE t1 select *, 2 from jt where a=2")
         checkAnswer(spark.table("t1"), Seq(Row(1, "str1", null), Row(2, "str2", 2)))
       }
     }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
index cfcc6b6b6b4..a7148e9c921 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
@@ -906,9 +906,15 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
       }
     }
   }
-
-    withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false",
-      SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "true") {
+    withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") {
+      testDefaultColumn
+    }
+    withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "true",
+      SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
+      testDefaultColumn
+    }
+    withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "true",
+      SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "true") {
       testDefaultColumn
     }
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org