This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 898838a239d3 [SPARK-47627][SQL] Add SQL MERGE syntax to enable schema evolution
898838a239d3 is described below

commit 898838a239d370429e49108a56c6a7fb22d6b399
Author: Paddy Xu <xupa...@gmail.com>
AuthorDate: Wed Apr 17 10:53:02 2024 -0700

    [SPARK-47627][SQL] Add SQL MERGE syntax to enable schema evolution
    
    ### Why are the changes needed?
    
    This PR introduces the syntax `WITH SCHEMA EVOLUTION` for the SQL MERGE command, which lets the user request automatic schema evolution for a specific merge operation.
    
    ```sql
    MERGE WITH SCHEMA EVOLUTION
    INTO tgt
    USING src
    ON ...
    WHEN ...
    ```
    
    When `WITH SCHEMA EVOLUTION` is specified, schema-evolution-related features are enabled for this single statement, and only for this statement.
    
    Spark is only responsible for recognizing whether the `WITH SCHEMA EVOLUTION` syntax is present and for passing the result down to the MERGE command. Data sources must respect the syntax and react accordingly: when it is present, they should turn on the features they categorize as "schema evolution". For example, when the underlying table is Delta Lake, the "mergeSchema" feature will be turned on (see https://github.com/delta-io/delta/blob/c41977db3529a3139d6306abe5ded161 [...]
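
    A minimal sketch, not part of this PR, of how a data source might honour the flag that Spark carries on the logical plan (the rule name and the "enable" reaction are hypothetical):

    ```scala
    import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable}
    import org.apache.spark.sql.catalyst.rules.Rule

    // Hypothetical connector-side rule: Spark only records the flag; turning on
    // schema-merging features (e.g. Delta Lake's "mergeSchema") is the source's job.
    object HonorSchemaEvolution extends Rule[LogicalPlan] {
      override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
        case m: MergeIntoTable if m.withSchemaEvolution =>
          // A real connector would enable its schema-merging feature here
          // before planning the merge; this sketch leaves the plan unchanged.
          m
      }
    }
    ```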
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, see the previous section.
    
    ### How was this patch tested?
    
    New tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #45748 from xupefei/merge-schema-evolution.
    
    Authored-by: Paddy Xu <xupa...@gmail.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../CheckConnectJvmClientCompatibility.scala       |  1 +
 docs/sql-ref-ansi-compliance.md                    |  1 +
 .../spark/sql/catalyst/parser/SqlBaseLexer.g4      |  1 +
 .../spark/sql/catalyst/parser/SqlBaseParser.g4     |  4 +-
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  2 +-
 .../catalyst/analysis/RewriteMergeIntoTable.scala  |  6 +--
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  4 +-
 .../sql/catalyst/plans/logical/v2Commands.scala    |  3 +-
 .../sql/catalyst/analysis/AnalysisSuite.scala      |  7 ++--
 .../PullupCorrelatedPredicatesSuite.scala          |  5 ++-
 .../ReplaceNullWithFalseInPredicateSuite.scala     |  6 ++-
 .../spark/sql/catalyst/parser/DDLParserSuite.scala | 42 +++++++++++++++++----
 .../org/apache/spark/sql/MergeIntoWriter.scala     | 19 +++++++++-
 .../sql-tests/results/ansi/keywords.sql.out        |  1 +
 .../resources/sql-tests/results/keywords.sql.out   |  1 +
 .../execution/command/PlanResolutionSuite.scala    | 43 +++++++++++++++++-----
 .../ThriftServerWithSparkContextSuite.scala        |  2 +-
 17 files changed, 113 insertions(+), 35 deletions(-)

diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index 0f383d007f29..f73290c5ce29 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -304,6 +304,7 @@ object CheckConnectJvmClientCompatibility {
 
       // MergeIntoWriter
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.MergeIntoWriter"),
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.MergeIntoWriter$"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.WhenMatched"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.WhenMatched$"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.WhenNotMatched"),
diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md
index bf1819b9767b..0256a3e0869d 100644
--- a/docs/sql-ref-ansi-compliance.md
+++ b/docs/sql-ref-ansi-compliance.md
@@ -492,6 +492,7 @@ Below is a list of all the keywords in Spark SQL.
 |END|reserved|non-reserved|reserved|
 |ESCAPE|reserved|non-reserved|reserved|
 |ESCAPED|non-reserved|non-reserved|non-reserved|
+|EVOLUTION|non-reserved|non-reserved|non-reserved|
 |EXCEPT|reserved|strict-non-reserved|reserved|
 |EXCHANGE|non-reserved|non-reserved|non-reserved|
 |EXCLUDE|non-reserved|non-reserved|non-reserved|
diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
index e2b178d34b56..83e40c4a20a2 100644
--- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
+++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
@@ -182,6 +182,7 @@ ELSE: 'ELSE';
 END: 'END';
 ESCAPE: 'ESCAPE';
 ESCAPED: 'ESCAPED';
+EVOLUTION: 'EVOLUTION';
 EXCEPT: 'EXCEPT';
 EXCHANGE: 'EXCHANGE';
 EXCLUDE: 'EXCLUDE';
diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 3d008516589b..60b67b08021d 100644
--- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -480,7 +480,7 @@ dmlStatementNoWith
     | fromClause multiInsertQueryBody+                                         #multiInsertQuery
     | DELETE FROM identifierReference tableAlias whereClause?                  #deleteFromTable
     | UPDATE identifierReference tableAlias setClause whereClause?             #updateTable
-    | MERGE INTO target=identifierReference targetAlias=tableAlias
+    | MERGE (WITH SCHEMA EVOLUTION)? INTO target=identifierReference targetAlias=tableAlias
         USING (source=identifierReference |
           LEFT_PAREN sourceQuery=query RIGHT_PAREN) sourceAlias=tableAlias
         ON mergeCondition=booleanExpression
@@ -1399,6 +1399,7 @@ ansiNonReserved
     | DOUBLE
     | DROP
     | ESCAPED
+    | EVOLUTION
     | EXCHANGE
     | EXCLUDE
     | EXISTS
@@ -1715,6 +1716,7 @@ nonReserved
     | END
     | ESCAPE
     | ESCAPED
+    | EVOLUTION
     | EXCHANGE
     | EXCLUDE
     | EXECUTE
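
With the clause optional in the grammar, a statement opts in per run. A minimal sketch of the resulting SQL surface, assuming a `SparkSession` named `spark` and hypothetical tables `tgt` and `src`:

```scala
// WITH SCHEMA EVOLUTION may be omitted, which keeps today's behaviour.
spark.sql("""
  MERGE WITH SCHEMA EVOLUTION INTO tgt AS t
  USING src AS s
  ON t.id = s.id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
""")
```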
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index e741387d7657..e666200a78d4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1659,7 +1659,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
 
       case u: UpdateTable => resolveReferencesInUpdate(u)
 
-      case m @ MergeIntoTable(targetTable, sourceTable, _, _, _, _)
+      case m @ MergeIntoTable(targetTable, sourceTable, _, _, _, _, _)
         if !m.resolved && targetTable.resolved && sourceTable.resolved =>
 
         EliminateSubqueryAliases(targetTable) match {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
index 9e020cb55ed5..dacee70cf128 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
@@ -45,7 +45,7 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case m @ MergeIntoTable(aliasedTable, source, cond, matchedActions, notMatchedActions,
-        notMatchedBySourceActions) if m.resolved && m.rewritable && m.aligned &&
+        notMatchedBySourceActions, _) if m.resolved && m.rewritable && m.aligned &&
         matchedActions.isEmpty && notMatchedActions.size == 1 &&
         notMatchedBySourceActions.isEmpty =>
 
@@ -79,7 +79,7 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper
       }
 
     case m @ MergeIntoTable(aliasedTable, source, cond, matchedActions, notMatchedActions,
-        notMatchedBySourceActions) if m.resolved && m.rewritable && m.aligned &&
+        notMatchedBySourceActions, _) if m.resolved && m.rewritable && m.aligned &&
         matchedActions.isEmpty && notMatchedBySourceActions.isEmpty =>
 
       EliminateSubqueryAliases(aliasedTable) match {
@@ -120,7 +120,7 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper
       }
 
     case m @ MergeIntoTable(aliasedTable, source, cond, matchedActions, notMatchedActions,
-        notMatchedBySourceActions) if m.resolved && m.rewritable && m.aligned =>
+        notMatchedBySourceActions, _) if m.resolved && m.rewritable && m.aligned =>
 
       EliminateSubqueryAliases(aliasedTable) match {
        case r @ DataSourceV2Relation(tbl: SupportsRowLevelOperations, _, _, _, _) =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 34672485ddc9..69220613a89e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -455,6 +455,7 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
     }
 
   override def visitMergeIntoTable(ctx: MergeIntoTableContext): LogicalPlan = withOrigin(ctx) {
+    val withSchemaEvolution = ctx.EVOLUTION() != null
     val targetTable = createUnresolvedRelation(ctx.target)
     val targetTableAlias = getTableAliasWithoutColumnAlias(ctx.targetAlias, "MERGE")
     val aliasedTarget = targetTableAlias.map(SubqueryAlias(_, targetTable)).getOrElse(targetTable)
@@ -549,7 +550,8 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
       mergeCondition,
       matchedActions.toSeq,
       notMatchedActions.toSeq,
-      notMatchedBySourceActions.toSeq)
+      notMatchedBySourceActions.toSeq,
+      withSchemaEvolution)
   }
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 37e751ea9884..43d37801b86d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -754,7 +754,8 @@ case class MergeIntoTable(
     mergeCondition: Expression,
     matchedActions: Seq[MergeAction],
     notMatchedActions: Seq[MergeAction],
-    notMatchedBySourceActions: Seq[MergeAction]) extends BinaryCommand with SupportsSubquery {
+    notMatchedBySourceActions: Seq[MergeAction],
+    withSchemaEvolution: Boolean) extends BinaryCommand with SupportsSubquery {
 
   lazy val aligned: Boolean = {
    val actions = matchedActions ++ notMatchedActions ++ notMatchedBySourceActions
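
Since `MergeIntoTable` gains a seventh field, every positional pattern match on it must be extended, which is what the remaining hunks do. A minimal sketch of matching the new shape (`plan` is assumed to be a parsed `LogicalPlan`):

```scala
import org.apache.spark.sql.catalyst.plans.logical.MergeIntoTable

plan match {
  case MergeIntoTable(_, _, _, _, _, _, withSchemaEvolution) =>
    // false unless the statement was written WITH SCHEMA EVOLUTION
    println(s"schema evolution requested: $withSchemaEvolution")
  case _ => // not a MERGE command
}
```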
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 3c628d35dcdb..a65fbef1a373 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -759,9 +759,10 @@ class AnalysisSuite extends AnalysisTest with Matchers {
         testRelation,
         testRelation,
         cond,
-        UpdateAction(Some(cond), Assignment($"a", $"a") :: Nil) :: Nil,
-        Nil,
-        Nil
+        matchedActions = UpdateAction(Some(cond), Assignment($"a", $"a") :: Nil) :: Nil,
+        notMatchedActions = Nil,
+        notMatchedBySourceActions = Nil,
+        withSchemaEvolution = false
       ),
       "AMBIGUOUS_REFERENCE",
       Map("name" -> "`a`", "referenceNames" -> "[`a`, `a`]"))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullupCorrelatedPredicatesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullupCorrelatedPredicatesSuite.scala
index 29bc46eaa3eb..cbd24bd7bb29 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullupCorrelatedPredicatesSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullupCorrelatedPredicatesSuite.scala
@@ -158,7 +158,8 @@ class PullupCorrelatedPredicatesSuite extends PlanTest {
       cond,
       Seq(DeleteAction(None)),
      Seq(InsertAction(None, Seq(Assignment($"a", $"c"), Assignment($"b", $"d")))),
-      Seq(DeleteAction(None)))
+      Seq(DeleteAction(None)),
+      withSchemaEvolution = false)
     val analyzedMergePlan = mergePlan.analyze
     assert(analyzedMergePlan.resolved)
 
@@ -166,7 +167,7 @@ class PullupCorrelatedPredicatesSuite extends PlanTest {
     assert(optimized.resolved)
 
     optimized match {
-      case MergeIntoTable(_, _, s: InSubquery, _, _, _) =>
+      case MergeIntoTable(_, _, s: InSubquery, _, _, _, _) =>
         val outerRefs = SubExprUtils.getOuterReferences(s.query.plan)
         assert(outerRefs.isEmpty, "should be no outer refs")
       case other =>
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicateSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicateSuite.scala
index 7d037799fba7..a50842a26b2c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicateSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicateSuite.scala
@@ -500,7 +500,8 @@ class ReplaceNullWithFalseInPredicateSuite extends PlanTest {
         mergeCondition = expr,
         matchedActions,
         notMatchedActions,
-        notMatchedBySourceActions)
+        notMatchedBySourceActions,
+        withSchemaEvolution = false)
     }
    val originalPlan = func(testRelation, anotherTestRelation, originalCond).analyze
     val optimizedPlan = Optimize.execute(originalPlan)
@@ -522,7 +523,8 @@ class ReplaceNullWithFalseInPredicateSuite extends PlanTest {
         mergeCondition = expr,
         matchedActions,
         notMatchedActions,
-        Seq.empty)
+        notMatchedBySourceActions = Seq.empty,
+        withSchemaEvolution = false)
     }
     val originalPlanWithStar = mergePlanWithStar(originalCond).analyze
     val optimizedPlanWithStar = Optimize.execute(originalPlanWithStar)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index b306ca3cd18a..568aa42bcf14 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -1875,7 +1875,8 @@ class DDLParserSuite extends AnalysisTest {
            Assignment(UnresolvedAttribute("target.col2"), UnresolvedAttribute("source.col2"))))),
        Seq(DeleteAction(Some(EqualTo(UnresolvedAttribute("target.col3"), Literal("delete")))),
          UpdateAction(Some(EqualTo(UnresolvedAttribute("target.col3"), Literal("update"))),
-            Seq(Assignment(UnresolvedAttribute("target.col3"), Literal("delete")))))))
+            Seq(Assignment(UnresolvedAttribute("target.col3"), Literal("delete"))))),
+        withSchemaEvolution = false))
   }
 
   test("merge into table: using subquery") {
@@ -1906,7 +1907,8 @@ class DDLParserSuite extends AnalysisTest {
            Assignment(UnresolvedAttribute("target.col2"), UnresolvedAttribute("source.col2"))))),
        Seq(DeleteAction(Some(EqualTo(UnresolvedAttribute("target.col3"), Literal("delete")))),
          UpdateAction(Some(EqualTo(UnresolvedAttribute("target.col3"), Literal("update"))),
-            Seq(Assignment(UnresolvedAttribute("target.col3"), Literal("delete")))))))
+            Seq(Assignment(UnresolvedAttribute("target.col3"), Literal("delete"))))),
+        withSchemaEvolution = false))
   }
 
   test("merge into table: cte") {
@@ -1939,7 +1941,8 @@ class DDLParserSuite extends AnalysisTest {
            Assignment(UnresolvedAttribute("target.col2"), UnresolvedAttribute("source.col2"))))),
        Seq(DeleteAction(Some(EqualTo(UnresolvedAttribute("target.col3"), Literal("delete")))),
          UpdateAction(Some(EqualTo(UnresolvedAttribute("target.col3"), Literal("update"))),
-            Seq(Assignment(UnresolvedAttribute("target.col3"), Literal("delete")))))))
+            Seq(Assignment(UnresolvedAttribute("target.col3"), Literal("delete"))))),
+        withSchemaEvolution = false))
   }
 
   test("merge into table: no additional condition") {
@@ -1962,7 +1965,8 @@ class DDLParserSuite extends AnalysisTest {
       Seq(InsertAction(None,
        Seq(Assignment(UnresolvedAttribute("target.col1"), UnresolvedAttribute("source.col1")),
          Assignment(UnresolvedAttribute("target.col2"), UnresolvedAttribute("source.col2"))))),
-      Seq(DeleteAction(None))))
+      Seq(DeleteAction(None)),
+      withSchemaEvolution = false))
   }
 
   test("merge into table: star") {
@@ -1983,7 +1987,8 @@ class DDLParserSuite extends AnalysisTest {
      Seq(DeleteAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("delete")))),
        UpdateStarAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("update"))))),
      Seq(InsertStarAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("insert"))))),
-      Seq.empty))
+      Seq.empty,
+      withSchemaEvolution = false))
   }
 
   test("merge into table: invalid star in not matched by source") {
@@ -2024,7 +2029,8 @@ class DDLParserSuite extends AnalysisTest {
             Seq(Assignment(UnresolvedAttribute("target.col1"), Literal(1)),
              Assignment(UnresolvedAttribute("target.col2"), UnresolvedAttribute("source.col2")))),
           InsertStarAction(None)),
-        Seq.empty))
+        Seq.empty,
+        withSchemaEvolution = false))
   }
 
   test("merge into table: column aliases are not allowed") {
@@ -2085,7 +2091,26 @@ class DDLParserSuite extends AnalysisTest {
          UpdateAction(Some(EqualTo(UnresolvedAttribute("target.col3"), Literal("update1"))),
            Seq(Assignment(UnresolvedAttribute("target.col3"), Literal(1)))),
          UpdateAction(Some(EqualTo(UnresolvedAttribute("target.col3"), Literal("update2"))),
-            Seq(Assignment(UnresolvedAttribute("target.col3"), Literal(2)))))))
+            Seq(Assignment(UnresolvedAttribute("target.col3"), Literal(2))))),
+        withSchemaEvolution = false))
+  }
+
+  test("merge into table: schema evolution") {
+    parseCompare(
+      """
+        |MERGE WITH SCHEMA EVOLUTION INTO testcat1.ns1.ns2.tbl AS target
+        |USING testcat2.ns1.ns2.tbl AS source
+        |ON target.col1 = source.col1
+        |WHEN NOT MATCHED BY SOURCE THEN DELETE
+    """.stripMargin,
+      MergeIntoTable(
+        SubqueryAlias("target", UnresolvedRelation(Seq("testcat1", "ns1", "ns2", "tbl"))),
+        SubqueryAlias("source", UnresolvedRelation(Seq("testcat2", "ns1", "ns2", "tbl"))),
+        EqualTo(UnresolvedAttribute("target.col1"), UnresolvedAttribute("source.col1")),
+        matchedActions = Seq.empty,
+        notMatchedActions = Seq.empty,
+        notMatchedBySourceActions = Seq(DeleteAction(None)),
+        withSchemaEvolution = true))
   }
 
  test("merge into table: only the last matched clause can omit the condition") {
@@ -2824,7 +2849,8 @@ class DDLParserSuite extends AnalysisTest {
        Seq(DeleteAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("delete")))),
          UpdateAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("update"))),
             Seq(Assignment(UnresolvedAttribute("target.col2"),
-              UnresolvedAttribute("DEFAULT")))))))
+              UnresolvedAttribute("DEFAULT"))))),
+        withSchemaEvolution = false))
   }
 
  test("SPARK-40944: Relax ordering constraint for CREATE TABLE column options") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/MergeIntoWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/MergeIntoWriter.scala
index ca04b9bfc55f..5020d1c88023 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/MergeIntoWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/MergeIntoWriter.scala
@@ -32,11 +32,17 @@ import org.apache.spark.sql.functions.expr
  * @param table the name of the target table for the merge operation.
  * @param ds the source Dataset to merge into the target table.
  * @param on the merge condition.
+ * @param schemaEvolutionEnabled whether to enable automatic schema evolution for this merge
+ *                               operation. Default is `false`.
  *
  * @since 4.0.0
  */
 @Experimental
-class MergeIntoWriter[T] private[sql] (table: String, ds: Dataset[T], on: Column) {
+class MergeIntoWriter[T] private[sql] (
+    table: String,
+    ds: Dataset[T],
+    on: Column,
+    schemaEvolutionEnabled: Boolean = false) {
 
   private val df: DataFrame = ds.toDF()
 
@@ -160,6 +166,14 @@ class MergeIntoWriter[T] private[sql] (table: String, ds: Dataset[T], on: Column
     new WhenNotMatchedBySource[T](this, Some(condition.expr))
   }
 
+  /**
+   * Enable automatic schema evolution for this merge operation.
+   * @return A `MergeIntoWriter` instance with schema evolution enabled.
+   */
+  def withSchemaEvolution(): MergeIntoWriter[T] = {
+    new MergeIntoWriter[T](this.table, this.ds, this.on, schemaEvolutionEnabled = true)
+  }
+
   /**
    * Executes the merge operation.
    */
@@ -176,7 +190,8 @@ class MergeIntoWriter[T] private[sql] (table: String, ds: Dataset[T], on: Column
       on.expr,
       matchedActions,
       notMatchedActions,
-      notMatchedBySourceActions)
+      notMatchedBySourceActions,
+      schemaEvolutionEnabled)
     val qe = sparkSession.sessionState.executePlan(merge)
     qe.assertCommandExecuted()
   }
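
A hedged usage sketch of the new builder method (the table names and the `id` column are hypothetical). As implemented in this patch, `withSchemaEvolution()` returns a fresh writer, so it is called before the WHEN clauses are attached:

```scala
// Assumes a SparkSession `spark`; $"..." needs `import spark.implicits._`.
spark.table("source")
  .mergeInto("target", $"target.id" === $"source.id")
  .withSchemaEvolution()
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .merge()
```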
diff --git a/sql/core/src/test/resources/sql-tests/results/ansi/keywords.sql.out b/sql/core/src/test/resources/sql-tests/results/ansi/keywords.sql.out
index 8b4acd12911b..836fd4809b1c 100644
--- a/sql/core/src/test/resources/sql-tests/results/ansi/keywords.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/ansi/keywords.sql.out
@@ -97,6 +97,7 @@ ELSE  true
 END    true
 ESCAPE true
 ESCAPED        false
+EVOLUTION      false
 EXCEPT true
 EXCHANGE       false
 EXCLUDE        false
diff --git a/sql/core/src/test/resources/sql-tests/results/keywords.sql.out b/sql/core/src/test/resources/sql-tests/results/keywords.sql.out
index 884f17c23eb0..3fca948e74f7 100644
--- a/sql/core/src/test/resources/sql-tests/results/keywords.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/keywords.sql.out
@@ -97,6 +97,7 @@ ELSE  false
 END    false
 ESCAPE false
 ESCAPED        false
+EVOLUTION      false
 EXCEPT false
 EXCHANGE       false
 EXCLUDE        false
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 28ea2c9bec1a..60f86ede7279 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -1586,7 +1586,7 @@ class PlanResolutionSuite extends AnalysisTest {
         // basic
         val sql1 =
           s"""
-             |MERGE INTO $target AS target
+             |MERGE WITH SCHEMA EVOLUTION INTO $target AS target
              |USING $source AS source
              |ON target.i = source.i
              |WHEN MATCHED AND (target.s='delete') THEN DELETE
@@ -1608,12 +1608,14 @@ class PlanResolutionSuite extends AnalysisTest {
                   insertAssigns)),
              Seq(DeleteAction(Some(EqualTo(ndl: AttributeReference, StringLiteral("delete")))),
                UpdateAction(Some(EqualTo(nul: AttributeReference, StringLiteral("update"))),
-                  notMatchedBySourceUpdateAssigns))) =>
+                  notMatchedBySourceUpdateAssigns)),
+              withSchemaEvolution) =>
             checkMergeConditionResolution(target, source, mergeCondition)
            checkMatchedClausesResolution(target, source, Some(dl), Some(ul), updateAssigns)
            checkNotMatchedClausesResolution(target, source, Some(il), insertAssigns)
            checkNotMatchedBySourceClausesResolution(target, Some(ndl), Some(nul),
               notMatchedBySourceUpdateAssigns)
+            assert(withSchemaEvolution === true)
 
          case other => fail("Expect MergeIntoTable, but got:\n" + other.treeString)
         }
@@ -1638,11 +1640,13 @@ class PlanResolutionSuite extends AnalysisTest {
                   StringLiteral("update"))), updateAssigns)),
              Seq(InsertAction(Some(EqualTo(il: AttributeReference, StringLiteral("insert"))),
                   insertAssigns)),
-              Seq()) =>
+              Seq(),
+              withSchemaEvolution) =>
             checkMergeConditionResolution(target, source, mergeCondition)
            checkMatchedClausesResolution(target, source, Some(dl), Some(ul), updateAssigns,
              starInUpdate = true)
            checkNotMatchedClausesResolution(target, source, Some(il), insertAssigns)
+            assert(withSchemaEvolution === false)
 
          case other => fail("Expect MergeIntoTable, but got:\n" + other.treeString)
         }
@@ -1663,11 +1667,13 @@ class PlanResolutionSuite extends AnalysisTest {
               mergeCondition,
               Seq(UpdateAction(None, updateAssigns)),
               Seq(InsertAction(None, insertAssigns)),
-              Seq()) =>
+              Seq(),
+              withSchemaEvolution) =>
             checkMergeConditionResolution(target, source, mergeCondition)
            checkMatchedClausesResolution(target, source, None, None, updateAssigns,
              starInUpdate = true)
            checkNotMatchedClausesResolution(target, source, None, insertAssigns)
+            assert(withSchemaEvolution === false)
 
          case other => fail("Expect MergeIntoTable, but got:\n" + other.treeString)
         }
@@ -1692,12 +1698,14 @@ class PlanResolutionSuite extends AnalysisTest {
               Seq(DeleteAction(Some(_)), UpdateAction(None, updateAssigns)),
               Seq(InsertAction(None, insertAssigns)),
              Seq(DeleteAction(Some(EqualTo(_: AttributeReference, StringLiteral("delete")))),
-                UpdateAction(None, notMatchedBySourceUpdateAssigns))) =>
+                UpdateAction(None, notMatchedBySourceUpdateAssigns)),
+              withSchemaEvolution) =>
             checkMergeConditionResolution(target, source, mergeCondition)
            checkMatchedClausesResolution(target, source, None, None, updateAssigns)
            checkNotMatchedClausesResolution(target, source, None, insertAssigns)
             checkNotMatchedBySourceClausesResolution(target, None, None,
               notMatchedBySourceUpdateAssigns)
+            assert(withSchemaEvolution === false)
 
          case other => fail("Expect MergeIntoTable, but got:\n" + other.treeString)
         }
@@ -1727,12 +1735,14 @@ class PlanResolutionSuite extends AnalysisTest {
                   insertAssigns)),
              Seq(DeleteAction(Some(EqualTo(ndl: AttributeReference, StringLiteral("delete")))),
                UpdateAction(Some(EqualTo(nul: AttributeReference, StringLiteral("update"))),
-                  notMatchedBySourceUpdateAssigns))) =>
+                  notMatchedBySourceUpdateAssigns)),
+              withSchemaEvolution) =>
             checkMergeConditionResolution(target, source, mergeCondition)
            checkMatchedClausesResolution(target, source, Some(dl), Some(ul), updateAssigns)
            checkNotMatchedClausesResolution(target, source, Some(il), insertAssigns)
            checkNotMatchedBySourceClausesResolution(target, Some(ndl), Some(nul),
               notMatchedBySourceUpdateAssigns)
+            assert(withSchemaEvolution === false)
 
          case other => fail("Expect MergeIntoTable, but got:\n" + other.treeString)
         }
@@ -1764,12 +1774,14 @@ class PlanResolutionSuite extends AnalysisTest {
                   insertAssigns)),
              Seq(DeleteAction(Some(EqualTo(ndl: AttributeReference, StringLiteral("delete")))),
                UpdateAction(Some(EqualTo(nul: AttributeReference, StringLiteral("update"))),
-                  notMatchedBySourceUpdateAssigns))) =>
+                  notMatchedBySourceUpdateAssigns)),
+              withSchemaEvolution) =>
             checkMergeConditionResolution(target, source, mergeCondition)
            checkMatchedClausesResolution(target, source, Some(dl), Some(ul), updateAssigns)
            checkNotMatchedClausesResolution(target, source, Some(il), insertAssigns)
            checkNotMatchedBySourceClausesResolution(target, Some(ndl), Some(nul),
               notMatchedBySourceUpdateAssigns)
+            assert(withSchemaEvolution === false)
          case other => fail("Expect MergeIntoTable, but got:\n" + other.treeString)
         }
 
@@ -1837,6 +1849,7 @@ class PlanResolutionSuite extends AnalysisTest {
               case other =>
                 fail("unexpected second not matched by source action " + other)
             }
+            assert(m.withSchemaEvolution === false)
 
           case other =>
             fail("Expect MergeIntoTable, but got:\n" + other.treeString)
@@ -1905,6 +1918,7 @@ class PlanResolutionSuite extends AnalysisTest {
           Seq(Assignment(_: AttributeReference, Literal(42, IntegerType)))) =>
          case other => fail("unexpected second not matched by source action " + other)
         }
+        assert(m.withSchemaEvolution === false)
 
       case other =>
         fail("Expect MergeIntoTable, but got:\n" + other.treeString)
@@ -2009,6 +2023,7 @@ class PlanResolutionSuite extends AnalysisTest {
         assert(m.matchedActions.length == 2)
         assert(m.notMatchedActions.length == 1)
         assert(m.notMatchedBySourceActions.length == 2)
+        assert(m.withSchemaEvolution === false)
 
       case other =>
         fail("Expect MergeIntoTable, but got:\n" + other.treeString)
@@ -2045,7 +2060,8 @@ class PlanResolutionSuite extends AnalysisTest {
             Seq(InsertAction(
               Some(EqualTo(il: AttributeReference, StringLiteral("a"))),
             insertAssigns)),
-            Seq(DeleteAction(Some(_)), UpdateAction(None, secondUpdateAssigns))) =>
+            Seq(DeleteAction(Some(_)), UpdateAction(None, secondUpdateAssigns)),
+            withSchemaEvolution) =>
           val ti = target.output.find(_.name == "i").get
           val ts = target.output.find(_.name == "s").get
           val si = source.output.find(_.name == "i").get
@@ -2064,6 +2080,7 @@ class PlanResolutionSuite extends AnalysisTest {
           assert(secondUpdateAssigns.size == 1)
          // UPDATE key is resolved with target table only, so column `s` is not ambiguous.
          assert(secondUpdateAssigns.head.key.asInstanceOf[AttributeReference].sameRef(ts))
+          assert(withSchemaEvolution === false)
 
         case p => fail("Expect MergeIntoTable, but got:\n" + p.treeString)
       }
@@ -2150,7 +2167,8 @@ class PlanResolutionSuite extends AnalysisTest {
             _,
             Seq(),
             Seq(),
-            notMatchedBySourceActions) =>
+            notMatchedBySourceActions,
+            withSchemaEvolution) =>
           assert(notMatchedBySourceActions.length == 2)
           notMatchedBySourceActions(0) match {
            case DeleteAction(Some(EqualTo(dl: AttributeReference, StringLiteral("b")))) =>
@@ -2171,6 +2189,7 @@ class PlanResolutionSuite extends AnalysisTest {
               assert(us.sameRef(ti))
            case other => fail("unexpected second not matched by source action " + other)
           }
+          assert(withSchemaEvolution === false)
       }
 
       val sql7 =
@@ -2205,6 +2224,7 @@ class PlanResolutionSuite extends AnalysisTest {
       case u: MergeIntoTable =>
         assert(u.targetTable.isInstanceOf[UnresolvedRelation])
         assert(u.sourceTable.isInstanceOf[UnresolvedRelation])
+        assert(u.withSchemaEvolution === false)
       case _ => fail("Expect MergeIntoTable, but got:\n" + parsed.treeString)
     }
 
@@ -2283,6 +2303,7 @@ class PlanResolutionSuite extends AnalysisTest {
             assert(s2.functionName == "varcharTypeWriteSideCheck")
           case other => fail("Expect UpdateAction, but got: " + other)
         }
+        assert(m.withSchemaEvolution === false)
      case other => fail("Expect MergeIntoTable, but got:\n" + other.treeString)
     }
   }
@@ -2304,12 +2325,14 @@ class PlanResolutionSuite extends AnalysisTest {
           _,
           Seq(DeleteAction(None)),
           Seq(InsertAction(None, insertAssigns)),
-          Nil) =>
+          Nil,
+          withSchemaEvolution) =>
        // There is only one assignment, the missing col is not filled with default value
         assert(insertAssigns.size == 1)
        // Special case: Spark does not resolve any columns in MERGE if table accepts any schema.
        assert(insertAssigns.head.key.asInstanceOf[UnresolvedAttribute].name == "target.i")
        assert(insertAssigns.head.value.asInstanceOf[UnresolvedAttribute].name == "DEFAULT")
+        assert(withSchemaEvolution === false)
 
      case l => fail("Expected unresolved MergeIntoTable, but got:\n" + l.treeString)
     }
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
index 51123b17eeec..1c0b4a080ee5 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
@@ -214,7 +214,7 @@ trait ThriftServerWithSparkContextSuite extends SharedThriftServer {
       val sessionHandle = client.openSession(user, "")
      val infoValue = client.getInfo(sessionHandle, GetInfoType.CLI_ODBC_KEYWORDS)
       // scalastyle:off line.size.limit
-      assert(infoValue.getStringValue == "ADD,AFTER,ALL,ALTER,ALWAYS,ANALYZE,AND,ANTI,ANY,ANY_VALUE,ARCHIVE,ARRAY,AS,ASC,AT,AUTHORIZATION,BETWEEN,BIGINT,BINARY,BOOLEAN,BOTH,BUCKET,BUCKETS,BY,BYTE,CACHE,CASCADE,CASE,CAST,CATALOG,CATALOGS,CHANGE,CHAR,CHARACTER,CHECK,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATE,COLLATION,COLLECTION,COLUMN,COLUMNS,COMMENT,COMMIT,COMPACT,COMPACTIONS,COMPUTE,CONCATENATE,CONSTRAINT,COST,CREATE,CROSS,CUBE,CURRENT,CURRENT_DATE,CURRENT_TIME,CURRENT_TIMESTAMP,CURRENT_US [...]
+      assert(infoValue.getStringValue == "ADD,AFTER,ALL,ALTER,ALWAYS,ANALYZE,AND,ANTI,ANY,ANY_VALUE,ARCHIVE,ARRAY,AS,ASC,AT,AUTHORIZATION,BETWEEN,BIGINT,BINARY,BOOLEAN,BOTH,BUCKET,BUCKETS,BY,BYTE,CACHE,CASCADE,CASE,CAST,CATALOG,CATALOGS,CHANGE,CHAR,CHARACTER,CHECK,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATE,COLLATION,COLLECTION,COLUMN,COLUMNS,COMMENT,COMMIT,COMPACT,COMPACTIONS,COMPUTE,CONCATENATE,CONSTRAINT,COST,CREATE,CROSS,CUBE,CURRENT,CURRENT_DATE,CURRENT_TIME,CURRENT_TIMESTAMP,CURRENT_US [...]
       // scalastyle:on line.size.limit
     }
   }

