This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new ce646b3dac93 [SPARK-52991][SQL] Implement MERGE INTO with SCHEMA EVOLUTION for V2 Data Source
ce646b3dac93 is described below

commit ce646b3dac93b5f87ccfcc48a032228ef4dcdec7
Author: Szehon Ho <szehon.apa...@gmail.com>
AuthorDate: Sat Aug 23 01:15:06 2025 +0800

[SPARK-52991][SQL] Implement MERGE INTO with SCHEMA EVOLUTION for V2 Data Source

### What changes were proposed in this pull request?

Add support for schema evolution for data sources that support MERGE INTO, currently V2 data sources. This means that if the SOURCE table of the merge has a different schema than the TARGET table, the TARGET table can be updated automatically to take the new or changed fields into account.

The basic idea is to add:
- TableCapability.AUTOMATIC_SCHEMA_EVOLUTION, to indicate that a DSV2 table wants Spark to handle schema evolution for MERGE
- a ResolveMergeIntoSchemaEvolution rule, which generates DSV2 TableChanges and calls TableCatalog.alterTable

For any new field at the top level or inside a nested struct, Spark adds the field at the end of the enclosing struct.

TODOs:
1. This currently does not support the case where SOURCE is missing a nested field that TARGET has, combined with an UPDATE * or INSERT *. Example:
```
MERGE INTO TARGET t
USING SOURCE s
// s = struct('a', struct('b': Int))
// t = struct('a', struct('c': Int))
```
This will only work if the user specifies a value explicitly for the new nested field t.b in the INSERT and UPDATE clauses, i.e.
```
INSERT (s) VALUES (named_struct('a', named_struct('b', 1, 'c', 2)))
UPDATE SET a.b = 2
```
and not if they use INSERT * or UPDATE SET *.
2. Type widening is not allowed for the moment, as we need to decide which widenings to allow. We can take this up in a follow-up PR.

### Why are the changes needed?

https://github.com/apache/spark/pull/45748 added the 'WITH SCHEMA EVOLUTION' syntax to 'MERGE INTO'. However, using it requires an external Spark extension to resolve the MERGE; it does nothing in Spark's native MERGE implementation.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added many tests to MergeIntoTableSuiteBase

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #51698 from szehon-ho/merge_schema_evolution.
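As a quick illustration of the behavior this commit enables, here is a sketch only: `testcat.ns.target`, the `source` view, and the `pk`/`active` columns below are hypothetical names, and the expected outcome mirrors the new tests in MergeIntoTableSuiteBase rather than being a verbatim excerpt of them.

```scala
// Assumes the target is a DSV2 table whose connector reports
// TableCapability.AUTOMATIC_SCHEMA_EVOLUTION, and that the `source` view
// carries an extra `active` column that the target does not yet have.
spark.sql("""
  MERGE WITH SCHEMA EVOLUTION
  INTO testcat.ns.target t
  USING source s
  ON t.pk = s.pk
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
""")
// ResolveMergeIntoSchemaEvolution alters the target first, appending `active`
// to the end of its schema; rows that existed before the merge read back with
// `active` = NULL. Without the WITH SCHEMA EVOLUTION clause (or without the
// capability), the extra source column is simply not written.
```

On the connector side, opting in amounts to including TableCapability.AUTOMATIC_SCHEMA_EVOLUTION in Table.capabilities(), as the InMemoryBaseTable change in the diff below does.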
Authored-by: Szehon Ho <szehon.apa...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../src/main/resources/error/error-conditions.json | 6 + .../sql/connector/catalog/TableCapability.java | 6 + .../spark/sql/catalyst/analysis/Analyzer.scala | 21 +- .../analysis/ResolveMergeIntoSchemaEvolution.scala | 65 +++ .../ResolveRowLevelCommandAssignments.scala | 6 +- .../catalyst/analysis/RewriteMergeIntoTable.scala | 10 +- .../sql/catalyst/plans/logical/v2Commands.scala | 78 ++- .../spark/sql/errors/QueryCompilationErrors.scala | 9 + .../datasources/v2/DataSourceV2Relation.scala | 3 + .../sql/connector/catalog/InMemoryBaseTable.scala | 14 +- .../sql/connector/MergeIntoTableSuiteBase.scala | 637 ++++++++++++++++++++- .../execution/command/PlanResolutionSuite.scala | 46 +- 12 files changed, 868 insertions(+), 33 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index a7105e228e2b..07725ea7e0ee 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -6731,6 +6731,12 @@ }, "sqlState" : "0A000" }, + "UNSUPPORTED_TABLE_CHANGE_IN_AUTO_SCHEMA_EVOLUTION" : { + "message" : [ + "The table changes <changes> are not supported by the catalog on table <tableName>." + ], + "sqlState" : "42000" + }, "UNSUPPORTED_TABLE_CHANGE_IN_JDBC_CATALOG" : { "message" : [ "The table change <change> is not supported for the JDBC catalog on table <tableName>. Supported changes include: AddColumn, RenameColumn, DeleteColumn, UpdateColumnType, UpdateColumnNullability." diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCapability.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCapability.java index 5732c0f3af4e..0a01c0c266b9 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCapability.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCapability.java @@ -93,6 +93,12 @@ public enum TableCapability { */ ACCEPT_ANY_SCHEMA, + /** + * Signals that table supports Spark altering the schema if necessary + * as part of an operation. + */ + AUTOMATIC_SCHEMA_EVOLUTION, + /** * Signals that the table supports append writes using the V1 InsertableRelation interface. * <p> diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index ccac4d8c4c0d..b25e4d5d538f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -444,6 +444,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor AddMetadataColumns :: DeduplicateRelations :: ResolveCollationName :: + ResolveMergeIntoSchemaEvolution :: new ResolveReferences(catalogManager) :: // Please do not insert any other rules in between. See the TODO comments in rule // ResolveLateralColumnAliasReference for more details. 
@@ -1669,7 +1670,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor case u: UpdateTable => resolveReferencesInUpdate(u) case m @ MergeIntoTable(targetTable, sourceTable, _, _, _, _, _) - if !m.resolved && targetTable.resolved && sourceTable.resolved => + if !m.resolved && targetTable.resolved && sourceTable.resolved && !m.needSchemaEvolution => EliminateSubqueryAliases(targetTable) match { case r: NamedRelation if r.skipSchemaResolution => @@ -1692,9 +1693,12 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor // The update value can access columns from both target and source tables. resolveAssignments(assignments, m, MergeResolvePolicy.BOTH)) case UpdateStarAction(updateCondition) => - val assignments = targetTable.output.map { attr => - Assignment(attr, UnresolvedAttribute(Seq(attr.name))) - } + // Use only source columns. Missing columns in target will be handled in + // ResolveRowLevelCommandAssignments. + val assignments = targetTable.output.flatMap{ targetAttr => + sourceTable.output.find( + sourceCol => conf.resolver(sourceCol.name, targetAttr.name)) + .map(Assignment(targetAttr, _))} UpdateAction( updateCondition.map(resolveExpressionByPlanChildren(_, m)), // For UPDATE *, the value must be from source table. @@ -1715,9 +1719,12 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor // access columns from the source table. val resolvedInsertCondition = insertCondition.map( resolveExpressionByPlanOutput(_, m.sourceTable)) - val assignments = targetTable.output.map { attr => - Assignment(attr, UnresolvedAttribute(Seq(attr.name))) - } + // Use only source columns. Missing columns in target will be handled in + // ResolveRowLevelCommandAssignments. + val assignments = targetTable.output.flatMap{ targetAttr => + sourceTable.output.find( + sourceCol => conf.resolver(sourceCol.name, targetAttr.name)) + .map(Assignment(targetAttr, _))} InsertAction( resolvedInsertCondition, resolveAssignments(assignments, m, MergeResolvePolicy.SOURCE)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMergeIntoSchemaEvolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMergeIntoSchemaEvolution.scala new file mode 100644 index 000000000000..7e7776098a04 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMergeIntoSchemaEvolution.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.types.DataTypeUtils +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, TableCatalog} +import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation + + +/** + * A rule that resolves schema evolution for MERGE INTO. + * + * This rule will call the DSV2 Catalog to update the schema of the target table. + */ +object ResolveMergeIntoSchemaEvolution extends Rule[LogicalPlan] { + + override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case m @ MergeIntoTable(_, _, _, _, _, _, _) + if m.needSchemaEvolution => + val newTarget = m.targetTable.transform { + case r : DataSourceV2Relation => performSchemaEvolution(r, m.sourceTable) + } + m.copy(targetTable = newTarget) + } + + private def performSchemaEvolution(relation: DataSourceV2Relation, source: LogicalPlan) + : DataSourceV2Relation = { + (relation.catalog, relation.identifier) match { + case (Some(c: TableCatalog), Some(i)) => + val changes = MergeIntoTable.schemaChanges(relation.schema, source.schema) + c.alterTable(i, changes: _*) + val newTable = c.loadTable(i) + val newSchema = CatalogV2Util.v2ColumnsToStructType(newTable.columns()) + // Check if there are any remaining changes not applied. + val remainingChanges = MergeIntoTable.schemaChanges(newSchema, source.schema) + if (remainingChanges.nonEmpty) { + throw QueryCompilationErrors.unsupportedTableChangesInAutoSchemaEvolutionError( + remainingChanges, i.toQualifiedNameParts(c)) + } + relation.copy(table = newTable, output = DataTypeUtils.toAttributes(newSchema)) + case _ => logWarning(s"Schema Evolution enabled but data source $relation " + + s"does not support it, skipping.") + relation + } + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala index 3f3e707b054b..83520b780f12 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala @@ -48,7 +48,8 @@ object ResolveRowLevelCommandAssignments extends Rule[LogicalPlan] { case u: UpdateTable if !u.skipSchemaResolution && u.resolved && !u.aligned => resolveAssignments(u) - case m: MergeIntoTable if !m.skipSchemaResolution && m.resolved && m.rewritable && !m.aligned => + case m: MergeIntoTable if !m.skipSchemaResolution && m.resolved && m.rewritable && !m.aligned && + !m.needSchemaEvolution => validateStoreAssignmentPolicy() m.copy( targetTable = cleanAttrMetadata(m.targetTable), @@ -56,7 +57,8 @@ object ResolveRowLevelCommandAssignments extends Rule[LogicalPlan] { notMatchedActions = alignActions(m.targetTable.output, m.notMatchedActions), notMatchedBySourceActions = alignActions(m.targetTable.output, m.notMatchedBySourceActions)) - case m: MergeIntoTable if !m.skipSchemaResolution && m.resolved && !m.aligned => + case m: MergeIntoTable if !m.skipSchemaResolution && m.resolved && !m.aligned + && !m.needSchemaEvolution => resolveAssignments(m) } diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala index 5ac853b858cc..9e67aa156fa2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala @@ -45,8 +45,8 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case m @ MergeIntoTable(aliasedTable, source, cond, matchedActions, notMatchedActions, - notMatchedBySourceActions, _) if m.resolved && m.rewritable && m.aligned && - matchedActions.isEmpty && notMatchedActions.size == 1 && + notMatchedBySourceActions, _) if m.resolved && m.rewritable && m.aligned && + !m.needSchemaEvolution && matchedActions.isEmpty && notMatchedActions.size == 1 && notMatchedBySourceActions.isEmpty => EliminateSubqueryAliases(aliasedTable) match { @@ -79,7 +79,8 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper } case m @ MergeIntoTable(aliasedTable, source, cond, matchedActions, notMatchedActions, - notMatchedBySourceActions, _) if m.resolved && m.rewritable && m.aligned && + notMatchedBySourceActions, _) + if m.resolved && m.rewritable && m.aligned && !m.needSchemaEvolution && matchedActions.isEmpty && notMatchedBySourceActions.isEmpty => EliminateSubqueryAliases(aliasedTable) match { @@ -120,7 +121,8 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper } case m @ MergeIntoTable(aliasedTable, source, cond, matchedActions, notMatchedActions, - notMatchedBySourceActions, _) if m.resolved && m.rewritable && m.aligned => + notMatchedBySourceActions, _) + if m.resolved && m.rewritable && m.aligned && !m.needSchemaEvolution => EliminateSubqueryAliases(aliasedTable) match { case r @ DataSourceV2Relation(tbl: SupportsRowLevelOperations, _, _, _, _) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 0e540d38ee99..f2f7a0490f91 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -38,8 +38,10 @@ import org.apache.spark.sql.connector.expressions.filter.Predicate import org.apache.spark.sql.connector.write.{DeltaWrite, RowLevelOperation, RowLevelOperationTable, SupportsDelta, Write} import org.apache.spark.sql.connector.write.RowLevelOperation.Command.{DELETE, MERGE, UPDATE} import org.apache.spark.sql.errors.DataTypeErrors.toSQLType +import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation -import org.apache.spark.sql.types.{BooleanType, DataType, IntegerType, MapType, MetadataBuilder, StringType, StructType} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{ArrayType, BooleanType, DataType, IntegerType, MapType, MetadataBuilder, StringType, StructField, StructType} import org.apache.spark.util.ArrayImplicits._ import org.apache.spark.util.Utils @@ -894,6 +896,17 @@ case class MergeIntoTable( override protected def withNewChildrenInternal( newLeft: LogicalPlan, newRight: LogicalPlan): MergeIntoTable = copy(targetTable = 
newLeft, sourceTable = newRight) + + def needSchemaEvolution: Boolean = + schemaEvolutionEnabled && + MergeIntoTable.schemaChanges(targetTable.schema, sourceTable.schema).nonEmpty + + private def schemaEvolutionEnabled: Boolean = withSchemaEvolution && { + EliminateSubqueryAliases(targetTable) match { + case r: DataSourceV2Relation if r.autoSchemaEvolution() => true + case _ => false + } + } } object MergeIntoTable { @@ -909,6 +922,69 @@ object MergeIntoTable { } privileges.toSeq } + + def schemaChanges( + originalTarget: StructType, + originalSource: StructType, + fieldPath: Array[String] = Array()): Array[TableChange] = { + schemaChanges(originalTarget, originalSource, originalTarget, originalSource, fieldPath) + } + + private def schemaChanges( + current: DataType, + newType: DataType, + originalTarget: StructType, + originalSource: StructType, + fieldPath: Array[String]): Array[TableChange] = { + (current, newType) match { + case (StructType(currentFields), StructType(newFields)) => + val newFieldMap = toFieldMap(newFields) + + // Update existing field types + val updates = { + currentFields collect { + case currentField: StructField if newFieldMap.contains(currentField.name) => + schemaChanges(currentField.dataType, newFieldMap(currentField.name).dataType, + originalTarget, originalSource, fieldPath ++ Seq(currentField.name)) + }}.flatten + + // Identify the newly added fields and append to the end + val currentFieldMap = toFieldMap(currentFields) + val adds = newFields.filterNot (f => currentFieldMap.contains (f.name)) + .map(f => TableChange.addColumn(fieldPath ++ Set(f.name), f.dataType)) + + updates ++ adds + + case (ArrayType(currentElementType, _), ArrayType(newElementType, _)) => + schemaChanges(currentElementType, newElementType, + originalTarget, originalSource, fieldPath ++ Seq("element")) + + case (MapType(currentKeyType, currentElementType, _), + MapType(updateKeyType, updateElementType, _)) => + schemaChanges(currentKeyType, updateKeyType, originalTarget, originalSource, + fieldPath ++ Seq("key")) ++ + schemaChanges(currentElementType, updateElementType, + originalTarget, originalSource, fieldPath ++ Seq("value")) + + case (currentType, newType) if currentType == newType => + // No change needed + Array.empty[TableChange] + + case _ => + // For now do not support type widening + throw QueryExecutionErrors.failedToMergeIncompatibleSchemasError( + originalTarget, originalSource, null) + } + } + + def toFieldMap(fields: Array[StructField]): Map[String, StructField] = { + val fieldMap = fields.map(field => field.name -> field).toMap + if (SQLConf.get.caseSensitiveAnalysis) { + fieldMap + } else { + CaseInsensitiveMap(fieldMap) + } + } } sealed abstract class MergeAction extends Expression with Unevaluable { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index b296036c8fc0..be3f9b19591b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -3348,6 +3348,15 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat "change" -> change.toString, "tableName" -> toSQLId(sanitizedTableName))) } + def unsupportedTableChangesInAutoSchemaEvolutionError( + changes: Array[TableChange], tableName: Seq[String]): Throwable = { + val sanitizedTableName = tableName.map(_.replaceAll("\"", "")) + 
new AnalysisException( + errorClass = "UNSUPPORTED_TABLE_CHANGES_IN_AUTO_SCHEMA_EVOLUTION", + messageParameters = Map( + "changes" -> changes.mkString(","), "tableName" -> toSQLId(sanitizedTableName))) + } + def pathOptionNotSetCorrectlyWhenReadingError(): Throwable = { new AnalysisException( errorClass = "_LEGACY_ERROR_TEMP_1306", diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index babdd70d58ba..2b1b40e0a5eb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -126,6 +126,9 @@ case class DataSourceV2Relation( this } } + + def autoSchemaEvolution(): Boolean = + table.capabilities().contains(TableCapability.AUTOMATIC_SCHEMA_EVOLUTION) } /** diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala index ab1d3719e404..83fbedda8619 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala @@ -134,6 +134,8 @@ abstract class InMemoryBaseTable( properties.getOrDefault("allow-unsupported-transforms", "false").toBoolean private val acceptAnySchema = properties.getOrDefault("accept-any-schema", "false").toBoolean + private val autoSchemaEvolution = properties.getOrDefault("auto-schema-evolution", "true") + .toBoolean partitioning.foreach { case _: IdentityTransform => @@ -349,13 +351,11 @@ abstract class InMemoryBaseTable( TableCapability.OVERWRITE_DYNAMIC, TableCapability.TRUNCATE) - override def capabilities(): util.Set[TableCapability] = { - if (acceptAnySchema) { - (baseCapabiilities ++ Set(TableCapability.ACCEPT_ANY_SCHEMA)).asJava - } else { - baseCapabiilities.asJava - } - } + override def capabilities(): util.Set[TableCapability] = + (baseCapabiilities ++ + (if (acceptAnySchema) Seq(TableCapability.ACCEPT_ANY_SCHEMA) else Seq.empty) ++ + (if (autoSchemaEvolution) Seq(TableCapability.AUTOMATIC_SCHEMA_EVOLUTION) else Seq.empty)) + .asJava override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { new InMemoryScanBuilder(schema, options) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala index e5786619f98f..10586adab1f6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala @@ -21,14 +21,14 @@ import org.apache.spark.SparkRuntimeException import org.apache.spark.sql.{AnalysisException, Row} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, In, Not} import org.apache.spark.sql.catalyst.optimizer.BuildLeft -import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, InMemoryTable, TableInfo} +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, ColumnDefaultValue, InMemoryTable, TableInfo} import org.apache.spark.sql.connector.expressions.{GeneralScalarExpression, LiteralValue} import org.apache.spark.sql.execution.SparkPlan import 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.datasources.v2.MergeRowsExec import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, CartesianProductExec} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{IntegerType, StringType} +import org.apache.spark.sql.types.{ArrayType, BooleanType, IntegerType, MapType, StringType, StructField, StructType} abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase with AdaptiveSparkPlanHelper { @@ -2154,6 +2154,639 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase } } + test("Merge schema evolution new column with set explicit column") { + Seq((true, true), (false, true), (true, false)).foreach { + case (withSchemaEvolution, schemaEvolutionEnabled) => + withTempView("source") { + createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", + """{ "pk": 1, "salary": 100, "dep": "hr" } + |{ "pk": 2, "salary": 200, "dep": "software" } + |{ "pk": 3, "salary": 300, "dep": "hr" } + |{ "pk": 4, "salary": 400, "dep": "marketing" } + |{ "pk": 5, "salary": 500, "dep": "executive" } + |""".stripMargin) + + if (!schemaEvolutionEnabled) { + sql(s"""ALTER TABLE $tableNameAsString SET TBLPROPERTIES + | ('auto-schema-evolution' = 'false')""".stripMargin) + } + + val sourceDF = Seq((4, 150, "dummy", true), + (5, 250, "dummy", true), + (6, 350, "dummy", false)).toDF("pk", "salary", "dep", "active") + sourceDF.createOrReplaceTempView("source") + + val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else "" + val mergeStmt = s"""MERGE $schemaEvolutionClause + |INTO $tableNameAsString t + |USING source s + |ON t.pk = s.pk + |WHEN MATCHED THEN + | UPDATE SET dep='software', active=s.active + |WHEN NOT MATCHED THEN + | INSERT (pk, salary, dep, active) VALUES (s.pk, 0, s.dep, s.active) + |""".stripMargin + + if (withSchemaEvolution && schemaEvolutionEnabled) { + sql(mergeStmt) + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(1, 100, "hr", null), + Row(2, 200, "software", null), + Row(3, 300, "hr", null), + Row(4, 400, "software", true), + Row(5, 500, "software", true), + Row(6, 0, "dummy", false))) + } else { + val e = intercept[org.apache.spark.sql.AnalysisException] { + sql(mergeStmt) + } + assert(e.errorClass.get == "UNRESOLVED_COLUMN.WITH_SUGGESTION") + assert(e.getMessage.contains("A column, variable, or function parameter with name " + + "`active` cannot be resolved")) + } + + sql(s"DROP TABLE $tableNameAsString") + } + } + } + + test("Merge schema evolution new column with set all columns") { + Seq((true, true), (false, true), (true, false)).foreach { + case (withSchemaEvolution, schemaEvolutionEnabled) => + withTempView("source") { + createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", + """{ "pk": 1, "salary": 100, "dep": "hr" } + |{ "pk": 2, "salary": 200, "dep": "software" } + |{ "pk": 3, "salary": 300, "dep": "hr" } + |{ "pk": 4, "salary": 400, "dep": "marketing" } + |{ "pk": 5, "salary": 500, "dep": "executive" } + |""".stripMargin) + + + if (!schemaEvolutionEnabled) { + sql(s"""ALTER TABLE $tableNameAsString SET TBLPROPERTIES + | ('auto-schema-evolution' = 'false')""".stripMargin) + } + + val sourceDF = Seq((4, 150, "finance", true), + (5, 250, "finance", false), + (6, 350, "finance", true)).toDF("pk", "salary", "dep", "active") + sourceDF.createOrReplaceTempView("source") + + val schemaEvolutionClause = if (withSchemaEvolution) "WITH 
SCHEMA EVOLUTION" else "" + sql( + s"""MERGE $schemaEvolutionClause + |INTO $tableNameAsString t + |USING source s + |ON t.pk = s.pk + |WHEN MATCHED THEN + | UPDATE SET * + |WHEN NOT MATCHED THEN + | INSERT * + |""".stripMargin) + + if (withSchemaEvolution && schemaEvolutionEnabled) { + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(1, 100, "hr", null), + Row(2, 200, "software", null), + Row(3, 300, "hr", null), + Row(4, 150, "finance", true), + Row(5, 250, "finance", false), + Row(6, 350, "finance", true))) + } else { + // Without schema evolution, the new columns are not added + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(1, 100, "hr"), + Row(2, 200, "software"), + Row(3, 300, "hr"), + Row(4, 150, "finance"), + Row(5, 250, "finance"), + Row(6, 350, "finance"))) + } + } + sql(s"DROP TABLE $tableNameAsString") + } + } + + test("Merge schema evolution replacing column with set all column") { + Seq((true, true), (false, true), (true, false)).foreach { + case (withSchemaEvolution, schemaEvolutionEnabled) => + withTempView("source") { + createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", + """{ "pk": 1, "salary": 100, "dep": "hr" } + |{ "pk": 2, "salary": 200, "dep": "software" } + |{ "pk": 3, "salary": 300, "dep": "hr" } + |{ "pk": 4, "salary": 400, "dep": "marketing" } + |{ "pk": 5, "salary": 500, "dep": "executive" } + |""".stripMargin) + + if (!schemaEvolutionEnabled) { + sql(s"""ALTER TABLE $tableNameAsString SET TBLPROPERTIES + | ('auto-schema-evolution' = 'false')""".stripMargin) + } + + val sourceDF = Seq((4, 150, true), + (5, 250, true), + (6, 350, false)).toDF("pk", "salary", "active") + sourceDF.createOrReplaceTempView("source") + + val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else "" + sql(s"""MERGE $schemaEvolutionClause + |INTO $tableNameAsString t + |USING source s + |ON t.pk = s.pk + |WHEN MATCHED THEN + | UPDATE SET * + |WHEN NOT MATCHED THEN + | INSERT * + |""".stripMargin) + if (withSchemaEvolution && schemaEvolutionEnabled) { + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(1, 100, "hr", null), + Row(2, 200, "software", null), + Row(3, 300, "hr", null), + Row(4, 150, "marketing", true), + Row(5, 250, "executive", true), + Row(6, 350, null, false))) + } else { + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(1, 100, "hr"), + Row(2, 200, "software"), + Row(3, 300, "hr"), + Row(4, 150, "marketing"), + Row(5, 250, "executive"), + Row(6, 350, null))) + } + sql(s"DROP TABLE $tableNameAsString") + } + } + } + + test("Merge schema evolution replacing column with set explicit column") { + Seq((true, true), (false, true), (true, false)).foreach { + case (withSchemaEvolution, schemaEvolutionEnabled) => + withTempView("source") { + createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", + """{ "pk": 1, "salary": 100, "dep": "hr" } + |{ "pk": 2, "salary": 200, "dep": "software" } + |{ "pk": 3, "salary": 300, "dep": "hr" } + |{ "pk": 4, "salary": 400, "dep": "marketing" } + |{ "pk": 5, "salary": 500, "dep": "executive" } + |""".stripMargin) + + if (!schemaEvolutionEnabled) { + sql(s"""ALTER TABLE $tableNameAsString SET TBLPROPERTIES + | ('auto-schema-evolution' = 'false')""".stripMargin) + } + + val sourceDF = Seq((4, 150, true), + (5, 250, true), + (6, 350, false)).toDF("pk", "salary", "active") + sourceDF.createOrReplaceTempView("source") + + val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else "" + val mergeStmt = 
s"""MERGE $schemaEvolutionClause + |INTO $tableNameAsString t + |USING source s + |ON t.pk = s.pk + |WHEN MATCHED THEN + | UPDATE SET dep = 'finance', active = s.active + |WHEN NOT MATCHED THEN + | INSERT (pk, salary, dep, active) VALUES + | (s.pk, s.salary, 'finance', s.active) + |""".stripMargin + + if (withSchemaEvolution && schemaEvolutionEnabled) { + sql(mergeStmt) + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(1, 100, "hr", null), + Row(2, 200, "software", null), + Row(3, 300, "hr", null), + Row(4, 400, "finance", true), + Row(5, 500, "finance", true), + Row(6, 350, "finance", false))) + } else { + val e = intercept[org.apache.spark.sql.AnalysisException] { + sql(mergeStmt) + } + assert(e.errorClass.get == "UNRESOLVED_COLUMN.WITH_SUGGESTION") + assert(e.getMessage.contains("A column, variable, or function parameter with name " + + "`active` cannot be resolved")) + } + + sql(s"DROP TABLE $tableNameAsString") + } + } + } + + test("merge into schema evolution add column with nested field and set explicit columns") { + Seq(true, false).foreach { withSchemaEvolution => + withTempView("source") { + createAndInitTable( + s"""pk INT NOT NULL, + |s STRUCT<c1: INT, c2: STRUCT<a: ARRAY<INT>, m: MAP<STRING, STRING>>>, + |dep STRING""".stripMargin, + """{ "pk": 1, "s": { "c1": 2, "c2": { "a": [1,2], "m": { "a": "b" } } }, "dep": "hr" }""") + + val sourceTableSchema = StructType(Seq( + StructField("pk", IntegerType, nullable = false), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StructType(Seq( + StructField("a", ArrayType(IntegerType)), + StructField("m", MapType(StringType, StringType)), + StructField("c3", BooleanType) // new column + ))) + ))), + StructField("dep", StringType) + )) + val data = Seq( + Row(1, Row(10, Row(Array(3, 4), Map("c" -> "d"), false)), "sales"), + Row(2, Row(20, Row(Array(4, 5), Map("e" -> "f"), true)), "engineering") + ) + spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema) + .createOrReplaceTempView("source") + + val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else "" + val mergeStmt = + s"""MERGE $schemaEvolutionClause + |INTO $tableNameAsString t + |USING source src + |ON t.pk = src.pk + |WHEN MATCHED THEN + | UPDATE SET s.c1 = -1, s.c2.m = map('k', 'v'), s.c2.a = array(-1), + | s.c2.c3 = src.s.c2.c3 + |WHEN NOT MATCHED THEN + | INSERT (pk, s, dep) VALUES (src.pk, + | named_struct('c1', src.s.c1, + | 'c2', named_struct('a', src.s.c2.a, 'm', map('g', 'h'), 'c3', true)), src.dep) + |""".stripMargin + + if (withSchemaEvolution) { + sql(mergeStmt) + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq(Row(1, Row(-1, Row(Seq(-1), Map("k" -> "v"), false)), "hr"), + Row(2, Row(20, Row(Seq(4, 5), Map("g" -> "h"), true)), "engineering"))) + } else { + val exception = intercept[org.apache.spark.sql.AnalysisException] { + sql(mergeStmt) + } + assert(exception.errorClass.get == "FIELD_NOT_FOUND") + assert(exception.getMessage.contains("No such struct field `c3` in `a`, `m`. 
")) + } + } + sql(s"DROP TABLE IF EXISTS $tableNameAsString") + } + } + + test("merge into schema evolution add column with nested field and set all columns") { + Seq(true, false).foreach { withSchemaEvolution => + withTempView("source") { + createAndInitTable( + s"""pk INT NOT NULL, + |s STRUCT<c1: INT, c2: STRUCT<a: ARRAY<INT>, m: MAP<STRING, STRING>>>, + |dep STRING""".stripMargin, + """{ "pk": 1, "s": { "c1": 2, "c2": { "a": [1,2], "m": { "a": "b" } } }, "dep": "hr" }""") + + val sourceTableSchema = StructType(Seq( + StructField("pk", IntegerType, nullable = false), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StructType(Seq( + StructField("a", ArrayType(IntegerType)), + StructField("m", MapType(StringType, StringType)), + StructField("c3", BooleanType) // new column + ))) + ))), + StructField("dep", StringType) + )) + val data = Seq( + Row(1, Row(10, Row(Array(3, 4), Map("c" -> "d"), false)), "sales"), + Row(2, Row(20, Row(Array(4, 5), Map("e" -> "f"), true)), "engineering") + ) + spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema) + .createOrReplaceTempView("source") + + val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else "" + val mergeStmt = + s"""MERGE $schemaEvolutionClause + |INTO $tableNameAsString t + |USING source src + |ON t.pk = src.pk + |WHEN MATCHED THEN + | UPDATE SET * + |WHEN NOT MATCHED THEN + | INSERT * + |""".stripMargin + + if (withSchemaEvolution) { + sql(mergeStmt) + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq(Row(1, Row(10, Row(Seq(3, 4), Map("c" -> "d"), false)), "sales"), + Row(2, Row(20, Row(Seq(4, 5), Map("e" -> "f"), true)), "engineering"))) + } else { + val exception = intercept[org.apache.spark.sql.AnalysisException] { + sql(mergeStmt) + } + assert(exception.errorClass.get == "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS") + assert(exception.getMessage.contains( + "Cannot write extra fields `c3` to the struct `s`.`c2`")) + } + } + sql(s"DROP TABLE IF EXISTS $tableNameAsString") + } + } + + test("merge into schema evolution replace column with nested field and set explicit columns") { + Seq(true, false).foreach { withSchemaEvolution => + withTempView("source") { + createAndInitTable( + s"""pk INT NOT NULL, + |s STRUCT<c1: INT, c2: STRUCT<a: ARRAY<INT>, m: MAP<STRING, STRING>>>, + |dep STRING""".stripMargin, + """{ "pk": 1, "s": { "c1": 2, "c2": { "a": [1,2], "m": { "a": "b" } } }, "dep": "hr" }""") + + val sourceTableSchema = StructType(Seq( + StructField("pk", IntegerType, nullable = false), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StructType(Seq( + // removed column 'a' + StructField("m", MapType(StringType, StringType)), + StructField("c3", BooleanType) // new column + ))) + ))), + StructField("dep", StringType) + )) + val data = Seq( + Row(1, Row(10, Row(Map("c" -> "d"), false)), "sales"), + Row(2, Row(20, Row(Map("e" -> "f"), true)), "engineering") + ) + spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema) + .createOrReplaceTempView("source") + + val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else "" + val mergeStmt = + s"""MERGE $schemaEvolutionClause + |INTO $tableNameAsString t + |USING source src + |ON t.pk = src.pk + |WHEN MATCHED THEN + | UPDATE SET s.c1 = -1, s.c2.m = map('k', 'v'), s.c2.a = array(-1), + | s.c2.c3 = src.s.c2.c3 + |WHEN NOT MATCHED THEN + | INSERT (pk, s, dep) VALUES (src.pk, + | named_struct('c1', 
src.s.c1, + | 'c2', named_struct('a', array(-2), 'm', map('g', 'h'), 'c3', true)), src.dep) + |""".stripMargin + + if (withSchemaEvolution) { + sql(mergeStmt) + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq(Row(1, Row(-1, Row(Seq(-1), Map("k" -> "v"), false)), "hr"), + Row(2, Row(20, Row(Seq(-2), Map("g" -> "h"), true)), "engineering"))) + } else { + val exception = intercept[org.apache.spark.sql.AnalysisException] { + sql(mergeStmt) + } + assert(exception.errorClass.get == "FIELD_NOT_FOUND") + assert(exception.getMessage.contains("No such struct field `c3` in `a`, `m`. ")) + } + } + sql(s"DROP TABLE IF EXISTS $tableNameAsString") + } + } + + // TODO- support schema evolution for missing nested types using UPDATE SET * and INSERT * + test("merge into schema evolution replace column with nested field and set all columns") { + Seq(true, false).foreach { withSchemaEvolution => + withTempView("source") { + createAndInitTable( + s"""pk INT NOT NULL, + |s STRUCT<c1: INT, c2: STRUCT<a: ARRAY<INT>, m: MAP<STRING, STRING>>>, + |dep STRING""".stripMargin, + """{ "pk": 1, "s": { "c1": 2, "c2": { "a": [1,2], "m": { "a": "b" } } }, "dep": "hr" }""") + + val sourceTableSchema = StructType(Seq( + StructField("pk", IntegerType, nullable = false), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StructType(Seq( + // removed column 'a' + StructField("m", MapType(StringType, StringType)), + StructField("c3", BooleanType) // new column + ))) + ))), + StructField("dep", StringType) + )) + val data = Seq( + Row(1, Row(10, Row(Map("c" -> "d"), false)), "sales"), + Row(2, Row(20, Row(Map("e" -> "f"), true)), "engineering") + ) + spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema) + .createOrReplaceTempView("source") + + val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else "" + val exception = intercept[org.apache.spark.sql.AnalysisException] { + sql( + s"""MERGE $schemaEvolutionClause + |INTO $tableNameAsString t + |USING source src + |ON t.pk = src.pk + |WHEN MATCHED THEN + | UPDATE SET * + |WHEN NOT MATCHED THEN + | INSERT * + |""".stripMargin) + } + + assert(exception.errorClass.get == "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA") + assert(exception.getMessage.contains("Cannot find data for the output column `s`.`c2`.`a`")) + } + sql(s"DROP TABLE IF EXISTS $tableNameAsString") + } + } + + test("merge into schema evolution add column for struct in array and set all columns") { + Seq(true, false).foreach { withSchemaEvolution => + withTempView("source") { + createAndInitTable( + s"""pk INT NOT NULL, + |a ARRAY<STRUCT<c1: INT, c2: STRING>>, + |dep STRING""".stripMargin, + """{ "pk": 0, "a": [ { "c1": 1, "c2": "a" }, { "c1": 2, "c2": "b" } ], "dep": "sales"}, + { "pk": 1, "a": [ { "c1": 1, "c2": "a" }, { "c1": 2, "c2": "b" } ], "dep": "hr" }""" + .stripMargin) + + val sourceTableSchema = StructType(Seq( + StructField("pk", IntegerType, nullable = false), + StructField("a", ArrayType( + StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType), + StructField("c3", BooleanType))))), // new column + StructField("dep", StringType))) + val data = Seq( + Row(1, Array(Row(10, "c", true), Row(20, "d", false)), "hr"), + Row(2, Array(Row(30, "d", false), Row(40, "e", true)), "engineering") + ) + spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema) + .createOrReplaceTempView("source") + + val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION" else "" + val mergeStmt = + s"""MERGE $schemaEvolutionClause + |INTO $tableNameAsString t + |USING source src + |ON t.pk = src.pk + |WHEN MATCHED THEN + | UPDATE SET * + |WHEN NOT MATCHED THEN + | INSERT * + |""".stripMargin + + if (withSchemaEvolution) { + sql(mergeStmt) + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + // TODO- InMemoryBaseTable does not return null for nested schema evolution. + Seq(Row(0, Array(Row(1, "a", true), Row(2, "b", true)), "sales"), + Row(1, Array(Row(10, "c", true), Row(20, "d", false)), "hr"), + Row(2, Array(Row(30, "d", false), Row(40, "e", true)), "engineering"))) + } else { + val exception = intercept[org.apache.spark.sql.AnalysisException] { + sql(mergeStmt) + } + assert(exception.errorClass.get == "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS") + assert(exception.getMessage.contains( + "Cannot write extra fields `c3` to the struct `a`.`element`")) + } + } + sql(s"DROP TABLE IF EXISTS $tableNameAsString") + } + } + + test("merge into schema evolution add column for struct in map and set all columns") { + Seq(true, false).foreach { withSchemaEvolution => + withTempView("source") { + val schema = + StructType(Seq( + StructField("pk", IntegerType, nullable = false), + StructField("m", MapType( + StructType(Seq(StructField("c1", IntegerType))), + StructType(Seq(StructField("c2", StringType))))), + StructField("dep", StringType))) + createTable(CatalogV2Util.structTypeToV2Columns(schema)) + + val data = Seq( + Row(0, Map(Row(10) -> Row("c")), "hr"), + Row(1, Map(Row(20) -> Row("d")), "sales")) + spark.createDataFrame(spark.sparkContext.parallelize(data), schema) + .writeTo(tableNameAsString).append() + + val sourceTableSchema = StructType(Seq( + StructField("pk", IntegerType), + StructField("m", MapType( + StructType(Seq(StructField("c1", IntegerType), StructField("c3", BooleanType))), + StructType(Seq(StructField("c2", StringType), StructField("c4", BooleanType))))), + StructField("dep", StringType))) + val sourceData = Seq( + Row(1, Map(Row(10, true) -> Row("y", false)), "sales"), + Row(2, Map(Row(20, false) -> Row("z", true)), "engineering") + ) + spark.createDataFrame(spark.sparkContext.parallelize(sourceData), sourceTableSchema) + .createOrReplaceTempView("source") + + val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else "" + val mergeStmt = + s"""MERGE $schemaEvolutionClause + |INTO $tableNameAsString t + |USING source src + |ON t.pk = src.pk + |WHEN MATCHED THEN + | UPDATE SET * + |WHEN NOT MATCHED THEN + | INSERT * + |""".stripMargin + + if (withSchemaEvolution) { + sql(mergeStmt) + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + // TODO- InMemoryBaseTable does not return null for nested schema evolution. 
+ Seq(Row(0, Map(Row(10, true) -> Row("c", true)), "hr"), + Row(1, Map(Row(10, true) -> Row("y", false)), "sales"), + Row(2, Map(Row(20, false) -> Row("z", true)), "engineering"))) + } else { + val exception = intercept[org.apache.spark.sql.AnalysisException] { + sql(mergeStmt) + } + assert(exception.errorClass.get == "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS") + assert(exception.getMessage.contains( + "Cannot write extra fields `c3` to the struct `m`.`key`")) + } + } + sql(s"DROP TABLE IF EXISTS $tableNameAsString") + } + } + + test("merge into empty table with NOT MATCHED clause schema evolution") { + Seq(true, false) foreach { withSchemaEvolution => + withTempView("source") { + createTable("pk INT NOT NULL, salary INT, dep STRING") + + val sourceRows = Seq( + (1, 100, "hr", true), + (2, 200, "finance", false), + (3, 300, "hr", true)) + sourceRows.toDF("pk", "salary", "dep", "active").createOrReplaceTempView("source") + + val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else "" + + sql( + s"""MERGE $schemaEvolutionClause + |INTO $tableNameAsString t + |USING source s + |ON t.pk = s.pk + |WHEN NOT MATCHED THEN + | INSERT * + |""".stripMargin) + + if (withSchemaEvolution) { + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(1, 100, "hr", true), + Row(2, 200, "finance", false), + Row(3, 300, "hr", true))) + } else { + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(1, 100, "hr"), + Row(2, 200, "finance"), + Row(3, 300, "hr"))) + } + sql("DROP TABLE IF EXISTS " + tableNameAsString) + } + } + } + private def findMergeExec(query: String): MergeRowsExec = { val plan = executeAndKeepPlan { sql(query) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index ccf502d79c00..ecc293a5acc2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -2315,11 +2315,24 @@ class PlanResolutionSuite extends SharedSparkSession with AnalysisTest { |USING testcat.tab2 |ON 1 = 1 |WHEN MATCHED THEN UPDATE SET *""".stripMargin - checkError( - exception = intercept[AnalysisException](parseAndResolve(sql2)), - condition = "UNRESOLVED_COLUMN.WITH_SUGGESTION", - parameters = Map("objectName" -> "`s`", "proposal" -> "`i`, `x`"), - context = ExpectedContext(fragment = sql2, start = 0, stop = 80)) + val parsed2 = parseAndResolve(sql2) + parsed2 match { + case MergeIntoTable( + AsDataSourceV2Relation(target), + AsDataSourceV2Relation(source), + EqualTo(IntegerLiteral(1), IntegerLiteral(1)), + Seq(UpdateAction(None, updateAssigns)), // Matched actions + Seq(), // Not matched actions + Seq(), // Not matched by source actions + withSchemaEvolution) => + val ti = target.output.find(_.name == "i").get + val si = source.output.find(_.name == "i").get + assert(updateAssigns.size == 1) + assert(updateAssigns.head.key.asInstanceOf[AttributeReference].sameRef(ti)) + assert(updateAssigns.head.value.asInstanceOf[AttributeReference].sameRef(si)) + assert(withSchemaEvolution === false) + case other => fail("Expect MergeIntoTable, but got:\n" + other.treeString) + } // INSERT * with incompatible schema between source and target tables. 
val sql3 = @@ -2327,11 +2340,24 @@ class PlanResolutionSuite extends SharedSparkSession with AnalysisTest { |USING testcat.tab2 |ON 1 = 1 |WHEN NOT MATCHED THEN INSERT *""".stripMargin - checkError( - exception = intercept[AnalysisException](parseAndResolve(sql3)), - condition = "UNRESOLVED_COLUMN.WITH_SUGGESTION", - parameters = Map("objectName" -> "`s`", "proposal" -> "`i`, `x`"), - context = ExpectedContext(fragment = sql3, start = 0, stop = 80)) + val parsed3 = parseAndResolve(sql3) + parsed3 match { + case MergeIntoTable( + AsDataSourceV2Relation(target), + AsDataSourceV2Relation(source), + EqualTo(IntegerLiteral(1), IntegerLiteral(1)), + Seq(), // Matched action + Seq(InsertAction(None, insertAssigns)), // Not matched actions + Seq(), // Not matched by source actions + withSchemaEvolution) => + val ti = target.output.find(_.name == "i").get + val si = source.output.find(_.name == "i").get + assert(insertAssigns.size == 1) + assert(insertAssigns.head.key.asInstanceOf[AttributeReference].sameRef(ti)) + assert(insertAssigns.head.value.asInstanceOf[AttributeReference].sameRef(si)) + assert(withSchemaEvolution === false) + case other => fail("Expect MergeIntoTable, but got:\n" + other.treeString) + } val sql4 = """ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org