This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 898838a239d3 [SPARK-47627][SQL] Add SQL MERGE syntax to enable schema evolution 898838a239d3 is described below commit 898838a239d370429e49108a56c6a7fb22d6b399 Author: Paddy Xu <xupa...@gmail.com> AuthorDate: Wed Apr 17 10:53:02 2024 -0700 [SPARK-47627][SQL] Add SQL MERGE syntax to enable schema evolution ### Why are the changes needed? This PR introduces a `WITH SCHEMA EVOLUTION` clause for the SQL MERGE command, which allows the user to request automatic schema evolution for a single operation. ```sql MERGE WITH SCHEMA EVOLUTION INTO tgt USING src ON ... WHEN ... ``` When `WITH SCHEMA EVOLUTION` is specified, schema-evolution-related features must be enabled for that statement only. Spark is only responsible for recognizing whether the `WITH SCHEMA EVOLUTION` clause is present; the result is passed down to the MERGE command as its `withSchemaEvolution` flag. Data sources must honor this flag and react accordingly: when the clause is present, they should enable the features they categorize as "schema evolution". For example, when the underlying table is a Delta Lake table, the "mergeSchema" feature will be turned on (see https://github.com/delta-io/delta/blob/c41977db3529a3139d6306abe5ded161 [...] ### Does this PR introduce _any_ user-facing change? Yes, see the previous section. ### How was this patch tested? New tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #45748 from xupefei/merge-schema-evolution. 
Authored-by: Paddy Xu <xupa...@gmail.com> Signed-off-by: Gengliang Wang <gengli...@apache.org> --- .../CheckConnectJvmClientCompatibility.scala | 1 + docs/sql-ref-ansi-compliance.md | 1 + .../spark/sql/catalyst/parser/SqlBaseLexer.g4 | 1 + .../spark/sql/catalyst/parser/SqlBaseParser.g4 | 4 +- .../spark/sql/catalyst/analysis/Analyzer.scala | 2 +- .../catalyst/analysis/RewriteMergeIntoTable.scala | 6 +-- .../spark/sql/catalyst/parser/AstBuilder.scala | 4 +- .../sql/catalyst/plans/logical/v2Commands.scala | 3 +- .../sql/catalyst/analysis/AnalysisSuite.scala | 7 ++-- .../PullupCorrelatedPredicatesSuite.scala | 5 ++- .../ReplaceNullWithFalseInPredicateSuite.scala | 6 ++- .../spark/sql/catalyst/parser/DDLParserSuite.scala | 42 +++++++++++++++++---- .../org/apache/spark/sql/MergeIntoWriter.scala | 19 +++++++++- .../sql-tests/results/ansi/keywords.sql.out | 1 + .../resources/sql-tests/results/keywords.sql.out | 1 + .../execution/command/PlanResolutionSuite.scala | 43 +++++++++++++++++----- .../ThriftServerWithSparkContextSuite.scala | 2 +- 17 files changed, 113 insertions(+), 35 deletions(-) diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala index 0f383d007f29..f73290c5ce29 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala @@ -304,6 +304,7 @@ object CheckConnectJvmClientCompatibility { // MergeIntoWriter ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.MergeIntoWriter"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.MergeIntoWriter$"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.WhenMatched"), 
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.WhenMatched$"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.WhenNotMatched"), diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md index bf1819b9767b..0256a3e0869d 100644 --- a/docs/sql-ref-ansi-compliance.md +++ b/docs/sql-ref-ansi-compliance.md @@ -492,6 +492,7 @@ Below is a list of all the keywords in Spark SQL. |END|reserved|non-reserved|reserved| |ESCAPE|reserved|non-reserved|reserved| |ESCAPED|non-reserved|non-reserved|non-reserved| +|EVOLUTION|non-reserved|non-reserved|non-reserved| |EXCEPT|reserved|strict-non-reserved|reserved| |EXCHANGE|non-reserved|non-reserved|non-reserved| |EXCLUDE|non-reserved|non-reserved|non-reserved| diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 index e2b178d34b56..83e40c4a20a2 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 @@ -182,6 +182,7 @@ ELSE: 'ELSE'; END: 'END'; ESCAPE: 'ESCAPE'; ESCAPED: 'ESCAPED'; +EVOLUTION: 'EVOLUTION'; EXCEPT: 'EXCEPT'; EXCHANGE: 'EXCHANGE'; EXCLUDE: 'EXCLUDE'; diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 index 3d008516589b..60b67b08021d 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 @@ -480,7 +480,7 @@ dmlStatementNoWith | fromClause multiInsertQueryBody+ #multiInsertQuery | DELETE FROM identifierReference tableAlias whereClause? #deleteFromTable | UPDATE identifierReference tableAlias setClause whereClause? 
#updateTable - | MERGE INTO target=identifierReference targetAlias=tableAlias + | MERGE (WITH SCHEMA EVOLUTION)? INTO target=identifierReference targetAlias=tableAlias USING (source=identifierReference | LEFT_PAREN sourceQuery=query RIGHT_PAREN) sourceAlias=tableAlias ON mergeCondition=booleanExpression @@ -1399,6 +1399,7 @@ ansiNonReserved | DOUBLE | DROP | ESCAPED + | EVOLUTION | EXCHANGE | EXCLUDE | EXISTS @@ -1715,6 +1716,7 @@ nonReserved | END | ESCAPE | ESCAPED + | EVOLUTION | EXCHANGE | EXCLUDE | EXECUTE diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index e741387d7657..e666200a78d4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1659,7 +1659,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor case u: UpdateTable => resolveReferencesInUpdate(u) - case m @ MergeIntoTable(targetTable, sourceTable, _, _, _, _) + case m @ MergeIntoTable(targetTable, sourceTable, _, _, _, _, _) if !m.resolved && targetTable.resolved && sourceTable.resolved => EliminateSubqueryAliases(targetTable) match { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala index 9e020cb55ed5..dacee70cf128 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala @@ -45,7 +45,7 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case m @ MergeIntoTable(aliasedTable, source, cond, 
matchedActions, notMatchedActions, - notMatchedBySourceActions) if m.resolved && m.rewritable && m.aligned && + notMatchedBySourceActions, _) if m.resolved && m.rewritable && m.aligned && matchedActions.isEmpty && notMatchedActions.size == 1 && notMatchedBySourceActions.isEmpty => @@ -79,7 +79,7 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper } case m @ MergeIntoTable(aliasedTable, source, cond, matchedActions, notMatchedActions, - notMatchedBySourceActions) if m.resolved && m.rewritable && m.aligned && + notMatchedBySourceActions, _) if m.resolved && m.rewritable && m.aligned && matchedActions.isEmpty && notMatchedBySourceActions.isEmpty => EliminateSubqueryAliases(aliasedTable) match { @@ -120,7 +120,7 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper } case m @ MergeIntoTable(aliasedTable, source, cond, matchedActions, notMatchedActions, - notMatchedBySourceActions) if m.resolved && m.rewritable && m.aligned => + notMatchedBySourceActions, _) if m.resolved && m.rewritable && m.aligned => EliminateSubqueryAliases(aliasedTable) match { case r @ DataSourceV2Relation(tbl: SupportsRowLevelOperations, _, _, _, _) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 34672485ddc9..69220613a89e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -455,6 +455,7 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging { } override def visitMergeIntoTable(ctx: MergeIntoTableContext): LogicalPlan = withOrigin(ctx) { + val withSchemaEvolution = ctx.EVOLUTION() != null val targetTable = createUnresolvedRelation(ctx.target) val targetTableAlias = getTableAliasWithoutColumnAlias(ctx.targetAlias, "MERGE") val aliasedTarget = 
targetTableAlias.map(SubqueryAlias(_, targetTable)).getOrElse(targetTable) @@ -549,7 +550,8 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging { mergeCondition, matchedActions.toSeq, notMatchedActions.toSeq, - notMatchedBySourceActions.toSeq) + notMatchedBySourceActions.toSeq, + withSchemaEvolution) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 37e751ea9884..43d37801b86d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -754,7 +754,8 @@ case class MergeIntoTable( mergeCondition: Expression, matchedActions: Seq[MergeAction], notMatchedActions: Seq[MergeAction], - notMatchedBySourceActions: Seq[MergeAction]) extends BinaryCommand with SupportsSubquery { + notMatchedBySourceActions: Seq[MergeAction], + withSchemaEvolution: Boolean) extends BinaryCommand with SupportsSubquery { lazy val aligned: Boolean = { val actions = matchedActions ++ notMatchedActions ++ notMatchedBySourceActions diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index 3c628d35dcdb..a65fbef1a373 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -759,9 +759,10 @@ class AnalysisSuite extends AnalysisTest with Matchers { testRelation, testRelation, cond, - UpdateAction(Some(cond), Assignment($"a", $"a") :: Nil) :: Nil, - Nil, - Nil + matchedActions = UpdateAction(Some(cond), Assignment($"a", $"a") :: Nil) :: Nil, + notMatchedActions = Nil, + notMatchedBySourceActions = Nil, + 
withSchemaEvolution = false ), "AMBIGUOUS_REFERENCE", Map("name" -> "`a`", "referenceNames" -> "[`a`, `a`]")) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullupCorrelatedPredicatesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullupCorrelatedPredicatesSuite.scala index 29bc46eaa3eb..cbd24bd7bb29 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullupCorrelatedPredicatesSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullupCorrelatedPredicatesSuite.scala @@ -158,7 +158,8 @@ class PullupCorrelatedPredicatesSuite extends PlanTest { cond, Seq(DeleteAction(None)), Seq(InsertAction(None, Seq(Assignment($"a", $"c"), Assignment($"b", $"d")))), - Seq(DeleteAction(None))) + Seq(DeleteAction(None)), + withSchemaEvolution = false) val analyzedMergePlan = mergePlan.analyze assert(analyzedMergePlan.resolved) @@ -166,7 +167,7 @@ class PullupCorrelatedPredicatesSuite extends PlanTest { assert(optimized.resolved) optimized match { - case MergeIntoTable(_, _, s: InSubquery, _, _, _) => + case MergeIntoTable(_, _, s: InSubquery, _, _, _, _) => val outerRefs = SubExprUtils.getOuterReferences(s.query.plan) assert(outerRefs.isEmpty, "should be no outer refs") case other => diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicateSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicateSuite.scala index 7d037799fba7..a50842a26b2c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicateSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicateSuite.scala @@ -500,7 +500,8 @@ class ReplaceNullWithFalseInPredicateSuite extends PlanTest { mergeCondition = expr, matchedActions, notMatchedActions, - notMatchedBySourceActions) + 
notMatchedBySourceActions, + withSchemaEvolution = false) } val originalPlan = func(testRelation, anotherTestRelation, originalCond).analyze val optimizedPlan = Optimize.execute(originalPlan) @@ -522,7 +523,8 @@ class ReplaceNullWithFalseInPredicateSuite extends PlanTest { mergeCondition = expr, matchedActions, notMatchedActions, - Seq.empty) + notMatchedBySourceActions = Seq.empty, + withSchemaEvolution = false) } val originalPlanWithStar = mergePlanWithStar(originalCond).analyze val optimizedPlanWithStar = Optimize.execute(originalPlanWithStar) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index b306ca3cd18a..568aa42bcf14 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -1875,7 +1875,8 @@ class DDLParserSuite extends AnalysisTest { Assignment(UnresolvedAttribute("target.col2"), UnresolvedAttribute("source.col2"))))), Seq(DeleteAction(Some(EqualTo(UnresolvedAttribute("target.col3"), Literal("delete")))), UpdateAction(Some(EqualTo(UnresolvedAttribute("target.col3"), Literal("update"))), - Seq(Assignment(UnresolvedAttribute("target.col3"), Literal("delete"))))))) + Seq(Assignment(UnresolvedAttribute("target.col3"), Literal("delete"))))), + withSchemaEvolution = false)) } test("merge into table: using subquery") { @@ -1906,7 +1907,8 @@ class DDLParserSuite extends AnalysisTest { Assignment(UnresolvedAttribute("target.col2"), UnresolvedAttribute("source.col2"))))), Seq(DeleteAction(Some(EqualTo(UnresolvedAttribute("target.col3"), Literal("delete")))), UpdateAction(Some(EqualTo(UnresolvedAttribute("target.col3"), Literal("update"))), - Seq(Assignment(UnresolvedAttribute("target.col3"), Literal("delete"))))))) + Seq(Assignment(UnresolvedAttribute("target.col3"), Literal("delete"))))), + 
withSchemaEvolution = false)) } test("merge into table: cte") { @@ -1939,7 +1941,8 @@ class DDLParserSuite extends AnalysisTest { Assignment(UnresolvedAttribute("target.col2"), UnresolvedAttribute("source.col2"))))), Seq(DeleteAction(Some(EqualTo(UnresolvedAttribute("target.col3"), Literal("delete")))), UpdateAction(Some(EqualTo(UnresolvedAttribute("target.col3"), Literal("update"))), - Seq(Assignment(UnresolvedAttribute("target.col3"), Literal("delete"))))))) + Seq(Assignment(UnresolvedAttribute("target.col3"), Literal("delete"))))), + withSchemaEvolution = false)) } test("merge into table: no additional condition") { @@ -1962,7 +1965,8 @@ class DDLParserSuite extends AnalysisTest { Seq(InsertAction(None, Seq(Assignment(UnresolvedAttribute("target.col1"), UnresolvedAttribute("source.col1")), Assignment(UnresolvedAttribute("target.col2"), UnresolvedAttribute("source.col2"))))), - Seq(DeleteAction(None)))) + Seq(DeleteAction(None)), + withSchemaEvolution = false)) } test("merge into table: star") { @@ -1983,7 +1987,8 @@ class DDLParserSuite extends AnalysisTest { Seq(DeleteAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("delete")))), UpdateStarAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("update"))))), Seq(InsertStarAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("insert"))))), - Seq.empty)) + Seq.empty, + withSchemaEvolution = false)) } test("merge into table: invalid star in not matched by source") { @@ -2024,7 +2029,8 @@ class DDLParserSuite extends AnalysisTest { Seq(Assignment(UnresolvedAttribute("target.col1"), Literal(1)), Assignment(UnresolvedAttribute("target.col2"), UnresolvedAttribute("source.col2")))), InsertStarAction(None)), - Seq.empty)) + Seq.empty, + withSchemaEvolution = false)) } test("merge into table: column aliases are not allowed") { @@ -2085,7 +2091,26 @@ class DDLParserSuite extends AnalysisTest { UpdateAction(Some(EqualTo(UnresolvedAttribute("target.col3"), Literal("update1"))), 
Seq(Assignment(UnresolvedAttribute("target.col3"), Literal(1)))), UpdateAction(Some(EqualTo(UnresolvedAttribute("target.col3"), Literal("update2"))), - Seq(Assignment(UnresolvedAttribute("target.col3"), Literal(2))))))) + Seq(Assignment(UnresolvedAttribute("target.col3"), Literal(2))))), + withSchemaEvolution = false)) + } + + test("merge into table: schema evolution") { + parseCompare( + """ + |MERGE WITH SCHEMA EVOLUTION INTO testcat1.ns1.ns2.tbl AS target + |USING testcat2.ns1.ns2.tbl AS source + |ON target.col1 = source.col1 + |WHEN NOT MATCHED BY SOURCE THEN DELETE + """.stripMargin, + MergeIntoTable( + SubqueryAlias("target", UnresolvedRelation(Seq("testcat1", "ns1", "ns2", "tbl"))), + SubqueryAlias("source", UnresolvedRelation(Seq("testcat2", "ns1", "ns2", "tbl"))), + EqualTo(UnresolvedAttribute("target.col1"), UnresolvedAttribute("source.col1")), + matchedActions = Seq.empty, + notMatchedActions = Seq.empty, + notMatchedBySourceActions = Seq(DeleteAction(None)), + withSchemaEvolution = true)) } test("merge into table: only the last matched clause can omit the condition") { @@ -2824,7 +2849,8 @@ class DDLParserSuite extends AnalysisTest { Seq(DeleteAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("delete")))), UpdateAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("update"))), Seq(Assignment(UnresolvedAttribute("target.col2"), - UnresolvedAttribute("DEFAULT"))))))) + UnresolvedAttribute("DEFAULT"))))), + withSchemaEvolution = false)) } test("SPARK-40944: Relax ordering constraint for CREATE TABLE column options") { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/MergeIntoWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/MergeIntoWriter.scala index ca04b9bfc55f..5020d1c88023 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/MergeIntoWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/MergeIntoWriter.scala @@ -32,11 +32,17 @@ import org.apache.spark.sql.functions.expr * @param table the name 
of the target table for the merge operation. * @param ds the source Dataset to merge into the target table. * @param on the merge condition. + * @param schemaEvolutionEnabled whether to enable automatic schema evolution for this merge + * operation. Default is `false`. * * @since 4.0.0 */ @Experimental -class MergeIntoWriter[T] private[sql] (table: String, ds: Dataset[T], on: Column) { +class MergeIntoWriter[T] private[sql] ( + table: String, + ds: Dataset[T], + on: Column, + schemaEvolutionEnabled: Boolean = false) { private val df: DataFrame = ds.toDF() @@ -160,6 +166,14 @@ class MergeIntoWriter[T] private[sql] (table: String, ds: Dataset[T], on: Column new WhenNotMatchedBySource[T](this, Some(condition.expr)) } + /** + * Enable automatic schema evolution for this merge operation. + * @return A `MergeIntoWriter` instance with schema evolution enabled. + */ + def withSchemaEvolution(): MergeIntoWriter[T] = { + new MergeIntoWriter[T](this.table, this.ds, this.on, schemaEvolutionEnabled = true) + } + /** * Executes the merge operation. 
*/ @@ -176,7 +190,8 @@ class MergeIntoWriter[T] private[sql] (table: String, ds: Dataset[T], on: Column on.expr, matchedActions, notMatchedActions, - notMatchedBySourceActions) + notMatchedBySourceActions, + schemaEvolutionEnabled) val qe = sparkSession.sessionState.executePlan(merge) qe.assertCommandExecuted() } diff --git a/sql/core/src/test/resources/sql-tests/results/ansi/keywords.sql.out b/sql/core/src/test/resources/sql-tests/results/ansi/keywords.sql.out index 8b4acd12911b..836fd4809b1c 100644 --- a/sql/core/src/test/resources/sql-tests/results/ansi/keywords.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/ansi/keywords.sql.out @@ -97,6 +97,7 @@ ELSE true END true ESCAPE true ESCAPED false +EVOLUTION false EXCEPT true EXCHANGE false EXCLUDE false diff --git a/sql/core/src/test/resources/sql-tests/results/keywords.sql.out b/sql/core/src/test/resources/sql-tests/results/keywords.sql.out index 884f17c23eb0..3fca948e74f7 100644 --- a/sql/core/src/test/resources/sql-tests/results/keywords.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/keywords.sql.out @@ -97,6 +97,7 @@ ELSE false END false ESCAPE false ESCAPED false +EVOLUTION false EXCEPT false EXCHANGE false EXCLUDE false diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index 28ea2c9bec1a..60f86ede7279 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -1586,7 +1586,7 @@ class PlanResolutionSuite extends AnalysisTest { // basic val sql1 = s""" - |MERGE INTO $target AS target + |MERGE WITH SCHEMA EVOLUTION INTO $target AS target |USING $source AS source |ON target.i = source.i |WHEN MATCHED AND (target.s='delete') THEN DELETE @@ -1608,12 +1608,14 @@ class PlanResolutionSuite extends AnalysisTest { 
insertAssigns)), Seq(DeleteAction(Some(EqualTo(ndl: AttributeReference, StringLiteral("delete")))), UpdateAction(Some(EqualTo(nul: AttributeReference, StringLiteral("update"))), - notMatchedBySourceUpdateAssigns))) => + notMatchedBySourceUpdateAssigns)), + withSchemaEvolution) => checkMergeConditionResolution(target, source, mergeCondition) checkMatchedClausesResolution(target, source, Some(dl), Some(ul), updateAssigns) checkNotMatchedClausesResolution(target, source, Some(il), insertAssigns) checkNotMatchedBySourceClausesResolution(target, Some(ndl), Some(nul), notMatchedBySourceUpdateAssigns) + assert(withSchemaEvolution === true) case other => fail("Expect MergeIntoTable, but got:\n" + other.treeString) } @@ -1638,11 +1640,13 @@ class PlanResolutionSuite extends AnalysisTest { StringLiteral("update"))), updateAssigns)), Seq(InsertAction(Some(EqualTo(il: AttributeReference, StringLiteral("insert"))), insertAssigns)), - Seq()) => + Seq(), + withSchemaEvolution) => checkMergeConditionResolution(target, source, mergeCondition) checkMatchedClausesResolution(target, source, Some(dl), Some(ul), updateAssigns, starInUpdate = true) checkNotMatchedClausesResolution(target, source, Some(il), insertAssigns) + assert(withSchemaEvolution === false) case other => fail("Expect MergeIntoTable, but got:\n" + other.treeString) } @@ -1663,11 +1667,13 @@ class PlanResolutionSuite extends AnalysisTest { mergeCondition, Seq(UpdateAction(None, updateAssigns)), Seq(InsertAction(None, insertAssigns)), - Seq()) => + Seq(), + withSchemaEvolution) => checkMergeConditionResolution(target, source, mergeCondition) checkMatchedClausesResolution(target, source, None, None, updateAssigns, starInUpdate = true) checkNotMatchedClausesResolution(target, source, None, insertAssigns) + assert(withSchemaEvolution === false) case other => fail("Expect MergeIntoTable, but got:\n" + other.treeString) } @@ -1692,12 +1698,14 @@ class PlanResolutionSuite extends AnalysisTest { Seq(DeleteAction(Some(_)), 
UpdateAction(None, updateAssigns)), Seq(InsertAction(None, insertAssigns)), Seq(DeleteAction(Some(EqualTo(_: AttributeReference, StringLiteral("delete")))), - UpdateAction(None, notMatchedBySourceUpdateAssigns))) => + UpdateAction(None, notMatchedBySourceUpdateAssigns)), + withSchemaEvolution) => checkMergeConditionResolution(target, source, mergeCondition) checkMatchedClausesResolution(target, source, None, None, updateAssigns) checkNotMatchedClausesResolution(target, source, None, insertAssigns) checkNotMatchedBySourceClausesResolution(target, None, None, notMatchedBySourceUpdateAssigns) + assert(withSchemaEvolution === false) case other => fail("Expect MergeIntoTable, but got:\n" + other.treeString) } @@ -1727,12 +1735,14 @@ class PlanResolutionSuite extends AnalysisTest { insertAssigns)), Seq(DeleteAction(Some(EqualTo(ndl: AttributeReference, StringLiteral("delete")))), UpdateAction(Some(EqualTo(nul: AttributeReference, StringLiteral("update"))), - notMatchedBySourceUpdateAssigns))) => + notMatchedBySourceUpdateAssigns)), + withSchemaEvolution) => checkMergeConditionResolution(target, source, mergeCondition) checkMatchedClausesResolution(target, source, Some(dl), Some(ul), updateAssigns) checkNotMatchedClausesResolution(target, source, Some(il), insertAssigns) checkNotMatchedBySourceClausesResolution(target, Some(ndl), Some(nul), notMatchedBySourceUpdateAssigns) + assert(withSchemaEvolution === false) case other => fail("Expect MergeIntoTable, but got:\n" + other.treeString) } @@ -1764,12 +1774,14 @@ class PlanResolutionSuite extends AnalysisTest { insertAssigns)), Seq(DeleteAction(Some(EqualTo(ndl: AttributeReference, StringLiteral("delete")))), UpdateAction(Some(EqualTo(nul: AttributeReference, StringLiteral("update"))), - notMatchedBySourceUpdateAssigns))) => + notMatchedBySourceUpdateAssigns)), + withSchemaEvolution) => checkMergeConditionResolution(target, source, mergeCondition) checkMatchedClausesResolution(target, source, Some(dl), Some(ul), 
updateAssigns) checkNotMatchedClausesResolution(target, source, Some(il), insertAssigns) checkNotMatchedBySourceClausesResolution(target, Some(ndl), Some(nul), notMatchedBySourceUpdateAssigns) + assert(withSchemaEvolution === false) case other => fail("Expect MergeIntoTable, but got:\n" + other.treeString) } @@ -1837,6 +1849,7 @@ class PlanResolutionSuite extends AnalysisTest { case other => fail("unexpected second not matched by source action " + other) } + assert(m.withSchemaEvolution === false) case other => fail("Expect MergeIntoTable, but got:\n" + other.treeString) @@ -1905,6 +1918,7 @@ class PlanResolutionSuite extends AnalysisTest { Seq(Assignment(_: AttributeReference, Literal(42, IntegerType)))) => case other => fail("unexpected second not matched by source action " + other) } + assert(m.withSchemaEvolution === false) case other => fail("Expect MergeIntoTable, but got:\n" + other.treeString) @@ -2009,6 +2023,7 @@ class PlanResolutionSuite extends AnalysisTest { assert(m.matchedActions.length == 2) assert(m.notMatchedActions.length == 1) assert(m.notMatchedBySourceActions.length == 2) + assert(m.withSchemaEvolution === false) case other => fail("Expect MergeIntoTable, but got:\n" + other.treeString) @@ -2045,7 +2060,8 @@ class PlanResolutionSuite extends AnalysisTest { Seq(InsertAction( Some(EqualTo(il: AttributeReference, StringLiteral("a"))), insertAssigns)), - Seq(DeleteAction(Some(_)), UpdateAction(None, secondUpdateAssigns))) => + Seq(DeleteAction(Some(_)), UpdateAction(None, secondUpdateAssigns)), + withSchemaEvolution) => val ti = target.output.find(_.name == "i").get val ts = target.output.find(_.name == "s").get val si = source.output.find(_.name == "i").get @@ -2064,6 +2080,7 @@ class PlanResolutionSuite extends AnalysisTest { assert(secondUpdateAssigns.size == 1) // UPDATE key is resolved with target table only, so column `s` is not ambiguous. 
assert(secondUpdateAssigns.head.key.asInstanceOf[AttributeReference].sameRef(ts)) + assert(withSchemaEvolution === false) case p => fail("Expect MergeIntoTable, but got:\n" + p.treeString) } @@ -2150,7 +2167,8 @@ class PlanResolutionSuite extends AnalysisTest { _, Seq(), Seq(), - notMatchedBySourceActions) => + notMatchedBySourceActions, + withSchemaEvolution) => assert(notMatchedBySourceActions.length == 2) notMatchedBySourceActions(0) match { case DeleteAction(Some(EqualTo(dl: AttributeReference, StringLiteral("b")))) => @@ -2171,6 +2189,7 @@ class PlanResolutionSuite extends AnalysisTest { assert(us.sameRef(ti)) case other => fail("unexpected second not matched by source action " + other) } + assert(withSchemaEvolution === false) } val sql7 = @@ -2205,6 +2224,7 @@ class PlanResolutionSuite extends AnalysisTest { case u: MergeIntoTable => assert(u.targetTable.isInstanceOf[UnresolvedRelation]) assert(u.sourceTable.isInstanceOf[UnresolvedRelation]) + assert(u.withSchemaEvolution === false) case _ => fail("Expect MergeIntoTable, but got:\n" + parsed.treeString) } @@ -2283,6 +2303,7 @@ class PlanResolutionSuite extends AnalysisTest { assert(s2.functionName == "varcharTypeWriteSideCheck") case other => fail("Expect UpdateAction, but got: " + other) } + assert(m.withSchemaEvolution === false) case other => fail("Expect MergeIntoTable, but got:\n" + other.treeString) } } @@ -2304,12 +2325,14 @@ class PlanResolutionSuite extends AnalysisTest { _, Seq(DeleteAction(None)), Seq(InsertAction(None, insertAssigns)), - Nil) => + Nil, + withSchemaEvolution) => // There is only one assignment, the missing col is not filled with default value assert(insertAssigns.size == 1) // Special case: Spark does not resolve any columns in MERGE if table accepts any schema. 
assert(insertAssigns.head.key.asInstanceOf[UnresolvedAttribute].name == "target.i") assert(insertAssigns.head.value.asInstanceOf[UnresolvedAttribute].name == "DEFAULT") + assert(withSchemaEvolution === false) case l => fail("Expected unresolved MergeIntoTable, but got:\n" + l.treeString) } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala index 51123b17eeec..1c0b4a080ee5 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala @@ -214,7 +214,7 @@ trait ThriftServerWithSparkContextSuite extends SharedThriftServer { val sessionHandle = client.openSession(user, "") val infoValue = client.getInfo(sessionHandle, GetInfoType.CLI_ODBC_KEYWORDS) // scalastyle:off line.size.limit - assert(infoValue.getStringValue == "ADD,AFTER,ALL,ALTER,ALWAYS,ANALYZE,AND,ANTI,ANY,ANY_VALUE,ARCHIVE,ARRAY,AS,ASC,AT,AUTHORIZATION,BETWEEN,BIGINT,BINARY,BOOLEAN,BOTH,BUCKET,BUCKETS,BY,BYTE,CACHE,CASCADE,CASE,CAST,CATALOG,CATALOGS,CHANGE,CHAR,CHARACTER,CHECK,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATE,COLLATION,COLLECTION,COLUMN,COLUMNS,COMMENT,COMMIT,COMPACT,COMPACTIONS,COMPUTE,CONCATENATE,CONSTRAINT,COST,CREATE,CROSS,CUBE,CURRENT,CURRENT_DATE,CURRENT_TIME,CURRENT_TIMESTAMP,CURRENT_US [...] 
+ assert(infoValue.getStringValue == "ADD,AFTER,ALL,ALTER,ALWAYS,ANALYZE,AND,ANTI,ANY,ANY_VALUE,ARCHIVE,ARRAY,AS,ASC,AT,AUTHORIZATION,BETWEEN,BIGINT,BINARY,BOOLEAN,BOTH,BUCKET,BUCKETS,BY,BYTE,CACHE,CASCADE,CASE,CAST,CATALOG,CATALOGS,CHANGE,CHAR,CHARACTER,CHECK,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATE,COLLATION,COLLECTION,COLUMN,COLUMNS,COMMENT,COMMIT,COMPACT,COMPACTIONS,COMPUTE,CONCATENATE,CONSTRAINT,COST,CREATE,CROSS,CUBE,CURRENT,CURRENT_DATE,CURRENT_TIME,CURRENT_TIMESTAMP,CURRENT_US [...] // scalastyle:on line.size.limit } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org