This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ce646b3dac93 [SPARK-52991][SQL] Implement MERGE INTO with SCHEMA EVOLUTION for V2 Data Source
ce646b3dac93 is described below

commit ce646b3dac93b5f87ccfcc48a032228ef4dcdec7
Author: Szehon Ho <szehon.apa...@gmail.com>
AuthorDate: Sat Aug 23 01:15:06 2025 +0800

    [SPARK-52991][SQL] Implement MERGE INTO with SCHEMA EVOLUTION for V2 Data Source
    
    ### What changes were proposed in this pull request?
    Add support for schema evolution for data sources that support MERGE INTO,
    currently V2 DataSources. This means that if the SOURCE table of the merge
    has a different schema than the TARGET table, the TARGET table can be
    updated automatically to take the new or changed fields into account.
    
    The basic idea is to add
    
    - TableCapability.AUTOMATIC_SCHEMA_EVOLUTION, to indicate that a DSV2 table
      wants Spark to handle schema evolution for MERGE
    - a ResolveMergeIntoSchemaEvolution rule, which generates DSV2 TableChanges
      and calls TableCatalog.alterTable (see the sketch below)
    
    For any new field at the top level or in a nested struct, Spark will add the
    field at the end.
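
    A connector opts in by advertising the new capability. A minimal sketch
    (`MyTable` is a hypothetical implementation, not part of this patch):
    ```
    import java.util

    import org.apache.spark.sql.connector.catalog.{Table, TableCapability}
    import org.apache.spark.sql.types.StructType

    // Hypothetical DSV2 table that lets Spark evolve its schema during MERGE.
    class MyTable extends Table {
      override def name(): String = "my_table"
      override def schema(): StructType = new StructType().add("pk", "int")
      override def capabilities(): util.Set[TableCapability] =
        util.EnumSet.of(TableCapability.AUTOMATIC_SCHEMA_EVOLUTION)
    }
    ```
    The schema change itself still goes through the owning TableCatalog:
    ResolveMergeIntoSchemaEvolution calls alterTable with the computed
    TableChanges and fails if any change is left unapplied.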
    
    TODOs:
    
    1. This currently does not support the case where SOURCE is missing a nested
       field that TARGET has, and there is an UPDATE * or INSERT * action.
    
    Example:
    ```
    MERGE INTO TARGET t USING SOURCE s
    // s = struct('a', struct('b', int))
    // t = struct('a', struct('c', int))
    ```
    will only work if the user specifies a value explicitly for the new nested
    field t.b in INSERT and UPDATE, i.e.
    ```
    INSERT (s) VALUES (named_struct('a', named_struct('b', 1, 'c', 2)))
    UPDATE SET a.b = 2
    ```
    and not if they use INSERT * or UPDATE SET *.
    
    2. Type widening is not allowed for the moment, as we need to decide which
       widenings to allow.

    We can take this up in a follow-up PR.
    
    ### Why are the changes needed?
    https://github.com/apache/spark/pull/45748 added the syntax 'WITH SCHEMA
    EVOLUTION' to 'MERGE INTO'. However, it requires an external Spark extension
    to resolve the merge, and doesn't do anything in Spark's native MERGE
    implementation.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added many tests to MergeIntoTableSuiteBase
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #51698 from szehon-ho/merge_schema_evolution.
    
    Authored-by: Szehon Ho <szehon.apa...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../src/main/resources/error/error-conditions.json |   6 +
 .../sql/connector/catalog/TableCapability.java     |   6 +
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  21 +-
 .../analysis/ResolveMergeIntoSchemaEvolution.scala |  65 +++
 .../ResolveRowLevelCommandAssignments.scala        |   6 +-
 .../catalyst/analysis/RewriteMergeIntoTable.scala  |  10 +-
 .../sql/catalyst/plans/logical/v2Commands.scala    |  78 ++-
 .../spark/sql/errors/QueryCompilationErrors.scala  |   9 +
 .../datasources/v2/DataSourceV2Relation.scala      |   3 +
 .../sql/connector/catalog/InMemoryBaseTable.scala  |  14 +-
 .../sql/connector/MergeIntoTableSuiteBase.scala    | 637 ++++++++++++++++++++-
 .../execution/command/PlanResolutionSuite.scala    |  46 +-
 12 files changed, 868 insertions(+), 33 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index a7105e228e2b..07725ea7e0ee 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -6731,6 +6731,12 @@
     },
     "sqlState" : "0A000"
   },
+  "UNSUPPORTED_TABLE_CHANGE_IN_AUTO_SCHEMA_EVOLUTION" : {
+    "message" : [
+      "The table changes <changes> are not supported by the catalog on table 
<tableName>."
+    ],
+    "sqlState" : "42000"
+  },
   "UNSUPPORTED_TABLE_CHANGE_IN_JDBC_CATALOG" : {
     "message" : [
       "The table change <change> is not supported for the JDBC catalog on 
table <tableName>. Supported changes include: AddColumn, RenameColumn, 
DeleteColumn, UpdateColumnType, UpdateColumnNullability."
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCapability.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCapability.java
index 5732c0f3af4e..0a01c0c266b9 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCapability.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCapability.java
@@ -93,6 +93,12 @@ public enum TableCapability {
    */
   ACCEPT_ANY_SCHEMA,
 
+  /**
+   * Signals that the table supports Spark altering its schema if necessary
+   * as part of an operation.
+   */
+  AUTOMATIC_SCHEMA_EVOLUTION,
+
   /**
   * Signals that the table supports append writes using the V1 InsertableRelation interface.
    * <p>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index ccac4d8c4c0d..b25e4d5d538f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -444,6 +444,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
       AddMetadataColumns ::
       DeduplicateRelations ::
       ResolveCollationName ::
+      ResolveMergeIntoSchemaEvolution ::
       new ResolveReferences(catalogManager) ::
       // Please do not insert any other rules in between. See the TODO comments in rule
       // ResolveLateralColumnAliasReference for more details.
@@ -1669,7 +1670,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
       case u: UpdateTable => resolveReferencesInUpdate(u)
 
       case m @ MergeIntoTable(targetTable, sourceTable, _, _, _, _, _)
-        if !m.resolved && targetTable.resolved && sourceTable.resolved =>
+        if !m.resolved && targetTable.resolved && sourceTable.resolved && !m.needSchemaEvolution =>
 
         EliminateSubqueryAliases(targetTable) match {
           case r: NamedRelation if r.skipSchemaResolution =>
@@ -1692,9 +1693,12 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
                   // The update value can access columns from both target and source tables.
                   resolveAssignments(assignments, m, MergeResolvePolicy.BOTH))
               case UpdateStarAction(updateCondition) =>
-                val assignments = targetTable.output.map { attr =>
-                  Assignment(attr, UnresolvedAttribute(Seq(attr.name)))
-                }
+                // Use only source columns.  Missing columns in target will be handled in
+                // ResolveRowLevelCommandAssignments.
+                val assignments = targetTable.output.flatMap{ targetAttr =>
+                  sourceTable.output.find(
+                      sourceCol => conf.resolver(sourceCol.name, targetAttr.name))
+                    .map(Assignment(targetAttr, _))}
                 UpdateAction(
                   updateCondition.map(resolveExpressionByPlanChildren(_, m)),
                   // For UPDATE *, the value must be from source table.
@@ -1715,9 +1719,12 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
                 // access columns from the source table.
                 val resolvedInsertCondition = insertCondition.map(
                   resolveExpressionByPlanOutput(_, m.sourceTable))
-                val assignments = targetTable.output.map { attr =>
-                  Assignment(attr, UnresolvedAttribute(Seq(attr.name)))
-                }
+                // Use only source columns.  Missing columns in target will be handled in
+                // ResolveRowLevelCommandAssignments.
+                val assignments = targetTable.output.flatMap{ targetAttr =>
+                  sourceTable.output.find(
+                      sourceCol => conf.resolver(sourceCol.name, targetAttr.name))
+                    .map(Assignment(targetAttr, _))}
                 InsertAction(
                   resolvedInsertCondition,
                   resolveAssignments(assignments, m, MergeResolvePolicy.SOURCE))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMergeIntoSchemaEvolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMergeIntoSchemaEvolution.scala
new file mode 100644
index 000000000000..7e7776098a04
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMergeIntoSchemaEvolution.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.types.DataTypeUtils
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, TableCatalog}
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+
+
+/**
+ * A rule that resolves schema evolution for MERGE INTO.
+ *
+ * This rule will call the DSV2 Catalog to update the schema of the target table.
+ */
+object ResolveMergeIntoSchemaEvolution extends Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case m @ MergeIntoTable(_, _, _, _, _, _, _)
+      if m.needSchemaEvolution =>
+        val newTarget = m.targetTable.transform {
+          case r : DataSourceV2Relation => performSchemaEvolution(r, m.sourceTable)
+        }
+        m.copy(targetTable = newTarget)
+  }
+
+  private def performSchemaEvolution(relation: DataSourceV2Relation, source: LogicalPlan)
+    : DataSourceV2Relation = {
+    (relation.catalog, relation.identifier) match {
+      case (Some(c: TableCatalog), Some(i)) =>
+        val changes = MergeIntoTable.schemaChanges(relation.schema, source.schema)
+        c.alterTable(i, changes: _*)
+        val newTable = c.loadTable(i)
+        val newSchema = CatalogV2Util.v2ColumnsToStructType(newTable.columns())
+        // Check if there are any remaining changes not applied.
+        val remainingChanges = MergeIntoTable.schemaChanges(newSchema, source.schema)
+        if (remainingChanges.nonEmpty) {
+          throw QueryCompilationErrors.unsupportedTableChangesInAutoSchemaEvolutionError(
+            remainingChanges, i.toQualifiedNameParts(c))
+        }
+        relation.copy(table = newTable, output = DataTypeUtils.toAttributes(newSchema))
+      case _ => logWarning(s"Schema Evolution enabled but data source $relation " +
+        s"does not support it, skipping.")
+        relation
+    }
+  }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala
index 3f3e707b054b..83520b780f12 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala
@@ -48,7 +48,8 @@ object ResolveRowLevelCommandAssignments extends Rule[LogicalPlan] {
     case u: UpdateTable if !u.skipSchemaResolution && u.resolved && !u.aligned =>
       resolveAssignments(u)
 
-    case m: MergeIntoTable if !m.skipSchemaResolution && m.resolved && m.rewritable && !m.aligned =>
+    case m: MergeIntoTable if !m.skipSchemaResolution && m.resolved && m.rewritable && !m.aligned &&
+      !m.needSchemaEvolution =>
       validateStoreAssignmentPolicy()
       m.copy(
         targetTable = cleanAttrMetadata(m.targetTable),
@@ -56,7 +57,8 @@ object ResolveRowLevelCommandAssignments extends Rule[LogicalPlan] {
         notMatchedActions = alignActions(m.targetTable.output, m.notMatchedActions),
         notMatchedBySourceActions = alignActions(m.targetTable.output, m.notMatchedBySourceActions))
 
-    case m: MergeIntoTable if !m.skipSchemaResolution && m.resolved && !m.aligned =>
+    case m: MergeIntoTable if !m.skipSchemaResolution && m.resolved && !m.aligned
+      && !m.needSchemaEvolution =>
       resolveAssignments(m)
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
index 5ac853b858cc..9e67aa156fa2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
@@ -45,8 +45,8 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    case m @ MergeIntoTable(aliasedTable, source, cond, matchedActions, notMatchedActions,
-        notMatchedBySourceActions, _) if m.resolved && m.rewritable && m.aligned &&
-        matchedActions.isEmpty && notMatchedActions.size == 1 &&
+      notMatchedBySourceActions, _) if m.resolved && m.rewritable && m.aligned &&
+        !m.needSchemaEvolution && matchedActions.isEmpty && notMatchedActions.size == 1 &&
         notMatchedBySourceActions.isEmpty =>
 
       EliminateSubqueryAliases(aliasedTable) match {
@@ -79,7 +79,8 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper
       }
 
    case m @ MergeIntoTable(aliasedTable, source, cond, matchedActions, notMatchedActions,
-        notMatchedBySourceActions, _) if m.resolved && m.rewritable && m.aligned &&
+        notMatchedBySourceActions, _)
+      if m.resolved && m.rewritable && m.aligned && !m.needSchemaEvolution &&
         matchedActions.isEmpty && notMatchedBySourceActions.isEmpty =>
 
       EliminateSubqueryAliases(aliasedTable) match {
@@ -120,7 +121,8 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper
       }
 
    case m @ MergeIntoTable(aliasedTable, source, cond, matchedActions, notMatchedActions,
-        notMatchedBySourceActions, _) if m.resolved && m.rewritable && m.aligned =>
+        notMatchedBySourceActions, _)
+      if m.resolved && m.rewritable && m.aligned && !m.needSchemaEvolution =>
 
       EliminateSubqueryAliases(aliasedTable) match {
        case r @ DataSourceV2Relation(tbl: SupportsRowLevelOperations, _, _, _, _) =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 0e540d38ee99..f2f7a0490f91 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -38,8 +38,10 @@ import org.apache.spark.sql.connector.expressions.filter.Predicate
 import org.apache.spark.sql.connector.write.{DeltaWrite, RowLevelOperation, RowLevelOperationTable, SupportsDelta, Write}
 import org.apache.spark.sql.connector.write.RowLevelOperation.Command.{DELETE, MERGE, UPDATE}
 import org.apache.spark.sql.errors.DataTypeErrors.toSQLType
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.types.{BooleanType, DataType, IntegerType, MapType, MetadataBuilder, StringType, StructType}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{ArrayType, BooleanType, DataType, IntegerType, MapType, MetadataBuilder, StringType, StructField, StructType}
 import org.apache.spark.util.ArrayImplicits._
 import org.apache.spark.util.Utils
 
@@ -894,6 +896,17 @@ case class MergeIntoTable(
   override protected def withNewChildrenInternal(
       newLeft: LogicalPlan, newRight: LogicalPlan): MergeIntoTable =
     copy(targetTable = newLeft, sourceTable = newRight)
+
+  def needSchemaEvolution: Boolean =
+    schemaEvolutionEnabled &&
+      MergeIntoTable.schemaChanges(targetTable.schema, sourceTable.schema).nonEmpty
+
+  private def schemaEvolutionEnabled: Boolean = withSchemaEvolution && {
+    EliminateSubqueryAliases(targetTable) match {
+      case r: DataSourceV2Relation if r.autoSchemaEvolution() => true
+      case _ => false
+    }
+  }
 }
 
 object MergeIntoTable {
@@ -909,6 +922,69 @@ object MergeIntoTable {
     }
     privileges.toSeq
   }
+
+  def schemaChanges(
+      originalTarget: StructType,
+      originalSource: StructType,
+      fieldPath: Array[String] = Array()): Array[TableChange] = {
+    schemaChanges(originalTarget, originalSource, originalTarget, originalSource, fieldPath)
+  }
+
+  private def schemaChanges(
+      current: DataType,
+      newType: DataType,
+      originalTarget: StructType,
+      originalSource: StructType,
+      fieldPath: Array[String]): Array[TableChange] = {
+    (current, newType) match {
+      case (StructType(currentFields), StructType(newFields)) =>
+        val newFieldMap = toFieldMap(newFields)
+
+        // Update existing field types
+        val updates = {
+          currentFields collect {
+            case currentField: StructField if newFieldMap.contains(currentField.name) =>
+              schemaChanges(currentField.dataType, newFieldMap(currentField.name).dataType,
+                originalTarget, originalSource, fieldPath ++ Seq(currentField.name))
+          }}.flatten
+
+        // Identify the newly added fields and append to the end
+        val currentFieldMap = toFieldMap(currentFields)
+        val adds = newFields.filterNot (f => currentFieldMap.contains (f.name))
+          .map(f => TableChange.addColumn(fieldPath ++ Set(f.name), f.dataType))
+
+        updates ++ adds
+
+      case (ArrayType(currentElementType, _), ArrayType(newElementType, _)) =>
+        schemaChanges(currentElementType, newElementType,
+          originalTarget, originalSource, fieldPath ++ Seq("element"))
+
+      case (MapType(currentKeyType, currentElementType, _),
+      MapType(updateKeyType, updateElementType, _)) =>
+        schemaChanges(currentKeyType, updateKeyType, originalTarget, originalSource,
+          fieldPath ++ Seq("key")) ++
+          schemaChanges(currentElementType, updateElementType,
+            originalTarget, originalSource, fieldPath ++ Seq("value"))
+
+      case (currentType, newType) if currentType == newType =>
+        // No change needed
+        Array.empty[TableChange]
+
+      case _ =>
+        // For now do not support type widening
+        throw QueryExecutionErrors.failedToMergeIncompatibleSchemasError(
+          originalTarget, originalSource, null)
+    }
+  }
+
+  def toFieldMap(fields: Array[StructField]): Map[String, StructField] = {
+    val fieldMap = fields.map(field => field.name -> field).toMap
+    if (SQLConf.get.caseSensitiveAnalysis) {
+      fieldMap
+    } else {
+      CaseInsensitiveMap(fieldMap)
+    }
+  }
 }
 
 sealed abstract class MergeAction extends Expression with Unevaluable {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index b296036c8fc0..be3f9b19591b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -3348,6 +3348,15 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
         "change" -> change.toString, "tableName" -> toSQLId(sanitizedTableName)))
   }
 
+  def unsupportedTableChangesInAutoSchemaEvolutionError(
+      changes: Array[TableChange], tableName: Seq[String]): Throwable = {
+    val sanitizedTableName = tableName.map(_.replaceAll("\"", ""))
+    new AnalysisException(
+      errorClass = "UNSUPPORTED_TABLE_CHANGES_IN_AUTO_SCHEMA_EVOLUTION",
+      messageParameters = Map(
+        "changes" -> changes.mkString(","), "tableName" -> 
toSQLId(sanitizedTableName)))
+  }
+
   def pathOptionNotSetCorrectlyWhenReadingError(): Throwable = {
     new AnalysisException(
       errorClass = "_LEGACY_ERROR_TEMP_1306",
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index babdd70d58ba..2b1b40e0a5eb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -126,6 +126,9 @@ case class DataSourceV2Relation(
       this
     }
   }
+
+  def autoSchemaEvolution(): Boolean =
+    table.capabilities().contains(TableCapability.AUTOMATIC_SCHEMA_EVOLUTION)
 }
 
 /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
index ab1d3719e404..83fbedda8619 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
@@ -134,6 +134,8 @@ abstract class InMemoryBaseTable(
     properties.getOrDefault("allow-unsupported-transforms", "false").toBoolean
 
   private val acceptAnySchema = properties.getOrDefault("accept-any-schema", 
"false").toBoolean
+  private val autoSchemaEvolution = 
properties.getOrDefault("auto-schema-evolution", "true")
+    .toBoolean
 
   partitioning.foreach {
     case _: IdentityTransform =>
@@ -349,13 +351,11 @@ abstract class InMemoryBaseTable(
     TableCapability.OVERWRITE_DYNAMIC,
     TableCapability.TRUNCATE)
 
-  override def capabilities(): util.Set[TableCapability] = {
-    if (acceptAnySchema) {
-      (baseCapabiilities ++ Set(TableCapability.ACCEPT_ANY_SCHEMA)).asJava
-    } else {
-      baseCapabiilities.asJava
-    }
-  }
+  override def capabilities(): util.Set[TableCapability] =
+    (baseCapabiilities ++
+      (if (acceptAnySchema) Seq(TableCapability.ACCEPT_ANY_SCHEMA) else Seq.empty) ++
+      (if (autoSchemaEvolution) Seq(TableCapability.AUTOMATIC_SCHEMA_EVOLUTION) else Seq.empty))
+      .asJava
 
   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
     new InMemoryScanBuilder(schema, options)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
index e5786619f98f..10586adab1f6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
@@ -21,14 +21,14 @@ import org.apache.spark.SparkRuntimeException
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, In, Not}
 import org.apache.spark.sql.catalyst.optimizer.BuildLeft
-import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, InMemoryTable, TableInfo}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, ColumnDefaultValue, InMemoryTable, TableInfo}
 import org.apache.spark.sql.connector.expressions.{GeneralScalarExpression, LiteralValue}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.datasources.v2.MergeRowsExec
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, CartesianProductExec}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{IntegerType, StringType}
+import org.apache.spark.sql.types.{ArrayType, BooleanType, IntegerType, MapType, StringType, StructField, StructType}
 
 abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
   with AdaptiveSparkPlanHelper {
@@ -2154,6 +2154,639 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
     }
   }
 
+  test("Merge schema evolution new column with set explicit column") {
+    Seq((true, true), (false, true), (true, false)).foreach {
+      case (withSchemaEvolution, schemaEvolutionEnabled) =>
+        withTempView("source") {
+          createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+            """{ "pk": 1, "salary": 100, "dep": "hr" }
+              |{ "pk": 2, "salary": 200, "dep": "software" }
+              |{ "pk": 3, "salary": 300, "dep": "hr" }
+              |{ "pk": 4, "salary": 400, "dep": "marketing" }
+              |{ "pk": 5, "salary": 500, "dep": "executive" }
+              |""".stripMargin)
+
+          if (!schemaEvolutionEnabled) {
+            sql(s"""ALTER TABLE $tableNameAsString SET TBLPROPERTIES
+                   | ('auto-schema-evolution' = 'false')""".stripMargin)
+          }
+
+          val sourceDF = Seq((4, 150, "dummy", true),
+            (5, 250, "dummy", true),
+            (6, 350, "dummy", false)).toDF("pk", "salary", "dep", "active")
+          sourceDF.createOrReplaceTempView("source")
+
+          val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION" else ""
+          val mergeStmt = s"""MERGE $schemaEvolutionClause
+                             |INTO $tableNameAsString t
+                             |USING source s
+                             |ON t.pk = s.pk
+                             |WHEN MATCHED THEN
+                             | UPDATE SET dep='software', active=s.active
+                             |WHEN NOT MATCHED THEN
+                             | INSERT (pk, salary, dep, active) VALUES (s.pk, 0, s.dep, s.active)
+                             |""".stripMargin
+
+          if (withSchemaEvolution && schemaEvolutionEnabled) {
+            sql(mergeStmt)
+            checkAnswer(
+              sql(s"SELECT * FROM $tableNameAsString"),
+              Seq(
+                Row(1, 100, "hr", null),
+                Row(2, 200, "software", null),
+                Row(3, 300, "hr", null),
+                Row(4, 400, "software", true),
+                Row(5, 500, "software", true),
+                Row(6, 0, "dummy", false)))
+          } else {
+            val e = intercept[org.apache.spark.sql.AnalysisException] {
+              sql(mergeStmt)
+            }
+            assert(e.errorClass.get == "UNRESOLVED_COLUMN.WITH_SUGGESTION")
+            assert(e.getMessage.contains("A column, variable, or function 
parameter with name " +
+              "`active` cannot be resolved"))
+          }
+
+          sql(s"DROP TABLE $tableNameAsString")
+        }
+    }
+  }
+
+  test("Merge schema evolution new column with set all columns") {
+    Seq((true, true), (false, true), (true, false)).foreach {
+      case (withSchemaEvolution, schemaEvolutionEnabled) =>
+      withTempView("source") {
+        createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+          """{ "pk": 1, "salary": 100, "dep": "hr" }
+            |{ "pk": 2, "salary": 200, "dep": "software" }
+            |{ "pk": 3, "salary": 300, "dep": "hr" }
+            |{ "pk": 4, "salary": 400, "dep": "marketing" }
+            |{ "pk": 5, "salary": 500, "dep": "executive" }
+            |""".stripMargin)
+
+
+        if (!schemaEvolutionEnabled) {
+          sql(s"""ALTER TABLE $tableNameAsString SET TBLPROPERTIES
+                 | ('auto-schema-evolution' = 'false')""".stripMargin)
+        }
+
+        val sourceDF = Seq((4, 150, "finance", true),
+          (5, 250, "finance", false),
+          (6, 350, "finance", true)).toDF("pk", "salary", "dep", "active")
+        sourceDF.createOrReplaceTempView("source")
+
+        val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION" else ""
+        sql(
+          s"""MERGE $schemaEvolutionClause
+             |INTO $tableNameAsString t
+             |USING source s
+             |ON t.pk = s.pk
+             |WHEN MATCHED THEN
+             | UPDATE SET *
+             |WHEN NOT MATCHED THEN
+             | INSERT *
+             |""".stripMargin)
+
+        if (withSchemaEvolution && schemaEvolutionEnabled) {
+          checkAnswer(
+            sql(s"SELECT * FROM $tableNameAsString"),
+            Seq(
+              Row(1, 100, "hr", null),
+              Row(2, 200, "software", null),
+              Row(3, 300, "hr", null),
+              Row(4, 150, "finance", true),
+              Row(5, 250, "finance", false),
+              Row(6, 350, "finance", true)))
+        } else {
+          // Without schema evolution, the new columns are not added
+          checkAnswer(
+            sql(s"SELECT * FROM $tableNameAsString"),
+            Seq(
+              Row(1, 100, "hr"),
+              Row(2, 200, "software"),
+              Row(3, 300, "hr"),
+              Row(4, 150, "finance"),
+              Row(5, 250, "finance"),
+              Row(6, 350, "finance")))
+        }
+      }
+      sql(s"DROP TABLE $tableNameAsString")
+    }
+  }
+
+  test("Merge schema evolution replacing column with set all column") {
+    Seq((true, true), (false, true), (true, false)).foreach {
+      case (withSchemaEvolution, schemaEvolutionEnabled) =>
+      withTempView("source") {
+        createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+          """{ "pk": 1, "salary": 100, "dep": "hr" }
+            |{ "pk": 2, "salary": 200, "dep": "software" }
+            |{ "pk": 3, "salary": 300, "dep": "hr" }
+            |{ "pk": 4, "salary": 400, "dep": "marketing" }
+            |{ "pk": 5, "salary": 500, "dep": "executive" }
+            |""".stripMargin)
+
+        if (!schemaEvolutionEnabled) {
+          sql(s"""ALTER TABLE $tableNameAsString SET TBLPROPERTIES
+                 | ('auto-schema-evolution' = 'false')""".stripMargin)
+        }
+
+        val sourceDF = Seq((4, 150, true),
+          (5, 250, true),
+          (6, 350, false)).toDF("pk", "salary", "active")
+        sourceDF.createOrReplaceTempView("source")
+
+        val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION" else ""
+        sql(s"""MERGE $schemaEvolutionClause
+                           |INTO $tableNameAsString t
+                           |USING source s
+                           |ON t.pk = s.pk
+                           |WHEN MATCHED THEN
+                           | UPDATE SET *
+                           |WHEN NOT MATCHED THEN
+                           | INSERT *
+                           |""".stripMargin)
+        if (withSchemaEvolution && schemaEvolutionEnabled) {
+          checkAnswer(
+            sql(s"SELECT * FROM $tableNameAsString"),
+            Seq(
+              Row(1, 100, "hr", null),
+              Row(2, 200, "software", null),
+              Row(3, 300, "hr", null),
+              Row(4, 150, "marketing", true),
+              Row(5, 250, "executive", true),
+              Row(6, 350, null, false)))
+        } else {
+          checkAnswer(
+            sql(s"SELECT * FROM $tableNameAsString"),
+            Seq(
+              Row(1, 100, "hr"),
+              Row(2, 200, "software"),
+              Row(3, 300, "hr"),
+              Row(4, 150, "marketing"),
+              Row(5, 250, "executive"),
+              Row(6, 350, null)))
+        }
+        sql(s"DROP TABLE $tableNameAsString")
+      }
+    }
+  }
+
+  test("Merge schema evolution replacing column with set explicit column") {
+    Seq((true, true), (false, true), (true, false)).foreach {
+      case (withSchemaEvolution, schemaEvolutionEnabled) =>
+      withTempView("source") {
+        createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+          """{ "pk": 1, "salary": 100, "dep": "hr" }
+            |{ "pk": 2, "salary": 200, "dep": "software" }
+            |{ "pk": 3, "salary": 300, "dep": "hr" }
+            |{ "pk": 4, "salary": 400, "dep": "marketing" }
+            |{ "pk": 5, "salary": 500, "dep": "executive" }
+            |""".stripMargin)
+
+        if (!schemaEvolutionEnabled) {
+          sql(s"""ALTER TABLE $tableNameAsString SET TBLPROPERTIES
+                 | ('auto-schema-evolution' = 'false')""".stripMargin)
+        }
+
+        val sourceDF = Seq((4, 150, true),
+          (5, 250, true),
+          (6, 350, false)).toDF("pk", "salary", "active")
+        sourceDF.createOrReplaceTempView("source")
+
+        val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION" else ""
+        val mergeStmt = s"""MERGE $schemaEvolutionClause
+                           |INTO $tableNameAsString t
+                           |USING source s
+                           |ON t.pk = s.pk
+                           |WHEN MATCHED THEN
+                           | UPDATE SET dep = 'finance', active = s.active
+                           |WHEN NOT MATCHED THEN
+                           | INSERT (pk, salary, dep, active) VALUES
+                           | (s.pk, s.salary, 'finance', s.active)
+                           |""".stripMargin
+
+        if (withSchemaEvolution && schemaEvolutionEnabled) {
+          sql(mergeStmt)
+          checkAnswer(
+            sql(s"SELECT * FROM $tableNameAsString"),
+            Seq(
+              Row(1, 100, "hr", null),
+              Row(2, 200, "software", null),
+              Row(3, 300, "hr", null),
+              Row(4, 400, "finance", true),
+              Row(5, 500, "finance", true),
+              Row(6, 350, "finance", false)))
+        } else {
+          val e = intercept[org.apache.spark.sql.AnalysisException] {
+            sql(mergeStmt)
+          }
+          assert(e.errorClass.get == "UNRESOLVED_COLUMN.WITH_SUGGESTION")
+          assert(e.getMessage.contains("A column, variable, or function 
parameter with name " +
+            "`active` cannot be resolved"))
+        }
+
+        sql(s"DROP TABLE $tableNameAsString")
+      }
+    }
+  }
+
+  test("merge into schema evolution add column with nested field and set 
explicit columns") {
+    Seq(true, false).foreach { withSchemaEvolution =>
+      withTempView("source") {
+        createAndInitTable(
+          s"""pk INT NOT NULL,
+             |s STRUCT<c1: INT, c2: STRUCT<a: ARRAY<INT>, m: MAP<STRING, STRING>>>,
+             |dep STRING""".stripMargin,
+          """{ "pk": 1, "s": { "c1": 2, "c2": { "a": [1,2], "m": { "a": "b" } 
} }, "dep": "hr" }""")
+
+        val sourceTableSchema = StructType(Seq(
+          StructField("pk", IntegerType, nullable = false),
+          StructField("s", StructType(Seq(
+            StructField("c1", IntegerType),
+            StructField("c2", StructType(Seq(
+              StructField("a", ArrayType(IntegerType)),
+              StructField("m", MapType(StringType, StringType)),
+              StructField("c3", BooleanType) // new column
+            )))
+          ))),
+          StructField("dep", StringType)
+        ))
+        val data = Seq(
+          Row(1, Row(10, Row(Array(3, 4), Map("c" -> "d"), false)), "sales"),
+          Row(2, Row(20, Row(Array(4, 5), Map("e" -> "f"), true)), 
"engineering")
+        )
+        spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
+          .createOrReplaceTempView("source")
+
+        val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION" else ""
+        val mergeStmt =
+          s"""MERGE $schemaEvolutionClause
+             |INTO $tableNameAsString t
+             |USING source src
+             |ON t.pk = src.pk
+             |WHEN MATCHED THEN
+             | UPDATE SET s.c1 = -1, s.c2.m = map('k', 'v'), s.c2.a = array(-1),
+             | s.c2.c3 = src.s.c2.c3
+             |WHEN NOT MATCHED THEN
+             | INSERT (pk, s, dep) VALUES (src.pk,
+             |   named_struct('c1', src.s.c1,
+             |     'c2', named_struct('a', src.s.c2.a, 'm', map('g', 'h'), 'c3', true)), src.dep)
+             |""".stripMargin
+
+        if (withSchemaEvolution) {
+          sql(mergeStmt)
+          checkAnswer(
+            sql(s"SELECT * FROM $tableNameAsString"),
+            Seq(Row(1, Row(-1, Row(Seq(-1), Map("k" -> "v"), false)), "hr"),
+              Row(2, Row(20, Row(Seq(4, 5), Map("g" -> "h"), true)), 
"engineering")))
+        } else {
+          val exception = intercept[org.apache.spark.sql.AnalysisException] {
+            sql(mergeStmt)
+          }
+          assert(exception.errorClass.get == "FIELD_NOT_FOUND")
+          assert(exception.getMessage.contains("No such struct field `c3` in 
`a`, `m`. "))
+        }
+      }
+      sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+    }
+  }
+
+  test("merge into schema evolution add column with nested field and set all 
columns") {
+    Seq(true, false).foreach { withSchemaEvolution =>
+      withTempView("source") {
+        createAndInitTable(
+          s"""pk INT NOT NULL,
+             |s STRUCT<c1: INT, c2: STRUCT<a: ARRAY<INT>, m: MAP<STRING, STRING>>>,
+             |dep STRING""".stripMargin,
+          """{ "pk": 1, "s": { "c1": 2, "c2": { "a": [1,2], "m": { "a": "b" } 
} }, "dep": "hr" }""")
+
+        val sourceTableSchema = StructType(Seq(
+          StructField("pk", IntegerType, nullable = false),
+          StructField("s", StructType(Seq(
+            StructField("c1", IntegerType),
+            StructField("c2", StructType(Seq(
+              StructField("a", ArrayType(IntegerType)),
+              StructField("m", MapType(StringType, StringType)),
+              StructField("c3", BooleanType) // new column
+            )))
+          ))),
+          StructField("dep", StringType)
+        ))
+        val data = Seq(
+          Row(1, Row(10, Row(Array(3, 4), Map("c" -> "d"), false)), "sales"),
+          Row(2, Row(20, Row(Array(4, 5), Map("e" -> "f"), true)), 
"engineering")
+        )
+        spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
+          .createOrReplaceTempView("source")
+
+        val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION" else ""
+        val mergeStmt =
+          s"""MERGE $schemaEvolutionClause
+             |INTO $tableNameAsString t
+             |USING source src
+             |ON t.pk = src.pk
+             |WHEN MATCHED THEN
+             | UPDATE SET *
+             |WHEN NOT MATCHED THEN
+             | INSERT *
+             |""".stripMargin
+
+        if (withSchemaEvolution) {
+          sql(mergeStmt)
+          checkAnswer(
+            sql(s"SELECT * FROM $tableNameAsString"),
+            Seq(Row(1, Row(10, Row(Seq(3, 4), Map("c" -> "d"), false)), 
"sales"),
+              Row(2, Row(20, Row(Seq(4, 5), Map("e" -> "f"), true)), 
"engineering")))
+        } else {
+          val exception = intercept[org.apache.spark.sql.AnalysisException] {
+            sql(mergeStmt)
+          }
+          assert(exception.errorClass.get == "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
+          assert(exception.getMessage.contains(
+            "Cannot write extra fields `c3` to the struct `s`.`c2`"))
+        }
+      }
+      sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+    }
+  }
+
+  test("merge into schema evolution replace column with nested field and set 
explicit columns") {
+    Seq(true, false).foreach { withSchemaEvolution =>
+      withTempView("source") {
+        createAndInitTable(
+          s"""pk INT NOT NULL,
+             |s STRUCT<c1: INT, c2: STRUCT<a: ARRAY<INT>, m: MAP<STRING, STRING>>>,
+             |dep STRING""".stripMargin,
+          """{ "pk": 1, "s": { "c1": 2, "c2": { "a": [1,2], "m": { "a": "b" } 
} }, "dep": "hr" }""")
+
+        val sourceTableSchema = StructType(Seq(
+          StructField("pk", IntegerType, nullable = false),
+          StructField("s", StructType(Seq(
+            StructField("c1", IntegerType),
+            StructField("c2", StructType(Seq(
+              // removed column 'a'
+              StructField("m", MapType(StringType, StringType)),
+              StructField("c3", BooleanType) // new column
+            )))
+          ))),
+          StructField("dep", StringType)
+        ))
+        val data = Seq(
+          Row(1, Row(10, Row(Map("c" -> "d"), false)), "sales"),
+          Row(2, Row(20, Row(Map("e" -> "f"), true)), "engineering")
+        )
+        spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
+          .createOrReplaceTempView("source")
+
+        val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION" else ""
+        val mergeStmt =
+          s"""MERGE $schemaEvolutionClause
+             |INTO $tableNameAsString t
+             |USING source src
+             |ON t.pk = src.pk
+             |WHEN MATCHED THEN
+             | UPDATE SET s.c1 = -1, s.c2.m = map('k', 'v'), s.c2.a = array(-1),
+             | s.c2.c3 = src.s.c2.c3
+             |WHEN NOT MATCHED THEN
+             | INSERT (pk, s, dep) VALUES (src.pk,
+             |   named_struct('c1', src.s.c1,
+             |     'c2', named_struct('a', array(-2), 'm', map('g', 'h'), 'c3', true)), src.dep)
+             |""".stripMargin
+
+        if (withSchemaEvolution) {
+          sql(mergeStmt)
+          checkAnswer(
+            sql(s"SELECT * FROM $tableNameAsString"),
+            Seq(Row(1, Row(-1, Row(Seq(-1), Map("k" -> "v"), false)), "hr"),
+              Row(2, Row(20, Row(Seq(-2), Map("g" -> "h"), true)), 
"engineering")))
+        } else {
+          val exception = intercept[org.apache.spark.sql.AnalysisException] {
+            sql(mergeStmt)
+          }
+          assert(exception.errorClass.get == "FIELD_NOT_FOUND")
+          assert(exception.getMessage.contains("No such struct field `c3` in 
`a`, `m`. "))
+        }
+      }
+      sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+    }
+  }
+
+  // TODO: support schema evolution for missing nested types using UPDATE SET * and INSERT *
+  test("merge into schema evolution replace column with nested field and set all columns") {
+    Seq(true, false).foreach { withSchemaEvolution =>
+      withTempView("source") {
+        createAndInitTable(
+          s"""pk INT NOT NULL,
+             |s STRUCT<c1: INT, c2: STRUCT<a: ARRAY<INT>, m: MAP<STRING, STRING>>>,
+             |dep STRING""".stripMargin,
+          """{ "pk": 1, "s": { "c1": 2, "c2": { "a": [1,2], "m": { "a": "b" } 
} }, "dep": "hr" }""")
+
+        val sourceTableSchema = StructType(Seq(
+          StructField("pk", IntegerType, nullable = false),
+          StructField("s", StructType(Seq(
+            StructField("c1", IntegerType),
+            StructField("c2", StructType(Seq(
+              // removed column 'a'
+              StructField("m", MapType(StringType, StringType)),
+              StructField("c3", BooleanType) // new column
+            )))
+          ))),
+          StructField("dep", StringType)
+        ))
+        val data = Seq(
+          Row(1, Row(10, Row(Map("c" -> "d"), false)), "sales"),
+          Row(2, Row(20, Row(Map("e" -> "f"), true)), "engineering")
+        )
+        spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
+          .createOrReplaceTempView("source")
+
+        val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION" else ""
+        val exception = intercept[org.apache.spark.sql.AnalysisException] {
+          sql(
+            s"""MERGE $schemaEvolutionClause
+               |INTO $tableNameAsString t
+               |USING source src
+               |ON t.pk = src.pk
+               |WHEN MATCHED THEN
+               | UPDATE SET *
+               |WHEN NOT MATCHED THEN
+               | INSERT *
+               |""".stripMargin)
+        }
+
+        assert(exception.errorClass.get == "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+        assert(exception.getMessage.contains("Cannot find data for the output column `s`.`c2`.`a`"))
+      }
+      sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+    }
+  }
+
+  test("merge into schema evolution add column for struct in array and set all 
columns") {
+    Seq(true, false).foreach { withSchemaEvolution =>
+      withTempView("source") {
+        createAndInitTable(
+          s"""pk INT NOT NULL,
+             |a ARRAY<STRUCT<c1: INT, c2: STRING>>,
+             |dep STRING""".stripMargin,
+          """{ "pk": 0, "a": [ { "c1": 1, "c2": "a" }, { "c1": 2, "c2": "b" } 
], "dep": "sales"},
+             { "pk": 1, "a": [ { "c1": 1, "c2": "a" }, { "c1": 2, "c2": "b" } 
], "dep": "hr" }"""
+            .stripMargin)
+
+        val sourceTableSchema = StructType(Seq(
+          StructField("pk", IntegerType, nullable = false),
+          StructField("a", ArrayType(
+            StructType(Seq(
+              StructField("c1", IntegerType),
+              StructField("c2", StringType),
+              StructField("c3", BooleanType))))), // new column
+          StructField("dep", StringType)))
+        val data = Seq(
+          Row(1, Array(Row(10, "c", true), Row(20, "d", false)), "hr"),
+          Row(2, Array(Row(30, "d", false), Row(40, "e", true)), "engineering")
+        )
+        spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
+          .createOrReplaceTempView("source")
+
+        val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION" else ""
+        val mergeStmt =
+          s"""MERGE $schemaEvolutionClause
+             |INTO $tableNameAsString t
+             |USING source src
+             |ON t.pk = src.pk
+             |WHEN MATCHED THEN
+             | UPDATE SET *
+             |WHEN NOT MATCHED THEN
+             | INSERT *
+             |""".stripMargin
+
+        if (withSchemaEvolution) {
+          sql(mergeStmt)
+          checkAnswer(
+            sql(s"SELECT * FROM $tableNameAsString"),
+            // TODO: InMemoryBaseTable does not return null for nested schema evolution.
+            Seq(Row(0, Array(Row(1, "a", true), Row(2, "b", true)), "sales"),
+              Row(1, Array(Row(10, "c", true), Row(20, "d", false)), "hr"),
+              Row(2, Array(Row(30, "d", false), Row(40, "e", true)), 
"engineering")))
+        } else {
+          val exception = intercept[org.apache.spark.sql.AnalysisException] {
+            sql(mergeStmt)
+          }
+          assert(exception.errorClass.get == "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
+          assert(exception.getMessage.contains(
+            "Cannot write extra fields `c3` to the struct `a`.`element`"))
+        }
+      }
+      sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+    }
+  }
+
+  test("merge into schema evolution add column for struct in map and set all 
columns") {
+    Seq(true, false).foreach { withSchemaEvolution =>
+      withTempView("source") {
+        val schema =
+          StructType(Seq(
+            StructField("pk", IntegerType, nullable = false),
+            StructField("m", MapType(
+              StructType(Seq(StructField("c1", IntegerType))),
+              StructType(Seq(StructField("c2", StringType))))),
+            StructField("dep", StringType)))
+        createTable(CatalogV2Util.structTypeToV2Columns(schema))
+
+        val data = Seq(
+          Row(0, Map(Row(10) -> Row("c")), "hr"),
+          Row(1, Map(Row(20) -> Row("d")), "sales"))
+        spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
+          .writeTo(tableNameAsString).append()
+
+        val sourceTableSchema = StructType(Seq(
+          StructField("pk", IntegerType),
+          StructField("m", MapType(
+            StructType(Seq(StructField("c1", IntegerType), StructField("c3", 
BooleanType))),
+            StructType(Seq(StructField("c2", StringType), StructField("c4", 
BooleanType))))),
+          StructField("dep", StringType)))
+        val sourceData = Seq(
+          Row(1, Map(Row(10, true) -> Row("y", false)), "sales"),
+          Row(2, Map(Row(20, false) -> Row("z", true)), "engineering")
+        )
+        spark.createDataFrame(spark.sparkContext.parallelize(sourceData), sourceTableSchema)
+          .createOrReplaceTempView("source")
+
+        val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION" else ""
+        val mergeStmt =
+          s"""MERGE $schemaEvolutionClause
+             |INTO $tableNameAsString t
+             |USING source src
+             |ON t.pk = src.pk
+             |WHEN MATCHED THEN
+             | UPDATE SET *
+             |WHEN NOT MATCHED THEN
+             | INSERT *
+             |""".stripMargin
+
+        if (withSchemaEvolution) {
+          sql(mergeStmt)
+          checkAnswer(
+            sql(s"SELECT * FROM $tableNameAsString"),
+            // TODO: InMemoryBaseTable does not return null for nested schema evolution.
+            Seq(Row(0, Map(Row(10, true) -> Row("c", true)), "hr"),
+              Row(1, Map(Row(10, true) -> Row("y", false)), "sales"),
+              Row(2, Map(Row(20, false) -> Row("z", true)), "engineering")))
+        } else {
+          val exception = intercept[org.apache.spark.sql.AnalysisException] {
+            sql(mergeStmt)
+          }
+          assert(exception.errorClass.get == "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
+          assert(exception.getMessage.contains(
+            "Cannot write extra fields `c3` to the struct `m`.`key`"))
+        }
+      }
+      sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+    }
+  }
+
+  test("merge into empty table with NOT MATCHED clause schema evolution") {
+    Seq(true, false) foreach { withSchemaEvolution =>
+      withTempView("source") {
+        createTable("pk INT NOT NULL, salary INT, dep STRING")
+
+        val sourceRows = Seq(
+          (1, 100, "hr", true),
+          (2, 200, "finance", false),
+          (3, 300, "hr", true))
+        sourceRows.toDF("pk", "salary", "dep", 
"active").createOrReplaceTempView("source")
+
+        val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION" else ""
+
+        sql(
+          s"""MERGE $schemaEvolutionClause
+             |INTO $tableNameAsString t
+             |USING source s
+             |ON t.pk = s.pk
+             |WHEN NOT MATCHED THEN
+             | INSERT *
+             |""".stripMargin)
+
+        if (withSchemaEvolution) {
+          checkAnswer(
+            sql(s"SELECT * FROM $tableNameAsString"),
+            Seq(
+              Row(1, 100, "hr", true),
+              Row(2, 200, "finance", false),
+              Row(3, 300, "hr", true)))
+        } else {
+          checkAnswer(
+            sql(s"SELECT * FROM $tableNameAsString"),
+            Seq(
+              Row(1, 100, "hr"),
+              Row(2, 200, "finance"),
+              Row(3, 300, "hr")))
+        }
+        sql("DROP TABLE IF EXISTS " + tableNameAsString)
+      }
+    }
+  }
+
   private def findMergeExec(query: String): MergeRowsExec = {
     val plan = executeAndKeepPlan {
       sql(query)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index ccf502d79c00..ecc293a5acc2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -2315,11 +2315,24 @@ class PlanResolutionSuite extends SharedSparkSession with AnalysisTest {
          |USING testcat.tab2
          |ON 1 = 1
          |WHEN MATCHED THEN UPDATE SET *""".stripMargin
-    checkError(
-      exception = intercept[AnalysisException](parseAndResolve(sql2)),
-      condition = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
-      parameters = Map("objectName" -> "`s`", "proposal" -> "`i`, `x`"),
-      context = ExpectedContext(fragment = sql2, start = 0, stop = 80))
+    val parsed2 = parseAndResolve(sql2)
+    parsed2 match {
+      case MergeIntoTable(
+          AsDataSourceV2Relation(target),
+          AsDataSourceV2Relation(source),
+          EqualTo(IntegerLiteral(1), IntegerLiteral(1)),
+          Seq(UpdateAction(None, updateAssigns)), // Matched actions
+          Seq(), // Not matched actions
+          Seq(), // Not matched by source actions
+          withSchemaEvolution) =>
+        val ti = target.output.find(_.name == "i").get
+        val si = source.output.find(_.name == "i").get
+        assert(updateAssigns.size == 1)
+        assert(updateAssigns.head.key.asInstanceOf[AttributeReference].sameRef(ti))
+        assert(updateAssigns.head.value.asInstanceOf[AttributeReference].sameRef(si))
+        assert(withSchemaEvolution === false)
+      case other => fail("Expect MergeIntoTable, but got:\n" + 
other.treeString)
+    }
 
     // INSERT * with incompatible schema between source and target tables.
     val sql3 =
@@ -2327,11 +2340,24 @@ class PlanResolutionSuite extends SharedSparkSession with AnalysisTest {
         |USING testcat.tab2
         |ON 1 = 1
         |WHEN NOT MATCHED THEN INSERT *""".stripMargin
-    checkError(
-      exception = intercept[AnalysisException](parseAndResolve(sql3)),
-      condition = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
-      parameters = Map("objectName" -> "`s`", "proposal" -> "`i`, `x`"),
-      context = ExpectedContext(fragment = sql3, start = 0, stop = 80))
+    val parsed3 = parseAndResolve(sql3)
+    parsed3 match {
+      case MergeIntoTable(
+          AsDataSourceV2Relation(target),
+          AsDataSourceV2Relation(source),
+          EqualTo(IntegerLiteral(1), IntegerLiteral(1)),
+          Seq(), // Matched action
+          Seq(InsertAction(None, insertAssigns)), // Not matched actions
+          Seq(), // Not matched by source actions
+          withSchemaEvolution) =>
+        val ti = target.output.find(_.name == "i").get
+        val si = source.output.find(_.name == "i").get
+        assert(insertAssigns.size == 1)
+        assert(insertAssigns.head.key.asInstanceOf[AttributeReference].sameRef(ti))
+        assert(insertAssigns.head.value.asInstanceOf[AttributeReference].sameRef(si))
+        assert(withSchemaEvolution === false)
+      case other => fail("Expect MergeIntoTable, but got:\n" + 
other.treeString)
+    }
 
     val sql4 =
       """


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
