This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e6c5d2779 [spark] Data evolution support low version spark (#6135)
9e6c5d2779 is described below

commit 9e6c5d27793147ab760426659a72e1e12f9068c5
Author: YeJunHao <[email protected]>
AuthorDate: Mon Aug 25 11:38:04 2025 +0800

    [spark] Data evolution support low version spark (#6135)
---
 .../catalyst/analysis/PaimonDeleteTable.scala      |   4 +
 .../spark/execution/OldCompatibleStrategy.scala    |  57 +++++
 .../sql/catalyst/plans/logical/MergeRows.scala     | 119 +++++++++++
 .../execution/datasources/v2/MergeRowsExec.scala   | 237 +++++++++++++++++++++
 .../spark/execution/OldCompatibleStrategy.scala    |  57 +++++
 .../sql/catalyst/plans/logical/MergeRows.scala     | 119 +++++++++++
 .../execution/datasources/v2/MergeRowsExec.scala   | 237 +++++++++++++++++++++
 .../spark/execution/OldCompatibleStrategy.scala    |  57 +++++
 .../sql/catalyst/plans/logical/MergeRows.scala     | 119 +++++++++++
 .../execution/datasources/v2/MergeRowsExec.scala   | 237 +++++++++++++++++++++
 .../spark/execution/OldCompatibleStrategy.scala    |  37 ++++
 .../extensions/PaimonSparkSessionExtensions.scala  |   4 +-
 .../paimon/spark/sql/RowLineageTestBase.scala      | 174 +++++++--------
 13 files changed, 1365 insertions(+), 93 deletions(-)
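For context, the updated RowLineageTestBase below removes the gteqSpark3_5 guard, so the
MERGE INTO path on data-evolution tables is now exercised on Spark 3.2/3.3/3.4 as well. A
minimal sketch of the statements involved, following the updated test (table names are
illustrative and a configured Paimon catalog plus session extensions are assumed):

    // assumes a Spark session with the Paimon catalog and session extensions configured
    spark.sql("CREATE TABLE s (id INT, b INT)")
    spark.sql(
      "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES " +
        "('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
    spark.sql(
      """MERGE INTO t USING s ON t.id = s.id
        |WHEN MATCHED THEN UPDATE SET t.b = s.b
        |WHEN NOT MATCHED THEN INSERT (id, b, c) VALUES (id, b, 11)
        |""".stripMargin)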

diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
index 1d6aa1a8aa..329c4c5886 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
@@ -39,6 +39,10 @@ object PaimonDeleteTable extends Rule[LogicalPlan] with RowLevelHelper {
         table.getTable match {
           case paimonTable: FileStoreTable =>
             val relation = PaimonRelation.getPaimonRelation(d.table)
+            if (paimonTable.coreOptions().dataEvolutionEnabled()) {
+              throw new RuntimeException(
+                "Delete operation is not yet supported when data evolution is enabled.")
+            }
             DeleteFromPaimonTableCommand(relation, paimonTable, condition.getOrElse(TrueLiteral))
 
           case _ =>
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/execution/OldCompatibleStrategy.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/execution/OldCompatibleStrategy.scala
new file mode 100644
index 0000000000..111ceba23e
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/execution/OldCompatibleStrategy.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.connector.catalog.PaimonLookupCatalog
+import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
+import org.apache.spark.sql.execution.datasources.v2.MergeRowsExec
+
+case class OldCompatibleStrategy(spark: SparkSession)
+  extends SparkStrategy
+  with PredicateHelper
+  with PaimonLookupCatalog {
+
+  protected lazy val catalogManager = spark.sessionState.catalogManager
+
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case MergeRows(
+          isSourceRowPresent,
+          isTargetRowPresent,
+          matchedInstructions,
+          notMatchedInstructions,
+          notMatchedBySourceInstructions,
+          checkCardinality,
+          output,
+          child) =>
+      MergeRowsExec(
+        isSourceRowPresent,
+        isTargetRowPresent,
+        matchedInstructions,
+        notMatchedInstructions,
+        notMatchedBySourceInstructions,
+        checkCardinality,
+        output,
+        planLater(child)
+      ) :: Nil
+    case _ => Nil
+  }
+}
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/MergeRows.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/MergeRows.scala
new file mode 100644
index 0000000000..a6f8b8f134
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/MergeRows.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, Unevaluable}
+import org.apache.spark.sql.catalyst.plans.logical.MergeRows.{Instruction, ROW_ID}
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.types.{DataType, NullType}
+
+case class MergeRows(
+    isSourceRowPresent: Expression,
+    isTargetRowPresent: Expression,
+    matchedInstructions: Seq[Instruction],
+    notMatchedInstructions: Seq[Instruction],
+    notMatchedBySourceInstructions: Seq[Instruction],
+    checkCardinality: Boolean,
+    output: Seq[Attribute],
+    child: LogicalPlan)
+  extends UnaryNode {
+
+  override lazy val producedAttributes: AttributeSet = {
+    AttributeSet(output.filterNot(attr => inputSet.contains(attr)))
+  }
+
+  @transient
+  override lazy val references: AttributeSet = {
+    val usedExprs = if (checkCardinality) {
+      val rowIdAttr = child.output.find(attr => conf.resolver(attr.name, ROW_ID))
+      assert(rowIdAttr.isDefined, "Cannot find row ID attr")
+      rowIdAttr.get +: expressions
+    } else {
+      expressions
+    }
+    AttributeSet.fromAttributeSets(usedExprs.map(_.references)) -- producedAttributes
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"MergeRows${truncatedString(output, "[", ", ", "]", maxFields)}"
+  }
+
+  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = {
+    copy(child = newChild)
+  }
+}
+
+object MergeRows {
+  final val ROW_ID = "__row_id"
+
+  /**
+   * When a MERGE operation is rewritten, the target table is joined with the source and each
+   * MATCHED/NOT MATCHED/NOT MATCHED BY SOURCE clause is converted into a corresponding instruction
+   * on top of the joined plan. The purpose of an instruction is to derive an output row based on a
+   * joined row.
+   *
+   * Instructions are valid expressions so that they will be properly transformed by the analyzer
+   * and optimizer.
+   */
+  sealed trait Instruction extends Expression with Unevaluable {
+    def condition: Expression
+    def outputs: Seq[Seq[Expression]]
+    override def nullable: Boolean = false
+    // We return NullType here as only the `MergeRows` operator can contain `Instruction`
+    // expressions and it doesn't care about the data type. Some external optimizer rules may
+    // assume optimized plan is always resolved and Expression#dataType is always available, so
+    // we can't just fail here.
+    override def dataType: DataType = NullType
+  }
+
+  case class Keep(condition: Expression, output: Seq[Expression]) extends Instruction {
+    def children: Seq[Expression] = condition +: output
+    override def outputs: Seq[Seq[Expression]] = Seq(output)
+
+    override protected def withNewChildrenInternal(
+        newChildren: IndexedSeq[Expression]): Expression = {
+      copy(condition = newChildren.head, output = newChildren.tail)
+    }
+  }
+
+  case class Discard(condition: Expression) extends Instruction with UnaryLike[Expression] {
+    override def outputs: Seq[Seq[Expression]] = Seq.empty
+    override def child: Expression = condition
+
+    override protected def withNewChildInternal(newChild: Expression): Expression = {
+      copy(condition = newChild)
+    }
+  }
+
+  case class Split(condition: Expression, output: Seq[Expression], otherOutput: Seq[Expression])
+    extends Instruction {
+
+    def children: Seq[Expression] = Seq(condition) ++ output ++ otherOutput
+    override def outputs: Seq[Seq[Expression]] = Seq(output, otherOutput)
+
+    override protected def withNewChildrenInternal(
+        newChildren: IndexedSeq[Expression]): Expression = {
+      val newCondition = newChildren.head
+      val newOutput = newChildren.slice(from = 1, until = output.size + 1)
+      val newOtherOutput = newChildren.takeRight(otherOutput.size)
+      copy(condition = newCondition, output = newOutput, otherOutput = newOtherOutput)
+    }
+  }
+}
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
new file mode 100644
index 0000000000..b830b6b6fe
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, BasePredicate, Expression, Projection, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
+import org.apache.spark.sql.catalyst.plans.logical.MergeRows._
+import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.roaringbitmap.longlong.Roaring64Bitmap
+
+case class MergeRowsExec(
+    isSourceRowPresent: Expression,
+    isTargetRowPresent: Expression,
+    matchedInstructions: Seq[Instruction],
+    notMatchedInstructions: Seq[Instruction],
+    notMatchedBySourceInstructions: Seq[Instruction],
+    checkCardinality: Boolean,
+    output: Seq[Attribute],
+    child: SparkPlan)
+  extends UnaryExecNode {
+
+  @transient override lazy val producedAttributes: AttributeSet = {
+    AttributeSet(output.filterNot(attr => inputSet.contains(attr)))
+  }
+
+  @transient
+  override lazy val references: AttributeSet = {
+    val usedExprs = if (checkCardinality) {
+      val rowIdAttr = child.output.find(attr => conf.resolver(attr.name, ROW_ID))
+      assert(rowIdAttr.isDefined, "Cannot find row ID attr")
+      rowIdAttr.get +: expressions
+    } else {
+      expressions
+    }
+    AttributeSet.fromAttributeSets(usedExprs.map(_.references)) -- producedAttributes
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"MergeRowsExec${truncatedString(output, "[", ", ", "]", maxFields)}"
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = {
+    copy(child = newChild)
+  }
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    child.execute().mapPartitions(processPartition)
+  }
+
+  private def processPartition(rowIterator: Iterator[InternalRow]): Iterator[InternalRow] = {
+    val isSourceRowPresentPred = createPredicate(isSourceRowPresent)
+    val isTargetRowPresentPred = createPredicate(isTargetRowPresent)
+
+    val matchedInstructionExecs = planInstructions(matchedInstructions)
+    val notMatchedInstructionExecs = planInstructions(notMatchedInstructions)
+    val notMatchedBySourceInstructionExecs = planInstructions(notMatchedBySourceInstructions)
+
+    val cardinalityValidator = if (checkCardinality) {
+      val rowIdOrdinal = child.output.indexWhere(attr => conf.resolver(attr.name, ROW_ID))
+      assert(rowIdOrdinal != -1, "Cannot find row ID attr")
+      BitmapCardinalityValidator(rowIdOrdinal)
+    } else {
+      NoopCardinalityValidator
+    }
+
+    val mergeIterator = new MergeRowIterator(
+      rowIterator,
+      cardinalityValidator,
+      isTargetRowPresentPred,
+      isSourceRowPresentPred,
+      matchedInstructionExecs,
+      notMatchedInstructionExecs,
+      notMatchedBySourceInstructionExecs)
+
+    // null indicates a record must be discarded
+    mergeIterator.filter(_ != null)
+  }
+
+  private def createProjection(exprs: Seq[Expression]): UnsafeProjection = {
+    UnsafeProjection.create(exprs, child.output)
+  }
+
+  private def createPredicate(expr: Expression): BasePredicate = {
+    GeneratePredicate.generate(expr, child.output)
+  }
+
+  private def planInstructions(instructions: Seq[Instruction]): Seq[InstructionExec] = {
+    instructions.map {
+      case Keep(cond, output) =>
+        KeepExec(createPredicate(cond), createProjection(output))
+
+      case Discard(cond) =>
+        DiscardExec(createPredicate(cond))
+
+      case Split(cond, output, otherOutput) =>
+        SplitExec(createPredicate(cond), createProjection(output), createProjection(otherOutput))
+
+      case other =>
+        throw new RuntimeException("Unsupported instruction type: " + 
other.getClass.getSimpleName)
+    }
+  }
+
+  sealed trait InstructionExec {
+    def condition: BasePredicate
+  }
+
+  case class KeepExec(condition: BasePredicate, projection: Projection) extends InstructionExec {
+    def apply(row: InternalRow): InternalRow = projection.apply(row)
+  }
+
+  case class DiscardExec(condition: BasePredicate) extends InstructionExec
+
+  case class SplitExec(
+      condition: BasePredicate,
+      projection: Projection,
+      otherProjection: Projection)
+    extends InstructionExec {
+    def projectRow(row: InternalRow): InternalRow = projection.apply(row)
+
+    def projectExtraRow(row: InternalRow): InternalRow = otherProjection.apply(row)
+  }
+
+  sealed trait CardinalityValidator {
+    def validate(row: InternalRow): Unit
+  }
+
+  object NoopCardinalityValidator extends CardinalityValidator {
+    def validate(row: InternalRow): Unit = {}
+  }
+
+  /**
+   * A simple cardinality validator that keeps track of seen row IDs in a roaring bitmap. This
+   * validator assumes the target table is never broadcasted or replicated, which guarantees matches
+   * for one target row are always co-located in the same partition.
+   *
+   * IDs are generated by [[org.apache.spark.sql.catalyst.expressions.MonotonicallyIncreasingID]].
+   */
+  case class BitmapCardinalityValidator(rowIdOrdinal: Int) extends CardinalityValidator {
+    // use Roaring64Bitmap as row IDs generated by MonotonicallyIncreasingID are 64-bit integers
+    private val matchedRowIds = new Roaring64Bitmap()
+
+    override def validate(row: InternalRow): Unit = {
+      val currentRowId = row.getLong(rowIdOrdinal)
+      if (matchedRowIds.contains(currentRowId)) {
+        throw new RuntimeException("Should not happens")
+      }
+      matchedRowIds.add(currentRowId)
+    }
+  }
+
+  /**
+   * An iterator that acts on joined target and source rows and computes deletes, updates and
+   * inserts according to provided MERGE instructions.
+   *
+   * If a particular joined row should be discarded, this iterator returns null.
+   */
+  class MergeRowIterator(
+      private val rowIterator: Iterator[InternalRow],
+      private val cardinalityValidator: CardinalityValidator,
+      private val isTargetRowPresentPred: BasePredicate,
+      private val isSourceRowPresentPred: BasePredicate,
+      private val matchedInstructions: Seq[InstructionExec],
+      private val notMatchedInstructions: Seq[InstructionExec],
+      private val notMatchedBySourceInstructions: Seq[InstructionExec])
+    extends Iterator[InternalRow] {
+
+    var cachedExtraRow: InternalRow = _
+
+    override def hasNext: Boolean = cachedExtraRow != null || rowIterator.hasNext
+
+    override def next(): InternalRow = {
+      if (cachedExtraRow != null) {
+        val extraRow = cachedExtraRow
+        cachedExtraRow = null
+        return extraRow
+      }
+
+      val row = rowIterator.next()
+
+      val isSourceRowPresent = isSourceRowPresentPred.eval(row)
+      val isTargetRowPresent = isTargetRowPresentPred.eval(row)
+
+      if (isTargetRowPresent && isSourceRowPresent) {
+        cardinalityValidator.validate(row)
+        applyInstructions(row, matchedInstructions)
+      } else if (isSourceRowPresent) {
+        applyInstructions(row, notMatchedInstructions)
+      } else if (isTargetRowPresent) {
+        applyInstructions(row, notMatchedBySourceInstructions)
+      } else {
+        null
+      }
+    }
+
+    private def applyInstructions(
+        row: InternalRow,
+        instructions: Seq[InstructionExec]): InternalRow = {
+
+      for (instruction <- instructions) {
+        if (instruction.condition.eval(row)) {
+          instruction match {
+            case keep: KeepExec =>
+              return keep.apply(row)
+
+            case _: DiscardExec =>
+              return null
+
+            case split: SplitExec =>
+              cachedExtraRow = split.projectExtraRow(row)
+              return split.projectRow(row)
+          }
+        }
+      }
+
+      null
+    }
+  }
+}
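The MergeRowsExec above emits zero, one, or two rows per joined input row, decided by the
first instruction whose condition matches. An illustrative, self-contained sketch of that
per-row contract (plain Scala stand-ins, not the classes added above):

    object InstructionSemanticsSketch {
      type Row = Map[String, Any]

      sealed trait Instr { def cond: Row => Boolean }
      case class Keep(cond: Row => Boolean, project: Row => Row) extends Instr
      case class Discard(cond: Row => Boolean) extends Instr
      case class Split(cond: Row => Boolean, project: Row => Row, projectOther: Row => Row) extends Instr

      // Mirrors MergeRowIterator.applyInstructions: the first matching instruction decides
      // how many rows are produced (Keep -> 1, Discard -> 0, Split -> 2, no match -> 0).
      def emit(row: Row, instrs: Seq[Instr]): Seq[Row] =
        instrs.find(_.cond(row)) match {
          case Some(Keep(_, p))      => Seq(p(row))
          case Some(Discard(_))      => Seq.empty
          case Some(Split(_, p, po)) => Seq(p(row), po(row))
          case None                  => Seq.empty
        }
    }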
diff --git a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/execution/OldCompatibleStrategy.scala b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/execution/OldCompatibleStrategy.scala
new file mode 100644
index 0000000000..111ceba23e
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/execution/OldCompatibleStrategy.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.connector.catalog.PaimonLookupCatalog
+import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
+import org.apache.spark.sql.execution.datasources.v2.MergeRowsExec
+
+case class OldCompatibleStrategy(spark: SparkSession)
+  extends SparkStrategy
+  with PredicateHelper
+  with PaimonLookupCatalog {
+
+  protected lazy val catalogManager = spark.sessionState.catalogManager
+
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case MergeRows(
+          isSourceRowPresent,
+          isTargetRowPresent,
+          matchedInstructions,
+          notMatchedInstructions,
+          notMatchedBySourceInstructions,
+          checkCardinality,
+          output,
+          child) =>
+      MergeRowsExec(
+        isSourceRowPresent,
+        isTargetRowPresent,
+        matchedInstructions,
+        notMatchedInstructions,
+        notMatchedBySourceInstructions,
+        checkCardinality,
+        output,
+        planLater(child)
+      ) :: Nil
+    case _ => Nil
+  }
+}
diff --git a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/MergeRows.scala b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/MergeRows.scala
new file mode 100644
index 0000000000..a6f8b8f134
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/MergeRows.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
Expression, Unevaluable}
+import org.apache.spark.sql.catalyst.plans.logical.MergeRows.{Instruction, 
ROW_ID}
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.types.{DataType, NullType}
+
+case class MergeRows(
+    isSourceRowPresent: Expression,
+    isTargetRowPresent: Expression,
+    matchedInstructions: Seq[Instruction],
+    notMatchedInstructions: Seq[Instruction],
+    notMatchedBySourceInstructions: Seq[Instruction],
+    checkCardinality: Boolean,
+    output: Seq[Attribute],
+    child: LogicalPlan)
+  extends UnaryNode {
+
+  override lazy val producedAttributes: AttributeSet = {
+    AttributeSet(output.filterNot(attr => inputSet.contains(attr)))
+  }
+
+  @transient
+  override lazy val references: AttributeSet = {
+    val usedExprs = if (checkCardinality) {
+      val rowIdAttr = child.output.find(attr => conf.resolver(attr.name, 
ROW_ID))
+      assert(rowIdAttr.isDefined, "Cannot find row ID attr")
+      rowIdAttr.get +: expressions
+    } else {
+      expressions
+    }
+    AttributeSet.fromAttributeSets(usedExprs.map(_.references)) -- 
producedAttributes
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"MergeRows${truncatedString(output, "[", ", ", "]", maxFields)}"
+  }
+
+  override protected def withNewChildInternal(newChild: LogicalPlan): 
LogicalPlan = {
+    copy(child = newChild)
+  }
+}
+
+object MergeRows {
+  final val ROW_ID = "__row_id"
+
+  /**
+   * When a MERGE operation is rewritten, the target table is joined with the 
source and each
+   * MATCHED/NOT MATCHED/NOT MATCHED BY SOURCE clause is converted into a 
corresponding instruction
+   * on top of the joined plan. The purpose of an instruction is to derive an 
output row based on a
+   * joined row.
+   *
+   * Instructions are valid expressions so that they will be properly 
transformed by the analyzer
+   * and optimizer.
+   */
+  sealed trait Instruction extends Expression with Unevaluable {
+    def condition: Expression
+    def outputs: Seq[Seq[Expression]]
+    override def nullable: Boolean = false
+    // We return NullType here as only the `MergeRows` operator can contain 
`Instruction`
+    // expressions and it doesn't care about the data type. Some external 
optimizer rules may
+    // assume optimized plan is always resolved and Expression#dataType is 
always available, so
+    // we can't just fail here.
+    override def dataType: DataType = NullType
+  }
+
+  case class Keep(condition: Expression, output: Seq[Expression]) extends 
Instruction {
+    def children: Seq[Expression] = condition +: output
+    override def outputs: Seq[Seq[Expression]] = Seq(output)
+
+    override protected def withNewChildrenInternal(
+        newChildren: IndexedSeq[Expression]): Expression = {
+      copy(condition = newChildren.head, output = newChildren.tail)
+    }
+  }
+
+  case class Discard(condition: Expression) extends Instruction with 
UnaryLike[Expression] {
+    override def outputs: Seq[Seq[Expression]] = Seq.empty
+    override def child: Expression = condition
+
+    override protected def withNewChildInternal(newChild: Expression): 
Expression = {
+      copy(condition = newChild)
+    }
+  }
+
+  case class Split(condition: Expression, output: Seq[Expression], 
otherOutput: Seq[Expression])
+    extends Instruction {
+
+    def children: Seq[Expression] = Seq(condition) ++ output ++ otherOutput
+    override def outputs: Seq[Seq[Expression]] = Seq(output, otherOutput)
+
+    override protected def withNewChildrenInternal(
+        newChildren: IndexedSeq[Expression]): Expression = {
+      val newCondition = newChildren.head
+      val newOutput = newChildren.slice(from = 1, until = output.size + 1)
+      val newOtherOutput = newChildren.takeRight(otherOutput.size)
+      copy(condition = newCondition, output = newOutput, otherOutput = 
newOtherOutput)
+    }
+  }
+}
diff --git a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
new file mode 100644
index 0000000000..b830b6b6fe
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
BasePredicate, Expression, Projection, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
+import org.apache.spark.sql.catalyst.plans.logical.MergeRows._
+import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.roaringbitmap.longlong.Roaring64Bitmap
+
+case class MergeRowsExec(
+    isSourceRowPresent: Expression,
+    isTargetRowPresent: Expression,
+    matchedInstructions: Seq[Instruction],
+    notMatchedInstructions: Seq[Instruction],
+    notMatchedBySourceInstructions: Seq[Instruction],
+    checkCardinality: Boolean,
+    output: Seq[Attribute],
+    child: SparkPlan)
+  extends UnaryExecNode {
+
+  @transient override lazy val producedAttributes: AttributeSet = {
+    AttributeSet(output.filterNot(attr => inputSet.contains(attr)))
+  }
+
+  @transient
+  override lazy val references: AttributeSet = {
+    val usedExprs = if (checkCardinality) {
+      val rowIdAttr = child.output.find(attr => conf.resolver(attr.name, 
ROW_ID))
+      assert(rowIdAttr.isDefined, "Cannot find row ID attr")
+      rowIdAttr.get +: expressions
+    } else {
+      expressions
+    }
+    AttributeSet.fromAttributeSets(usedExprs.map(_.references)) -- 
producedAttributes
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"MergeRowsExec${truncatedString(output, "[", ", ", "]", maxFields)}"
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan 
= {
+    copy(child = newChild)
+  }
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    child.execute().mapPartitions(processPartition)
+  }
+
+  private def processPartition(rowIterator: Iterator[InternalRow]): 
Iterator[InternalRow] = {
+    val isSourceRowPresentPred = createPredicate(isSourceRowPresent)
+    val isTargetRowPresentPred = createPredicate(isTargetRowPresent)
+
+    val matchedInstructionExecs = planInstructions(matchedInstructions)
+    val notMatchedInstructionExecs = planInstructions(notMatchedInstructions)
+    val notMatchedBySourceInstructionExecs = 
planInstructions(notMatchedBySourceInstructions)
+
+    val cardinalityValidator = if (checkCardinality) {
+      val rowIdOrdinal = child.output.indexWhere(attr => 
conf.resolver(attr.name, ROW_ID))
+      assert(rowIdOrdinal != -1, "Cannot find row ID attr")
+      BitmapCardinalityValidator(rowIdOrdinal)
+    } else {
+      NoopCardinalityValidator
+    }
+
+    val mergeIterator = new MergeRowIterator(
+      rowIterator,
+      cardinalityValidator,
+      isTargetRowPresentPred,
+      isSourceRowPresentPred,
+      matchedInstructionExecs,
+      notMatchedInstructionExecs,
+      notMatchedBySourceInstructionExecs)
+
+    // null indicates a record must be discarded
+    mergeIterator.filter(_ != null)
+  }
+
+  private def createProjection(exprs: Seq[Expression]): UnsafeProjection = {
+    UnsafeProjection.create(exprs, child.output)
+  }
+
+  private def createPredicate(expr: Expression): BasePredicate = {
+    GeneratePredicate.generate(expr, child.output)
+  }
+
+  private def planInstructions(instructions: Seq[Instruction]): 
Seq[InstructionExec] = {
+    instructions.map {
+      case Keep(cond, output) =>
+        KeepExec(createPredicate(cond), createProjection(output))
+
+      case Discard(cond) =>
+        DiscardExec(createPredicate(cond))
+
+      case Split(cond, output, otherOutput) =>
+        SplitExec(createPredicate(cond), createProjection(output), 
createProjection(otherOutput))
+
+      case other =>
+        throw new RuntimeException("Unsupported instruction type: " + 
other.getClass.getSimpleName)
+    }
+  }
+
+  sealed trait InstructionExec {
+    def condition: BasePredicate
+  }
+
+  case class KeepExec(condition: BasePredicate, projection: Projection) 
extends InstructionExec {
+    def apply(row: InternalRow): InternalRow = projection.apply(row)
+  }
+
+  case class DiscardExec(condition: BasePredicate) extends InstructionExec
+
+  case class SplitExec(
+      condition: BasePredicate,
+      projection: Projection,
+      otherProjection: Projection)
+    extends InstructionExec {
+    def projectRow(row: InternalRow): InternalRow = projection.apply(row)
+
+    def projectExtraRow(row: InternalRow): InternalRow = 
otherProjection.apply(row)
+  }
+
+  sealed trait CardinalityValidator {
+    def validate(row: InternalRow): Unit
+  }
+
+  object NoopCardinalityValidator extends CardinalityValidator {
+    def validate(row: InternalRow): Unit = {}
+  }
+
+  /**
+   * A simple cardinality validator that keeps track of seen row IDs in a 
roaring bitmap. This
+   * validator assumes the target table is never broadcasted or replicated, 
which guarantees matches
+   * for one target row are always co-located in the same partition.
+   *
+   * IDs are generated by 
[[org.apache.spark.sql.catalyst.expressions.MonotonicallyIncreasingID]].
+   */
+  case class BitmapCardinalityValidator(rowIdOrdinal: Int) extends 
CardinalityValidator {
+    // use Roaring64Bitmap as row IDs generated by MonotonicallyIncreasingID 
are 64-bit integers
+    private val matchedRowIds = new Roaring64Bitmap()
+
+    override def validate(row: InternalRow): Unit = {
+      val currentRowId = row.getLong(rowIdOrdinal)
+      if (matchedRowIds.contains(currentRowId)) {
+        throw new RuntimeException("Should not happens")
+      }
+      matchedRowIds.add(currentRowId)
+    }
+  }
+
+  /**
+   * An iterator that acts on joined target and source rows and computes 
deletes, updates and
+   * inserts according to provided MERGE instructions.
+   *
+   * If a particular joined row should be discarded, this iterator returns 
null.
+   */
+  class MergeRowIterator(
+      private val rowIterator: Iterator[InternalRow],
+      private val cardinalityValidator: CardinalityValidator,
+      private val isTargetRowPresentPred: BasePredicate,
+      private val isSourceRowPresentPred: BasePredicate,
+      private val matchedInstructions: Seq[InstructionExec],
+      private val notMatchedInstructions: Seq[InstructionExec],
+      private val notMatchedBySourceInstructions: Seq[InstructionExec])
+    extends Iterator[InternalRow] {
+
+    var cachedExtraRow: InternalRow = _
+
+    override def hasNext: Boolean = cachedExtraRow != null || 
rowIterator.hasNext
+
+    override def next(): InternalRow = {
+      if (cachedExtraRow != null) {
+        val extraRow = cachedExtraRow
+        cachedExtraRow = null
+        return extraRow
+      }
+
+      val row = rowIterator.next()
+
+      val isSourceRowPresent = isSourceRowPresentPred.eval(row)
+      val isTargetRowPresent = isTargetRowPresentPred.eval(row)
+
+      if (isTargetRowPresent && isSourceRowPresent) {
+        cardinalityValidator.validate(row)
+        applyInstructions(row, matchedInstructions)
+      } else if (isSourceRowPresent) {
+        applyInstructions(row, notMatchedInstructions)
+      } else if (isTargetRowPresent) {
+        applyInstructions(row, notMatchedBySourceInstructions)
+      } else {
+        null
+      }
+    }
+
+    private def applyInstructions(
+        row: InternalRow,
+        instructions: Seq[InstructionExec]): InternalRow = {
+
+      for (instruction <- instructions) {
+        if (instruction.condition.eval(row)) {
+          instruction match {
+            case keep: KeepExec =>
+              return keep.apply(row)
+
+            case _: DiscardExec =>
+              return null
+
+            case split: SplitExec =>
+              cachedExtraRow = split.projectExtraRow(row)
+              return split.projectRow(row)
+          }
+        }
+      }
+
+      null
+    }
+  }
+}
diff --git a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/execution/OldCompatibleStrategy.scala b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/execution/OldCompatibleStrategy.scala
new file mode 100644
index 0000000000..111ceba23e
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/execution/OldCompatibleStrategy.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.connector.catalog.PaimonLookupCatalog
+import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
+import org.apache.spark.sql.execution.datasources.v2.MergeRowsExec
+
+case class OldCompatibleStrategy(spark: SparkSession)
+  extends SparkStrategy
+  with PredicateHelper
+  with PaimonLookupCatalog {
+
+  protected lazy val catalogManager = spark.sessionState.catalogManager
+
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case MergeRows(
+          isSourceRowPresent,
+          isTargetRowPresent,
+          matchedInstructions,
+          notMatchedInstructions,
+          notMatchedBySourceInstructions,
+          checkCardinality,
+          output,
+          child) =>
+      MergeRowsExec(
+        isSourceRowPresent,
+        isTargetRowPresent,
+        matchedInstructions,
+        notMatchedInstructions,
+        notMatchedBySourceInstructions,
+        checkCardinality,
+        output,
+        planLater(child)
+      ) :: Nil
+    case _ => Nil
+  }
+}
diff --git a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/MergeRows.scala b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/MergeRows.scala
new file mode 100644
index 0000000000..a6f8b8f134
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/MergeRows.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
Expression, Unevaluable}
+import org.apache.spark.sql.catalyst.plans.logical.MergeRows.{Instruction, 
ROW_ID}
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.types.{DataType, NullType}
+
+case class MergeRows(
+    isSourceRowPresent: Expression,
+    isTargetRowPresent: Expression,
+    matchedInstructions: Seq[Instruction],
+    notMatchedInstructions: Seq[Instruction],
+    notMatchedBySourceInstructions: Seq[Instruction],
+    checkCardinality: Boolean,
+    output: Seq[Attribute],
+    child: LogicalPlan)
+  extends UnaryNode {
+
+  override lazy val producedAttributes: AttributeSet = {
+    AttributeSet(output.filterNot(attr => inputSet.contains(attr)))
+  }
+
+  @transient
+  override lazy val references: AttributeSet = {
+    val usedExprs = if (checkCardinality) {
+      val rowIdAttr = child.output.find(attr => conf.resolver(attr.name, 
ROW_ID))
+      assert(rowIdAttr.isDefined, "Cannot find row ID attr")
+      rowIdAttr.get +: expressions
+    } else {
+      expressions
+    }
+    AttributeSet.fromAttributeSets(usedExprs.map(_.references)) -- 
producedAttributes
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"MergeRows${truncatedString(output, "[", ", ", "]", maxFields)}"
+  }
+
+  override protected def withNewChildInternal(newChild: LogicalPlan): 
LogicalPlan = {
+    copy(child = newChild)
+  }
+}
+
+object MergeRows {
+  final val ROW_ID = "__row_id"
+
+  /**
+   * When a MERGE operation is rewritten, the target table is joined with the 
source and each
+   * MATCHED/NOT MATCHED/NOT MATCHED BY SOURCE clause is converted into a 
corresponding instruction
+   * on top of the joined plan. The purpose of an instruction is to derive an 
output row based on a
+   * joined row.
+   *
+   * Instructions are valid expressions so that they will be properly 
transformed by the analyzer
+   * and optimizer.
+   */
+  sealed trait Instruction extends Expression with Unevaluable {
+    def condition: Expression
+    def outputs: Seq[Seq[Expression]]
+    override def nullable: Boolean = false
+    // We return NullType here as only the `MergeRows` operator can contain 
`Instruction`
+    // expressions and it doesn't care about the data type. Some external 
optimizer rules may
+    // assume optimized plan is always resolved and Expression#dataType is 
always available, so
+    // we can't just fail here.
+    override def dataType: DataType = NullType
+  }
+
+  case class Keep(condition: Expression, output: Seq[Expression]) extends 
Instruction {
+    def children: Seq[Expression] = condition +: output
+    override def outputs: Seq[Seq[Expression]] = Seq(output)
+
+    override protected def withNewChildrenInternal(
+        newChildren: IndexedSeq[Expression]): Expression = {
+      copy(condition = newChildren.head, output = newChildren.tail)
+    }
+  }
+
+  case class Discard(condition: Expression) extends Instruction with 
UnaryLike[Expression] {
+    override def outputs: Seq[Seq[Expression]] = Seq.empty
+    override def child: Expression = condition
+
+    override protected def withNewChildInternal(newChild: Expression): 
Expression = {
+      copy(condition = newChild)
+    }
+  }
+
+  case class Split(condition: Expression, output: Seq[Expression], 
otherOutput: Seq[Expression])
+    extends Instruction {
+
+    def children: Seq[Expression] = Seq(condition) ++ output ++ otherOutput
+    override def outputs: Seq[Seq[Expression]] = Seq(output, otherOutput)
+
+    override protected def withNewChildrenInternal(
+        newChildren: IndexedSeq[Expression]): Expression = {
+      val newCondition = newChildren.head
+      val newOutput = newChildren.slice(from = 1, until = output.size + 1)
+      val newOtherOutput = newChildren.takeRight(otherOutput.size)
+      copy(condition = newCondition, output = newOutput, otherOutput = 
newOtherOutput)
+    }
+  }
+}
diff --git a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
new file mode 100644
index 0000000000..b830b6b6fe
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
BasePredicate, Expression, Projection, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
+import org.apache.spark.sql.catalyst.plans.logical.MergeRows._
+import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.roaringbitmap.longlong.Roaring64Bitmap
+
+case class MergeRowsExec(
+    isSourceRowPresent: Expression,
+    isTargetRowPresent: Expression,
+    matchedInstructions: Seq[Instruction],
+    notMatchedInstructions: Seq[Instruction],
+    notMatchedBySourceInstructions: Seq[Instruction],
+    checkCardinality: Boolean,
+    output: Seq[Attribute],
+    child: SparkPlan)
+  extends UnaryExecNode {
+
+  @transient override lazy val producedAttributes: AttributeSet = {
+    AttributeSet(output.filterNot(attr => inputSet.contains(attr)))
+  }
+
+  @transient
+  override lazy val references: AttributeSet = {
+    val usedExprs = if (checkCardinality) {
+      val rowIdAttr = child.output.find(attr => conf.resolver(attr.name, 
ROW_ID))
+      assert(rowIdAttr.isDefined, "Cannot find row ID attr")
+      rowIdAttr.get +: expressions
+    } else {
+      expressions
+    }
+    AttributeSet.fromAttributeSets(usedExprs.map(_.references)) -- 
producedAttributes
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"MergeRowsExec${truncatedString(output, "[", ", ", "]", maxFields)}"
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan 
= {
+    copy(child = newChild)
+  }
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    child.execute().mapPartitions(processPartition)
+  }
+
+  private def processPartition(rowIterator: Iterator[InternalRow]): 
Iterator[InternalRow] = {
+    val isSourceRowPresentPred = createPredicate(isSourceRowPresent)
+    val isTargetRowPresentPred = createPredicate(isTargetRowPresent)
+
+    val matchedInstructionExecs = planInstructions(matchedInstructions)
+    val notMatchedInstructionExecs = planInstructions(notMatchedInstructions)
+    val notMatchedBySourceInstructionExecs = 
planInstructions(notMatchedBySourceInstructions)
+
+    val cardinalityValidator = if (checkCardinality) {
+      val rowIdOrdinal = child.output.indexWhere(attr => 
conf.resolver(attr.name, ROW_ID))
+      assert(rowIdOrdinal != -1, "Cannot find row ID attr")
+      BitmapCardinalityValidator(rowIdOrdinal)
+    } else {
+      NoopCardinalityValidator
+    }
+
+    val mergeIterator = new MergeRowIterator(
+      rowIterator,
+      cardinalityValidator,
+      isTargetRowPresentPred,
+      isSourceRowPresentPred,
+      matchedInstructionExecs,
+      notMatchedInstructionExecs,
+      notMatchedBySourceInstructionExecs)
+
+    // null indicates a record must be discarded
+    mergeIterator.filter(_ != null)
+  }
+
+  private def createProjection(exprs: Seq[Expression]): UnsafeProjection = {
+    UnsafeProjection.create(exprs, child.output)
+  }
+
+  private def createPredicate(expr: Expression): BasePredicate = {
+    GeneratePredicate.generate(expr, child.output)
+  }
+
+  private def planInstructions(instructions: Seq[Instruction]): 
Seq[InstructionExec] = {
+    instructions.map {
+      case Keep(cond, output) =>
+        KeepExec(createPredicate(cond), createProjection(output))
+
+      case Discard(cond) =>
+        DiscardExec(createPredicate(cond))
+
+      case Split(cond, output, otherOutput) =>
+        SplitExec(createPredicate(cond), createProjection(output), 
createProjection(otherOutput))
+
+      case other =>
+        throw new RuntimeException("Unsupported instruction type: " + 
other.getClass.getSimpleName)
+    }
+  }
+
+  sealed trait InstructionExec {
+    def condition: BasePredicate
+  }
+
+  case class KeepExec(condition: BasePredicate, projection: Projection) 
extends InstructionExec {
+    def apply(row: InternalRow): InternalRow = projection.apply(row)
+  }
+
+  case class DiscardExec(condition: BasePredicate) extends InstructionExec
+
+  case class SplitExec(
+      condition: BasePredicate,
+      projection: Projection,
+      otherProjection: Projection)
+    extends InstructionExec {
+    def projectRow(row: InternalRow): InternalRow = projection.apply(row)
+
+    def projectExtraRow(row: InternalRow): InternalRow = 
otherProjection.apply(row)
+  }
+
+  sealed trait CardinalityValidator {
+    def validate(row: InternalRow): Unit
+  }
+
+  object NoopCardinalityValidator extends CardinalityValidator {
+    def validate(row: InternalRow): Unit = {}
+  }
+
+  /**
+   * A simple cardinality validator that keeps track of seen row IDs in a 
roaring bitmap. This
+   * validator assumes the target table is never broadcasted or replicated, 
which guarantees matches
+   * for one target row are always co-located in the same partition.
+   *
+   * IDs are generated by 
[[org.apache.spark.sql.catalyst.expressions.MonotonicallyIncreasingID]].
+   */
+  case class BitmapCardinalityValidator(rowIdOrdinal: Int) extends 
CardinalityValidator {
+    // use Roaring64Bitmap as row IDs generated by MonotonicallyIncreasingID 
are 64-bit integers
+    private val matchedRowIds = new Roaring64Bitmap()
+
+    override def validate(row: InternalRow): Unit = {
+      val currentRowId = row.getLong(rowIdOrdinal)
+      if (matchedRowIds.contains(currentRowId)) {
+        throw new RuntimeException("Should not happens")
+      }
+      matchedRowIds.add(currentRowId)
+    }
+  }
+
+  /**
+   * An iterator that acts on joined target and source rows and computes 
deletes, updates and
+   * inserts according to provided MERGE instructions.
+   *
+   * If a particular joined row should be discarded, this iterator returns 
null.
+   */
+  class MergeRowIterator(
+      private val rowIterator: Iterator[InternalRow],
+      private val cardinalityValidator: CardinalityValidator,
+      private val isTargetRowPresentPred: BasePredicate,
+      private val isSourceRowPresentPred: BasePredicate,
+      private val matchedInstructions: Seq[InstructionExec],
+      private val notMatchedInstructions: Seq[InstructionExec],
+      private val notMatchedBySourceInstructions: Seq[InstructionExec])
+    extends Iterator[InternalRow] {
+
+    var cachedExtraRow: InternalRow = _
+
+    override def hasNext: Boolean = cachedExtraRow != null || 
rowIterator.hasNext
+
+    override def next(): InternalRow = {
+      if (cachedExtraRow != null) {
+        val extraRow = cachedExtraRow
+        cachedExtraRow = null
+        return extraRow
+      }
+
+      val row = rowIterator.next()
+
+      val isSourceRowPresent = isSourceRowPresentPred.eval(row)
+      val isTargetRowPresent = isTargetRowPresentPred.eval(row)
+
+      if (isTargetRowPresent && isSourceRowPresent) {
+        cardinalityValidator.validate(row)
+        applyInstructions(row, matchedInstructions)
+      } else if (isSourceRowPresent) {
+        applyInstructions(row, notMatchedInstructions)
+      } else if (isTargetRowPresent) {
+        applyInstructions(row, notMatchedBySourceInstructions)
+      } else {
+        null
+      }
+    }
+
+    private def applyInstructions(
+        row: InternalRow,
+        instructions: Seq[InstructionExec]): InternalRow = {
+
+      for (instruction <- instructions) {
+        if (instruction.condition.eval(row)) {
+          instruction match {
+            case keep: KeepExec =>
+              return keep.apply(row)
+
+            case _: DiscardExec =>
+              return null
+
+            case split: SplitExec =>
+              cachedExtraRow = split.projectExtraRow(row)
+              return split.projectRow(row)
+          }
+        }
+      }
+
+      null
+    }
+  }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/OldCompatibleStrategy.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/OldCompatibleStrategy.scala
new file mode 100644
index 0000000000..7f6fcc6c63
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/OldCompatibleStrategy.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.connector.catalog.PaimonLookupCatalog
+import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
+
+case class OldCompatibleStrategy(spark: SparkSession)
+  extends SparkStrategy
+  with PredicateHelper
+  with PaimonLookupCatalog {
+
+  protected lazy val catalogManager = spark.sessionState.catalogManager
+
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case _ => Nil
+  }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
index 8e363c5026..b6e29b8a77 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
@@ -22,7 +22,7 @@ import org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis, PaimonDeleteTa
 import org.apache.paimon.spark.catalyst.optimizer.{EvalSubqueriesForDeleteTable, MergePaimonScalarSubqueries}
 import org.apache.paimon.spark.catalyst.plans.logical.PaimonTableValuedFunctions
 import org.apache.paimon.spark.commands.BucketExpression
-import org.apache.paimon.spark.execution.PaimonStrategy
+import org.apache.paimon.spark.execution.{OldCompatibleStrategy, PaimonStrategy}
 import org.apache.paimon.spark.execution.adaptive.DisableUnnecessaryPaimonBucketedScan
 
 import org.apache.spark.sql.SparkSessionExtensions
@@ -70,6 +70,8 @@ class PaimonSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
 
     // planner extensions
     extensions.injectPlannerStrategy(spark => PaimonStrategy(spark))
+    // old compatible
+    extensions.injectPlannerStrategy(spark => OldCompatibleStrategy(spark))
 
     // query stage preparation
     extensions.injectQueryStagePrepRule(_ => DisableUnnecessaryPaimonBucketedScan)
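
These extensions are activated through Spark's standard spark.sql.extensions configuration, so the newly injected OldCompatibleStrategy is picked up wherever the extension class is registered. A minimal sketch (the catalog name and warehouse path are illustrative placeholders):

    import org.apache.spark.sql.SparkSession

    // Enables the Paimon session extensions, and with them the newly injected
    // OldCompatibleStrategy, on any supported Spark version.
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.sql.extensions",
        "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
      .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
      .config("spark.sql.catalog.paimon.warehouse", "file:/tmp/paimon")
      .getOrCreate()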
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala
index 043adef310..da43a2e91b 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala
@@ -199,119 +199,109 @@ abstract class RowLineageTestBase extends PaimonSparkTestBase {
   }
 
   test("Data Evolution: insert into table with data-evolution") {
-    if (gteqSpark3_5) {
-      withTable("s", "t") {
-        sql("CREATE TABLE s (id INT, b INT)")
-        sql("INSERT INTO s VALUES (1, 11), (2, 22)")
+    withTable("s", "t") {
+      sql("CREATE TABLE s (id INT, b INT)")
+      sql("INSERT INTO s VALUES (1, 11), (2, 22)")
 
-        sql(
-          "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
-        sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c FROM range(2, 4)")
+      sql(
+        "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
+      sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c FROM range(2, 4)")
 
-        sql("""
-              |MERGE INTO t
-              |USING s
-              |ON t.id = s.id
-              |WHEN NOT MATCHED THEN INSERT (id, b, c) VALUES (id, b, 11)
-              |""".stripMargin)
+      sql("""
+            |MERGE INTO t
+            |USING s
+            |ON t.id = s.id
+            |WHEN NOT MATCHED THEN INSERT (id, b, c) VALUES (id, b, 11)
+            |""".stripMargin)
 
-        checkAnswer(
-          sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
-          Seq(Row(1, 11, 11, 2, 2), Row(2, 2, 2, 0, 1), Row(3, 3, 3, 1, 1))
-        )
-      }
+      checkAnswer(
+        sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+        Seq(Row(1, 11, 11, 2, 2), Row(2, 2, 2, 0, 1), Row(3, 3, 3, 1, 1))
+      )
     }
   }
 
   test("Data Evolution: merge into table with data-evolution") {
-    if (gteqSpark3_5) {
-      withTable("s", "t") {
-        sql("CREATE TABLE s (id INT, b INT)")
-        sql("INSERT INTO s VALUES (1, 11), (2, 22)")
+    withTable("s", "t") {
+      sql("CREATE TABLE s (id INT, b INT)")
+      sql("INSERT INTO s VALUES (1, 11), (2, 22)")
 
-        sql(
-          "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
-        sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c FROM range(2, 4)")
-
-        sql("""
-              |MERGE INTO t
-              |USING s
-              |ON t.id = s.id
-              |WHEN MATCHED THEN UPDATE SET t.b = s.b
-              |WHEN NOT MATCHED THEN INSERT (id, b, c) VALUES (id, b, 11)
-              |""".stripMargin)
-        checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(3)))
-        checkAnswer(
-          sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
-          Seq(Row(1, 11, 11, 2, 2), Row(2, 22, 2, 0, 2), Row(3, 3, 3, 1, 2))
-        )
-      }
+      sql(
+        "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
+      sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c FROM range(2, 4)")
+
+      sql("""
+            |MERGE INTO t
+            |USING s
+            |ON t.id = s.id
+            |WHEN MATCHED THEN UPDATE SET t.b = s.b
+            |WHEN NOT MATCHED THEN INSERT (id, b, c) VALUES (id, b, 11)
+            |""".stripMargin)
+      checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(3)))
+      checkAnswer(
+        sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+        Seq(Row(1, 11, 11, 2, 2), Row(2, 22, 2, 0, 2), Row(3, 3, 3, 1, 2))
+      )
     }
   }
 
   test("Data Evolution: merge into table with data-evolution complex") {
-    if (gteqSpark3_5) {
-      withTable("source", "target") {
-        sql("CREATE TABLE source (a INT, b INT, c STRING)")
-        sql(
-          "INSERT INTO source VALUES (1, 100, 'c11'), (3, 300, 'c33'), (5, 500, 'c55'), (7, 700, 'c77'), (9, 900, 'c99')")
-
-        sql(
-          "CREATE TABLE target (a INT, b INT, c STRING) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
-        sql(
-          "INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2'), (3, 30, 'c3'), (4, 40, 'c4'), (5, 50, 'c5')")
+    withTable("source", "target") {
+      sql("CREATE TABLE source (a INT, b INT, c STRING)")
+      sql(
+        "INSERT INTO source VALUES (1, 100, 'c11'), (3, 300, 'c33'), (5, 500, 'c55'), (7, 700, 'c77'), (9, 900, 'c99')")
 
-        sql(s"""
-               |MERGE INTO target
-               |USING source
-               |ON target.a = source.a
-               |WHEN MATCHED AND target.a = 5 THEN UPDATE SET b = source.b + target.b
-               |WHEN MATCHED AND source.c > 'c2' THEN UPDATE SET b = source.b, c = source.c
-               |WHEN NOT MATCHED AND c > 'c9' THEN INSERT (a, b, c) VALUES (a, b * 1.1, c)
-               |WHEN NOT MATCHED THEN INSERT *
-               |""".stripMargin)
-        checkAnswer(
-          sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM target ORDER BY a"),
-          Seq(
-            Row(1, 10, "c1", 0, 2),
-            Row(2, 20, "c2", 1, 2),
-            Row(3, 300, "c33", 2, 2),
-            Row(4, 40, "c4", 3, 2),
-            Row(5, 550, "c5", 4, 2),
-            Row(7, 700, "c77", 5, 2),
-            Row(9, 990, "c99", 6, 2))
-        )
-      }
+      sql(
+        "CREATE TABLE target (a INT, b INT, c STRING) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
+      sql(
+        "INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2'), (3, 30, 'c3'), (4, 40, 'c4'), (5, 50, 'c5')")
+
+      sql(s"""
+             |MERGE INTO target
+             |USING source
+             |ON target.a = source.a
+             |WHEN MATCHED AND target.a = 5 THEN UPDATE SET b = source.b + target.b
+             |WHEN MATCHED AND source.c > 'c2' THEN UPDATE SET b = source.b, c = source.c
+             |WHEN NOT MATCHED AND c > 'c9' THEN INSERT (a, b, c) VALUES (a, b * 1.1, c)
+             |WHEN NOT MATCHED THEN INSERT *
+             |""".stripMargin)
+      checkAnswer(
+        sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM target ORDER BY a"),
+        Seq(
+          Row(1, 10, "c1", 0, 2),
+          Row(2, 20, "c2", 1, 2),
+          Row(3, 300, "c33", 2, 2),
+          Row(4, 40, "c4", 3, 2),
+          Row(5, 550, "c5", 4, 2),
+          Row(7, 700, "c77", 5, 2),
+          Row(9, 990, "c99", 6, 2))
+      )
     }
   }
 
   test("Data Evolution: update table throws exception") {
-    if (gteqSpark3_5) {
-      withTable("t") {
-        sql(
-          "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
-        sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c FROM range(2, 4)")
-        assert(
-          intercept[RuntimeException] {
-            sql("UPDATE t SET b = 22")
-          }.getMessage
-            .contains("Update operation is not supported when data evolution is enabled yet."))
-      }
+    withTable("t") {
+      sql(
+        "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
+      sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c FROM range(2, 4)")
+      assert(
+        intercept[RuntimeException] {
+          sql("UPDATE t SET b = 22")
+        }.getMessage
+          .contains("Update operation is not supported when data evolution is enabled yet."))
     }
   }
 
   test("Data Evolution: delete table throws exception") {
-    if (gteqSpark3_5) {
-      withTable("t") {
-        sql(
-          "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
-        sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c FROM range(2, 4)")
-        assert(
-          intercept[RuntimeException] {
-            sql("DELETE FROM t WHERE id = 2")
-          }.getMessage
-            .contains("Delete operation is not supported when data evolution is enabled yet."))
-      }
+    withTable("t") {
+      sql(
+        "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
+      sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c FROM range(2, 4)")
+      assert(
+        intercept[RuntimeException] {
+          sql("DELETE FROM t WHERE id = 2")
+        }.getMessage
+          .contains("Delete operation is not supported when data evolution is enabled yet."))
     }
   }
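
The net effect of the test changes above is that the data-evolution MERGE INTO path no longer needs the gteqSpark3_5 guard, so the same scenarios now run on Spark 3.2+. A rough interactive sketch mirroring the first test (assuming a SparkSession configured with the Paimon extensions as sketched earlier, with the Paimon catalog as the current catalog):

    // Mirrors the "insert into table with data-evolution" test above.
    spark.sql("CREATE TABLE s (id INT, b INT)")
    spark.sql("INSERT INTO s VALUES (1, 11), (2, 22)")

    spark.sql(
      "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES " +
        "('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
    spark.sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c FROM range(2, 4)")

    spark.sql(
      """MERGE INTO t
        |USING s
        |ON t.id = s.id
        |WHEN NOT MATCHED THEN INSERT (id, b, c) VALUES (id, b, 11)
        |""".stripMargin)

    // _ROW_ID and _SEQUENCE_NUMBER are the row-lineage metadata columns asserted in the test.
    spark.sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id").show()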
 
