This is an automated email from the ASF dual-hosted git repository.

biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8703b95a33 [spark] Support MergeInto on Paimon append-only table in 
spark V2 write (#7200)
8703b95a33 is described below

commit 8703b95a33e5d597dcb20a0e8e551152f05d2e66
Author: Kerwin Zhang <[email protected]>
AuthorDate: Thu Feb 5 15:41:01 2026 +0800

    [spark] Support MergeInto on Paimon append-only table in spark V2 write 
(#7200)
---
 .../spark/catalyst/analysis/PaimonMergeInto.scala  | 23 ++++--
 .../spark/catalyst/analysis/PaimonMergeInto.scala  | 24 ++++--
 .../paimon/spark/sql/MergeIntoTableTest.scala      | 40 ++++++++++
 .../analysis/AssignmentAlignmentHelper.scala       | 15 ----
 .../spark/catalyst/analysis/PaimonMergeInto.scala  | 25 ++++--
 .../catalyst/analysis/PaimonMergeIntoBase.scala    | 88 ++++++++--------------
 .../spark/catalyst/analysis/RowLevelHelper.scala   | 11 ++-
 .../spark/rowops/PaimonCopyOnWriteScan.scala       | 38 +++++++---
 .../paimon/spark/sql/MergeIntoTableTestBase.scala  |  8 +-
 9 files changed, 162 insertions(+), 110 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala
 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala
index b1af5a2d88..95ae7471da 100644
--- 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala
+++ 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala
@@ -18,20 +18,27 @@
 
 package org.apache.paimon.spark.catalyst.analysis
 
-import org.apache.paimon.spark.SparkTable
-import org.apache.paimon.spark.commands.MergeIntoPaimonTable
-
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{MergeAction, 
MergeIntoTable}
 
 /** A post-hoc resolution rule for MergeInto. */
 case class PaimonMergeInto(spark: SparkSession) extends PaimonMergeIntoBase {
 
-  override def resolveNotMatchedBySourceActions(
-      merge: MergeIntoTable,
-      targetOutput: Seq[AttributeReference],
-      dataEvolutionEnabled: Boolean): Seq[MergeAction] = {
+  /**
+   * Align all MergeActions in a MergeIntoTable based on the target table's 
output attributes.
+   * Returns a new MergeIntoTable with aligned matchedActions and 
notMatchedActions.
+   */
+  override def alignMergeIntoTable(
+      m: MergeIntoTable,
+      targetOutput: Seq[Attribute]): MergeIntoTable = {
+    m.copy(
+      matchedActions = m.matchedActions.map(alignMergeAction(_, targetOutput)),
+      notMatchedActions = m.notMatchedActions.map(alignMergeAction(_, 
targetOutput))
+    )
+  }
+
+  override def resolveNotMatchedBySourceActions(merge: MergeIntoTable): 
Seq[MergeAction] = {
     Seq.empty
   }
 }
diff --git 
a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala
 
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala
index b1af5a2d88..a92d13cc0b 100644
--- 
a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala
+++ 
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala
@@ -18,20 +18,28 @@
 
 package org.apache.paimon.spark.catalyst.analysis
 
-import org.apache.paimon.spark.SparkTable
-import org.apache.paimon.spark.commands.MergeIntoPaimonTable
-
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{MergeAction, 
MergeIntoTable}
 
 /** A post-hoc resolution rule for MergeInto. */
 case class PaimonMergeInto(spark: SparkSession) extends PaimonMergeIntoBase {
 
-  override def resolveNotMatchedBySourceActions(
-      merge: MergeIntoTable,
-      targetOutput: Seq[AttributeReference],
-      dataEvolutionEnabled: Boolean): Seq[MergeAction] = {
+  /**
+   * Align all MergeActions in a MergeIntoTable based on the target table's 
output attributes.
+   * Returns a new MergeIntoTable with aligned matchedActions and 
notMatchedActions.
+   */
+  override def alignMergeIntoTable(
+      m: MergeIntoTable,
+      targetOutput: Seq[Attribute]): MergeIntoTable = {
+    m.copy(
+      matchedActions = m.matchedActions.map(alignMergeAction(_, targetOutput)),
+      notMatchedActions = m.notMatchedActions.map(alignMergeAction(_, 
targetOutput))
+    )
+  }
+
+  override def resolveNotMatchedBySourceActions(merge: MergeIntoTable): 
Seq[MergeAction] = {
     Seq.empty
   }
+
 }
diff --git 
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
 
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
index c83ee54938..c8ae09a26b 100644
--- 
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
+++ 
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
@@ -61,3 +61,43 @@ class MergeIntoAppendNonBucketedTableTest
     super.sparkConf.set("spark.paimon.write.use-v2-write", "false")
   }
 }
+
+class V2MergeIntoPrimaryKeyBucketedTableTest
+  extends MergeIntoTableTestBase
+  with MergeIntoPrimaryKeyTableTest
+  with MergeIntoNotMatchedBySourceTest
+  with PaimonPrimaryKeyBucketedTableTest {
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+  }
+}
+
+class V2MergeIntoPrimaryKeyNonBucketTableTest
+  extends MergeIntoTableTestBase
+  with MergeIntoPrimaryKeyTableTest
+  with MergeIntoNotMatchedBySourceTest
+  with PaimonPrimaryKeyNonBucketTableTest {
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+  }
+}
+
+class V2MergeIntoAppendBucketedTableTest
+  extends MergeIntoTableTestBase
+  with MergeIntoAppendTableTest
+  with MergeIntoNotMatchedBySourceTest
+  with PaimonAppendBucketedTableTest {
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+  }
+}
+
+class V2MergeIntoAppendNonBucketedTableTest
+  extends MergeIntoTableTestBase
+  with MergeIntoAppendTableTest
+  with MergeIntoNotMatchedBySourceTest
+  with PaimonAppendNonBucketTableTest {
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/AssignmentAlignmentHelper.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/AssignmentAlignmentHelper.scala
index 13ce64b86b..2404f1f49f 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/AssignmentAlignmentHelper.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/AssignmentAlignmentHelper.scala
@@ -89,21 +89,6 @@ trait AssignmentAlignmentHelper extends SQLConfHelper with 
ExpressionHelper {
     }
   }
 
-  /**
-   * Align all MergeActions in a MergeIntoTable based on the target table's 
output attributes.
-   * Returns a new MergeIntoTable with aligned matchedActions, 
notMatchedActions, and
-   * notMatchedBySourceActions.
-   */
-  protected def alignMergeIntoTable(
-      m: MergeIntoTable,
-      targetOutput: Seq[Attribute]): MergeIntoTable = {
-    m.copy(
-      matchedActions = m.matchedActions.map(alignMergeAction(_, targetOutput)),
-      notMatchedActions = m.notMatchedActions.map(alignMergeAction(_, 
targetOutput)),
-      notMatchedBySourceActions = 
m.notMatchedBySourceActions.map(alignMergeAction(_, targetOutput))
-    )
-  }
-
   private def recursiveAlignUpdates(
       targetAttrs: Seq[NamedExpression],
       updates: Seq[AttrUpdate],
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala
index d6023c2f69..45916be761 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala
@@ -19,17 +19,28 @@
 package org.apache.paimon.spark.catalyst.analysis
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{MergeAction, 
MergeIntoTable}
 
 /** A post-hoc resolution rule for MergeInto. */
 case class PaimonMergeInto(spark: SparkSession) extends PaimonMergeIntoBase {
 
-  override def resolveNotMatchedBySourceActions(
-      merge: MergeIntoTable,
-      targetOutput: Seq[AttributeReference],
-      dataEvolutionEnabled: Boolean): Seq[MergeAction] = {
-    merge.notMatchedBySourceActions.map(
-      checkAndAlignActionAssignment(_, targetOutput, dataEvolutionEnabled))
+  /**
+   * Align all MergeActions in a MergeIntoTable based on the target table's 
output attributes.
+   * Returns a new MergeIntoTable with aligned matchedActions, 
notMatchedActions, and
+   * notMatchedBySourceActions.
+   */
+  override def alignMergeIntoTable(
+      m: MergeIntoTable,
+      targetOutput: Seq[Attribute]): MergeIntoTable = {
+    m.copy(
+      matchedActions = m.matchedActions.map(alignMergeAction(_, targetOutput)),
+      notMatchedActions = m.notMatchedActions.map(alignMergeAction(_, 
targetOutput)),
+      notMatchedBySourceActions = 
m.notMatchedBySourceActions.map(alignMergeAction(_, targetOutput))
+    )
+  }
+
+  override def resolveNotMatchedBySourceActions(merge: MergeIntoTable): 
Seq[MergeAction] = {
+    merge.notMatchedBySourceActions
   }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
index f9a186e70b..84560543a2 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
@@ -23,7 +23,7 @@ import 
org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
 import org.apache.paimon.spark.commands.{MergeIntoPaimonDataEvolutionTable, 
MergeIntoPaimonTable}
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
AttributeSet, Expression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
Expression, SubqueryExpression}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 
@@ -62,64 +62,34 @@ trait PaimonMergeIntoBase
             updateActions,
             primaryKeys)
         }
-        val alignedMatchedActions =
-          merge.matchedActions.map(
-            checkAndAlignActionAssignment(_, targetOutput, 
dataEvolutionEnabled))
-        val alignedNotMatchedActions =
-          merge.notMatchedActions.map(
-            checkAndAlignActionAssignment(_, targetOutput, 
dataEvolutionEnabled))
-        val alignedNotMatchedBySourceActions =
-          resolveNotMatchedBySourceActions(merge, targetOutput, 
dataEvolutionEnabled)
-
-        if (dataEvolutionEnabled) {
-          MergeIntoPaimonDataEvolutionTable(
-            v2Table,
-            merge.targetTable,
-            merge.sourceTable,
-            merge.mergeCondition,
-            alignedMatchedActions,
-            alignedNotMatchedActions,
-            alignedNotMatchedBySourceActions
-          )
-        } else {
-          MergeIntoPaimonTable(
-            v2Table,
-            merge.targetTable,
-            merge.sourceTable,
-            merge.mergeCondition,
-            alignedMatchedActions,
-            alignedNotMatchedActions,
-            alignedNotMatchedBySourceActions
-          )
-        }
-    }
-  }
-
-  def resolveNotMatchedBySourceActions(
-      merge: MergeIntoTable,
-      targetOutput: Seq[AttributeReference],
-      dataEvolutionEnabled: Boolean): Seq[MergeAction]
 
-  protected def checkAndAlignActionAssignment(
-      action: MergeAction,
-      targetOutput: Seq[AttributeReference],
-      dataEvolutionEnabled: Boolean): MergeAction = {
-    action match {
-      case d @ DeleteAction(_) => d
-      case u @ UpdateAction(_, assignments) =>
-        u.copy(assignments = alignAssignments(targetOutput, assignments))
+        val alignedMergeIntoTable = alignMergeIntoTable(merge, targetOutput)
 
-      case i @ InsertAction(_, assignments) =>
-        i.copy(assignments = alignAssignments(targetOutput, assignments, 
isInsert = true))
-
-      case _: UpdateStarAction =>
-        throw new RuntimeException(s"UpdateStarAction should not be here.")
-
-      case _: InsertStarAction =>
-        throw new RuntimeException(s"InsertStarAction should not be here.")
-
-      case _ =>
-        throw new RuntimeException(s"Can't recognize this action: $action")
+        if (!shouldFallbackToV1MergeInto(alignedMergeIntoTable)) {
+          alignedMergeIntoTable
+        } else {
+          if (dataEvolutionEnabled) {
+            MergeIntoPaimonDataEvolutionTable(
+              v2Table,
+              merge.targetTable,
+              merge.sourceTable,
+              merge.mergeCondition,
+              alignedMergeIntoTable.matchedActions,
+              alignedMergeIntoTable.notMatchedActions,
+              resolveNotMatchedBySourceActions(alignedMergeIntoTable)
+            )
+          } else {
+            MergeIntoPaimonTable(
+              v2Table,
+              merge.targetTable,
+              merge.sourceTable,
+              merge.mergeCondition,
+              alignedMergeIntoTable.matchedActions,
+              alignedMergeIntoTable.notMatchedActions,
+              resolveNotMatchedBySourceActions(alignedMergeIntoTable)
+            )
+          }
+        }
     }
   }
 
@@ -156,4 +126,8 @@ trait PaimonMergeIntoBase
       throw new RuntimeException("Can't update the primary key column in 
update clause.")
     }
   }
+
+  def resolveNotMatchedBySourceActions(merge: MergeIntoTable): Seq[MergeAction]
+
+  def alignMergeIntoTable(m: MergeIntoTable, targetOutput: Seq[Attribute]): 
MergeIntoTable
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
index 8ec139b607..b3b0929227 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
@@ -24,7 +24,7 @@ import org.apache.paimon.table.{FileStoreTable, Table}
 
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
AttributeSet, BinaryExpression, EqualTo, Expression, SubqueryExpression}
-import org.apache.spark.sql.catalyst.plans.logical.{Assignment, UpdateTable}
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, 
MergeIntoTable, UpdateTable}
 
 trait RowLevelHelper extends SQLConfHelper with AssignmentAlignmentHelper {
 
@@ -102,4 +102,13 @@ trait RowLevelHelper extends SQLConfHelper with 
AssignmentAlignmentHelper {
     !updateTable.rewritable ||
     !updateTable.aligned
   }
+
+  /** Determines if DataSourceV2 merge into is not supported for the given 
table. */
+  protected def shouldFallbackToV1MergeInto(m: MergeIntoTable): Boolean = {
+    val relation = PaimonRelation.getPaimonRelation(m.targetTable)
+    val table = relation.table.asInstanceOf[SparkTable]
+    shouldFallbackToV1(table) ||
+    !m.rewritable ||
+    !m.aligned
+  }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonCopyOnWriteScan.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonCopyOnWriteScan.scala
index 11bd72e360..9b44094c23 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonCopyOnWriteScan.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonCopyOnWriteScan.scala
@@ -42,6 +42,9 @@ import scala.collection.mutable
 /**
  * Note: The [[pushedPartitionFilters]] and [[pushedDataFilters]] are 
intentionally set to empty
  * because file-level filtering is handled through Spark's runtime V2 
filtering mechanism.
+ *
+ * When Spark's runtime filter is not applied (e.g., when NOT MATCHED BY 
SOURCE is present in
+ * MergeInto), this scan will read all data from the table.
  */
 case class PaimonCopyOnWriteScan(
     table: FileStoreTable,
@@ -51,10 +54,29 @@ case class PaimonCopyOnWriteScan(
   extends BaseScan
   with SupportsRuntimeV2Filtering {
 
-  override def inputSplits: Array[Split] = 
dataSplits.asInstanceOf[Array[Split]]
+  // Track whether filter() has been called
+  @volatile private var filterApplied: Boolean = false
+
+  private val filteredFileNames: mutable.Set[String] = mutable.Set[String]()
+
+  override def inputSplits: Array[Split] = {
+    loadSplits()
+    dataSplits.asInstanceOf[Array[Split]]
+  }
 
   var dataSplits: Array[DataSplit] = Array()
 
+  private def loadSplits(): Unit = {
+    val snapshotReader = table.newSnapshotReader()
+    if (table.coreOptions().manifestDeleteFileDropStats()) {
+      snapshotReader.dropStats()
+    }
+    if (filterApplied) {
+      snapshotReader.withDataFileNameFilter(fileName => 
filteredFileNames.contains(fileName))
+    }
+    dataSplits = snapshotReader.read().splits().asScala.collect { case s: 
DataSplit => s }.toArray
+  }
+
   def scannedFiles: Seq[SparkDataFileMeta] = {
     dataSplits
       .flatMap(dataSplit => convertToSparkDataFileMeta(dataSplit, 
dataSplit.totalBuckets()))
@@ -66,9 +88,9 @@ case class PaimonCopyOnWriteScan(
   }
 
   override def filter(predicates: Array[SparkPredicate]): Unit = {
-    val filteredFileNames: mutable.Set[String] = mutable.Set[String]()
-    val runtimefilters: Array[Filter] = 
predicates.flatMap(PaimonUtils.filterV2ToV1)
-    for (filter <- runtimefilters) {
+    filterApplied = true
+    val runtimeFilters: Array[Filter] = 
predicates.flatMap(PaimonUtils.filterV2ToV1)
+    for (filter <- runtimeFilters) {
       filter match {
         case in: In if in.attribute.equalsIgnoreCase(FILE_PATH_COLUMN) =>
           for (value <- in.values) {
@@ -78,14 +100,6 @@ case class PaimonCopyOnWriteScan(
         case _ => logWarning("Unsupported runtime filter")
       }
     }
-
-    val snapshotReader = table.newSnapshotReader()
-    if (table.coreOptions().manifestDeleteFileDropStats()) {
-      snapshotReader.dropStats()
-    }
-
-    snapshotReader.withDataFileNameFilter(fileName => 
filteredFileNames.contains(fileName))
-    dataSplits = snapshotReader.read().splits().asScala.collect { case s: 
DataSplit => s }.toArray
   }
 
 }
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
index ee4799f56c..28e1b63fd2 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
@@ -528,7 +528,7 @@ abstract class MergeIntoTableTestBase extends 
PaimonSparkTestBase with PaimonTab
       createTable("target", "a INT, b INT, c STRING", Seq("a"))
       spark.sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2')")
 
-      val error = intercept[RuntimeException] {
+      val error = intercept[Exception] {
         spark.sql(s"""
                      |MERGE INTO target
                      |USING source
@@ -539,7 +539,11 @@ abstract class MergeIntoTableTestBase extends 
PaimonSparkTestBase with PaimonTab
                      |THEN INSERT (a, b, c) values (a, b, c)
                      |""".stripMargin)
       }.getMessage
-      assert(error.contains("match more than one source rows"))
+      // V1 path: "match more than one source rows"
+      // V2 path: "MERGE_CARDINALITY_VIOLATION"
+      assert(
+        error.contains("match more than one source rows") ||
+          error.contains("MERGE_CARDINALITY_VIOLATION"))
     }
   }
 

Reply via email to