This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9e6c5d2779 [spark] Data evolution: support lower Spark versions (#6135)
9e6c5d2779 is described below
commit 9e6c5d27793147ab760426659a72e1e12f9068c5
Author: YeJunHao <[email protected]>
AuthorDate: Mon Aug 25 11:38:04 2025 +0800
[spark] Data evolution: support lower Spark versions (#6135)
---
.../catalyst/analysis/PaimonDeleteTable.scala | 4 +
.../spark/execution/OldCompatibleStrategy.scala | 57 +++++
.../sql/catalyst/plans/logical/MergeRows.scala | 119 +++++++++++
.../execution/datasources/v2/MergeRowsExec.scala | 237 +++++++++++++++++++++
.../spark/execution/OldCompatibleStrategy.scala | 57 +++++
.../sql/catalyst/plans/logical/MergeRows.scala | 119 +++++++++++
.../execution/datasources/v2/MergeRowsExec.scala | 237 +++++++++++++++++++++
.../spark/execution/OldCompatibleStrategy.scala | 57 +++++
.../sql/catalyst/plans/logical/MergeRows.scala | 119 +++++++++++
.../execution/datasources/v2/MergeRowsExec.scala | 237 +++++++++++++++++++++
.../spark/execution/OldCompatibleStrategy.scala | 37 ++++
.../extensions/PaimonSparkSessionExtensions.scala | 4 +-
.../paimon/spark/sql/RowLineageTestBase.scala | 174 +++++++--------
13 files changed, 1365 insertions(+), 93 deletions(-)
diff --git
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
index 1d6aa1a8aa..329c4c5886 100644
---
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
+++
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
@@ -39,6 +39,10 @@ object PaimonDeleteTable extends Rule[LogicalPlan] with
RowLevelHelper {
table.getTable match {
case paimonTable: FileStoreTable =>
val relation = PaimonRelation.getPaimonRelation(d.table)
+ if (paimonTable.coreOptions().dataEvolutionEnabled()) {
+ throw new RuntimeException(
+ "Delete operation is not supported when data evolution is
enabled yet.")
+ }
DeleteFromPaimonTableCommand(relation, paimonTable,
condition.getOrElse(TrueLiteral))
case _ =>
diff --git
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/execution/OldCompatibleStrategy.scala
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/execution/OldCompatibleStrategy.scala
new file mode 100644
index 0000000000..111ceba23e
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/execution/OldCompatibleStrategy.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.connector.catalog.PaimonLookupCatalog
+import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
+import org.apache.spark.sql.execution.datasources.v2.MergeRowsExec
+
+case class OldCompatibleStrategy(spark: SparkSession)
+ extends SparkStrategy
+ with PredicateHelper
+ with PaimonLookupCatalog {
+
+ protected lazy val catalogManager = spark.sessionState.catalogManager
+
+ override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+ case MergeRows(
+ isSourceRowPresent,
+ isTargetRowPresent,
+ matchedInstructions,
+ notMatchedInstructions,
+ notMatchedBySourceInstructions,
+ checkCardinality,
+ output,
+ child) =>
+ MergeRowsExec(
+ isSourceRowPresent,
+ isTargetRowPresent,
+ matchedInstructions,
+ notMatchedInstructions,
+ notMatchedBySourceInstructions,
+ checkCardinality,
+ output,
+ planLater(child)
+ ) :: Nil
+ case _ => Nil
+ }
+}
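
For context, a minimal sketch of how a strategy like the one above can be registered with a session programmatically; the local master is illustrative, and the actual wiring in this commit goes through PaimonSparkSessionExtensions further below.

    import org.apache.paimon.spark.execution.OldCompatibleStrategy
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation

    // Inject the strategy into the planner when building a local session.
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .withExtensions(_.injectPlannerStrategy(session => OldCompatibleStrategy(session)))
      .getOrCreate()

    // The strategy only fires on MergeRows nodes; any other logical plan falls
    // through with Nil so the remaining planner strategies can handle it.
    assert(OldCompatibleStrategy(spark)(OneRowRelation()) == Nil)
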
diff --git
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/MergeRows.scala
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/MergeRows.scala
new file mode 100644
index 0000000000..a6f8b8f134
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/MergeRows.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet,
Expression, Unevaluable}
+import org.apache.spark.sql.catalyst.plans.logical.MergeRows.{Instruction,
ROW_ID}
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.types.{DataType, NullType}
+
+case class MergeRows(
+ isSourceRowPresent: Expression,
+ isTargetRowPresent: Expression,
+ matchedInstructions: Seq[Instruction],
+ notMatchedInstructions: Seq[Instruction],
+ notMatchedBySourceInstructions: Seq[Instruction],
+ checkCardinality: Boolean,
+ output: Seq[Attribute],
+ child: LogicalPlan)
+ extends UnaryNode {
+
+ override lazy val producedAttributes: AttributeSet = {
+ AttributeSet(output.filterNot(attr => inputSet.contains(attr)))
+ }
+
+ @transient
+ override lazy val references: AttributeSet = {
+ val usedExprs = if (checkCardinality) {
+ val rowIdAttr = child.output.find(attr => conf.resolver(attr.name,
ROW_ID))
+ assert(rowIdAttr.isDefined, "Cannot find row ID attr")
+ rowIdAttr.get +: expressions
+ } else {
+ expressions
+ }
+ AttributeSet.fromAttributeSets(usedExprs.map(_.references)) --
producedAttributes
+ }
+
+ override def simpleString(maxFields: Int): String = {
+ s"MergeRows${truncatedString(output, "[", ", ", "]", maxFields)}"
+ }
+
+ override protected def withNewChildInternal(newChild: LogicalPlan):
LogicalPlan = {
+ copy(child = newChild)
+ }
+}
+
+object MergeRows {
+ final val ROW_ID = "__row_id"
+
+ /**
+ * When a MERGE operation is rewritten, the target table is joined with the
source and each
+ * MATCHED/NOT MATCHED/NOT MATCHED BY SOURCE clause is converted into a
corresponding instruction
+ * on top of the joined plan. The purpose of an instruction is to derive an
output row based on a
+ * joined row.
+ *
+ * Instructions are valid expressions so that they will be properly
transformed by the analyzer
+ * and optimizer.
+ */
+ sealed trait Instruction extends Expression with Unevaluable {
+ def condition: Expression
+ def outputs: Seq[Seq[Expression]]
+ override def nullable: Boolean = false
+ // We return NullType here as only the `MergeRows` operator can contain
`Instruction`
+ // expressions and it doesn't care about the data type. Some external
optimizer rules may
+ // assume optimized plan is always resolved and Expression#dataType is
always available, so
+ // we can't just fail here.
+ override def dataType: DataType = NullType
+ }
+
+ case class Keep(condition: Expression, output: Seq[Expression]) extends
Instruction {
+ def children: Seq[Expression] = condition +: output
+ override def outputs: Seq[Seq[Expression]] = Seq(output)
+
+ override protected def withNewChildrenInternal(
+ newChildren: IndexedSeq[Expression]): Expression = {
+ copy(condition = newChildren.head, output = newChildren.tail)
+ }
+ }
+
+ case class Discard(condition: Expression) extends Instruction with
UnaryLike[Expression] {
+ override def outputs: Seq[Seq[Expression]] = Seq.empty
+ override def child: Expression = condition
+
+ override protected def withNewChildInternal(newChild: Expression):
Expression = {
+ copy(condition = newChild)
+ }
+ }
+
+ case class Split(condition: Expression, output: Seq[Expression],
otherOutput: Seq[Expression])
+ extends Instruction {
+
+ def children: Seq[Expression] = Seq(condition) ++ output ++ otherOutput
+ override def outputs: Seq[Seq[Expression]] = Seq(output, otherOutput)
+
+ override protected def withNewChildrenInternal(
+ newChildren: IndexedSeq[Expression]): Expression = {
+ val newCondition = newChildren.head
+ val newOutput = newChildren.slice(from = 1, until = output.size + 1)
+ val newOtherOutput = newChildren.takeRight(otherOutput.size)
+ copy(condition = newCondition, output = newOutput, otherOutput =
newOtherOutput)
+ }
+ }
+}
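
To make the instruction model above concrete, here is an illustrative construction (not part of the commit) of two instructions over a joined row; the attributes id, b and s_b are hypothetical stand-ins for target and source columns.

    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, LessThan, Literal}
    import org.apache.spark.sql.catalyst.plans.logical.MergeRows.{Discard, Keep}
    import org.apache.spark.sql.types.IntegerType

    // Attributes of the joined (target ++ source) row; names are hypothetical.
    val id = AttributeReference("id", IntegerType)()
    val b = AttributeReference("b", IntegerType)()
    val sB = AttributeReference("s_b", IntegerType)()

    // WHEN MATCHED AND b < 10 THEN UPDATE SET b = s_b
    // becomes: keep an output row built from (id, s_b) when the condition holds.
    val updateInstruction = Keep(LessThan(b, Literal(10)), Seq(id, sB))

    // WHEN MATCHED THEN DELETE becomes: discard the joined row unconditionally.
    val deleteInstruction = Discard(Literal(true))
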
diff --git
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
new file mode 100644
index 0000000000..b830b6b6fe
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet,
BasePredicate, Expression, Projection, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
+import org.apache.spark.sql.catalyst.plans.logical.MergeRows._
+import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.roaringbitmap.longlong.Roaring64Bitmap
+
+case class MergeRowsExec(
+ isSourceRowPresent: Expression,
+ isTargetRowPresent: Expression,
+ matchedInstructions: Seq[Instruction],
+ notMatchedInstructions: Seq[Instruction],
+ notMatchedBySourceInstructions: Seq[Instruction],
+ checkCardinality: Boolean,
+ output: Seq[Attribute],
+ child: SparkPlan)
+ extends UnaryExecNode {
+
+ @transient override lazy val producedAttributes: AttributeSet = {
+ AttributeSet(output.filterNot(attr => inputSet.contains(attr)))
+ }
+
+ @transient
+ override lazy val references: AttributeSet = {
+ val usedExprs = if (checkCardinality) {
+ val rowIdAttr = child.output.find(attr => conf.resolver(attr.name,
ROW_ID))
+ assert(rowIdAttr.isDefined, "Cannot find row ID attr")
+ rowIdAttr.get +: expressions
+ } else {
+ expressions
+ }
+ AttributeSet.fromAttributeSets(usedExprs.map(_.references)) --
producedAttributes
+ }
+
+ override def simpleString(maxFields: Int): String = {
+ s"MergeRowsExec${truncatedString(output, "[", ", ", "]", maxFields)}"
+ }
+
+ override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan
= {
+ copy(child = newChild)
+ }
+
+ override protected def doExecute(): RDD[InternalRow] = {
+ child.execute().mapPartitions(processPartition)
+ }
+
+ private def processPartition(rowIterator: Iterator[InternalRow]):
Iterator[InternalRow] = {
+ val isSourceRowPresentPred = createPredicate(isSourceRowPresent)
+ val isTargetRowPresentPred = createPredicate(isTargetRowPresent)
+
+ val matchedInstructionExecs = planInstructions(matchedInstructions)
+ val notMatchedInstructionExecs = planInstructions(notMatchedInstructions)
+ val notMatchedBySourceInstructionExecs =
planInstructions(notMatchedBySourceInstructions)
+
+ val cardinalityValidator = if (checkCardinality) {
+ val rowIdOrdinal = child.output.indexWhere(attr =>
conf.resolver(attr.name, ROW_ID))
+ assert(rowIdOrdinal != -1, "Cannot find row ID attr")
+ BitmapCardinalityValidator(rowIdOrdinal)
+ } else {
+ NoopCardinalityValidator
+ }
+
+ val mergeIterator = new MergeRowIterator(
+ rowIterator,
+ cardinalityValidator,
+ isTargetRowPresentPred,
+ isSourceRowPresentPred,
+ matchedInstructionExecs,
+ notMatchedInstructionExecs,
+ notMatchedBySourceInstructionExecs)
+
+ // null indicates a record must be discarded
+ mergeIterator.filter(_ != null)
+ }
+
+ private def createProjection(exprs: Seq[Expression]): UnsafeProjection = {
+ UnsafeProjection.create(exprs, child.output)
+ }
+
+ private def createPredicate(expr: Expression): BasePredicate = {
+ GeneratePredicate.generate(expr, child.output)
+ }
+
+ private def planInstructions(instructions: Seq[Instruction]):
Seq[InstructionExec] = {
+ instructions.map {
+ case Keep(cond, output) =>
+ KeepExec(createPredicate(cond), createProjection(output))
+
+ case Discard(cond) =>
+ DiscardExec(createPredicate(cond))
+
+ case Split(cond, output, otherOutput) =>
+ SplitExec(createPredicate(cond), createProjection(output),
createProjection(otherOutput))
+
+ case other =>
+ throw new RuntimeException("Unsupported instruction type: " +
other.getClass.getSimpleName)
+ }
+ }
+
+ sealed trait InstructionExec {
+ def condition: BasePredicate
+ }
+
+ case class KeepExec(condition: BasePredicate, projection: Projection)
extends InstructionExec {
+ def apply(row: InternalRow): InternalRow = projection.apply(row)
+ }
+
+ case class DiscardExec(condition: BasePredicate) extends InstructionExec
+
+ case class SplitExec(
+ condition: BasePredicate,
+ projection: Projection,
+ otherProjection: Projection)
+ extends InstructionExec {
+ def projectRow(row: InternalRow): InternalRow = projection.apply(row)
+
+ def projectExtraRow(row: InternalRow): InternalRow =
otherProjection.apply(row)
+ }
+
+ sealed trait CardinalityValidator {
+ def validate(row: InternalRow): Unit
+ }
+
+ object NoopCardinalityValidator extends CardinalityValidator {
+ def validate(row: InternalRow): Unit = {}
+ }
+
+ /**
+ * A simple cardinality validator that keeps track of seen row IDs in a
roaring bitmap. This
+ * validator assumes the target table is never broadcasted or replicated,
which guarantees matches
+ * for one target row are always co-located in the same partition.
+ *
+ * IDs are generated by
[[org.apache.spark.sql.catalyst.expressions.MonotonicallyIncreasingID]].
+ */
+ case class BitmapCardinalityValidator(rowIdOrdinal: Int) extends
CardinalityValidator {
+ // use Roaring64Bitmap as row IDs generated by MonotonicallyIncreasingID
are 64-bit integers
+ private val matchedRowIds = new Roaring64Bitmap()
+
+ override def validate(row: InternalRow): Unit = {
+ val currentRowId = row.getLong(rowIdOrdinal)
+ if (matchedRowIds.contains(currentRowId)) {
+ throw new RuntimeException("Should not happen: target row matched multiple source rows")
+ }
+ matchedRowIds.add(currentRowId)
+ }
+ }
+
+ /**
+ * An iterator that acts on joined target and source rows and computes
deletes, updates and
+ * inserts according to provided MERGE instructions.
+ *
+ * If a particular joined row should be discarded, this iterator returns
null.
+ */
+ class MergeRowIterator(
+ private val rowIterator: Iterator[InternalRow],
+ private val cardinalityValidator: CardinalityValidator,
+ private val isTargetRowPresentPred: BasePredicate,
+ private val isSourceRowPresentPred: BasePredicate,
+ private val matchedInstructions: Seq[InstructionExec],
+ private val notMatchedInstructions: Seq[InstructionExec],
+ private val notMatchedBySourceInstructions: Seq[InstructionExec])
+ extends Iterator[InternalRow] {
+
+ var cachedExtraRow: InternalRow = _
+
+ override def hasNext: Boolean = cachedExtraRow != null ||
rowIterator.hasNext
+
+ override def next(): InternalRow = {
+ if (cachedExtraRow != null) {
+ val extraRow = cachedExtraRow
+ cachedExtraRow = null
+ return extraRow
+ }
+
+ val row = rowIterator.next()
+
+ val isSourceRowPresent = isSourceRowPresentPred.eval(row)
+ val isTargetRowPresent = isTargetRowPresentPred.eval(row)
+
+ if (isTargetRowPresent && isSourceRowPresent) {
+ cardinalityValidator.validate(row)
+ applyInstructions(row, matchedInstructions)
+ } else if (isSourceRowPresent) {
+ applyInstructions(row, notMatchedInstructions)
+ } else if (isTargetRowPresent) {
+ applyInstructions(row, notMatchedBySourceInstructions)
+ } else {
+ null
+ }
+ }
+
+ private def applyInstructions(
+ row: InternalRow,
+ instructions: Seq[InstructionExec]): InternalRow = {
+
+ for (instruction <- instructions) {
+ if (instruction.condition.eval(row)) {
+ instruction match {
+ case keep: KeepExec =>
+ return keep.apply(row)
+
+ case _: DiscardExec =>
+ return null
+
+ case split: SplitExec =>
+ cachedExtraRow = split.projectExtraRow(row)
+ return split.projectRow(row)
+ }
+ }
+ }
+
+ null
+ }
+ }
+}
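
As a standalone sketch of the cardinality check performed by BitmapCardinalityValidator above (the helper name recordMatch is illustrative): a 64-bit row ID is recorded the first time a target row matches, and a second occurrence means the same target row matched more than one source row.

    import org.roaringbitmap.longlong.Roaring64Bitmap

    val matchedRowIds = new Roaring64Bitmap()

    // Record a matched target row ID, failing fast on a duplicate match.
    def recordMatch(rowId: Long): Unit = {
      if (matchedRowIds.contains(rowId)) {
        throw new IllegalStateException(
          s"Target row $rowId matched more than one source row")
      }
      matchedRowIds.add(rowId)
    }

    recordMatch(42L) // first match is recorded
    // recordMatch(42L) would now throw, signalling a cardinality violation
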
diff --git
a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/execution/OldCompatibleStrategy.scala
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/execution/OldCompatibleStrategy.scala
new file mode 100644
index 0000000000..111ceba23e
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/execution/OldCompatibleStrategy.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.connector.catalog.PaimonLookupCatalog
+import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
+import org.apache.spark.sql.execution.datasources.v2.MergeRowsExec
+
+case class OldCompatibleStrategy(spark: SparkSession)
+ extends SparkStrategy
+ with PredicateHelper
+ with PaimonLookupCatalog {
+
+ protected lazy val catalogManager = spark.sessionState.catalogManager
+
+ override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+ case MergeRows(
+ isSourceRowPresent,
+ isTargetRowPresent,
+ matchedInstructions,
+ notMatchedInstructions,
+ notMatchedBySourceInstructions,
+ checkCardinality,
+ output,
+ child) =>
+ MergeRowsExec(
+ isSourceRowPresent,
+ isTargetRowPresent,
+ matchedInstructions,
+ notMatchedInstructions,
+ notMatchedBySourceInstructions,
+ checkCardinality,
+ output,
+ planLater(child)
+ ) :: Nil
+ case _ => Nil
+ }
+}
diff --git
a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/MergeRows.scala
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/MergeRows.scala
new file mode 100644
index 0000000000..a6f8b8f134
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/MergeRows.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet,
Expression, Unevaluable}
+import org.apache.spark.sql.catalyst.plans.logical.MergeRows.{Instruction,
ROW_ID}
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.types.{DataType, NullType}
+
+case class MergeRows(
+ isSourceRowPresent: Expression,
+ isTargetRowPresent: Expression,
+ matchedInstructions: Seq[Instruction],
+ notMatchedInstructions: Seq[Instruction],
+ notMatchedBySourceInstructions: Seq[Instruction],
+ checkCardinality: Boolean,
+ output: Seq[Attribute],
+ child: LogicalPlan)
+ extends UnaryNode {
+
+ override lazy val producedAttributes: AttributeSet = {
+ AttributeSet(output.filterNot(attr => inputSet.contains(attr)))
+ }
+
+ @transient
+ override lazy val references: AttributeSet = {
+ val usedExprs = if (checkCardinality) {
+ val rowIdAttr = child.output.find(attr => conf.resolver(attr.name,
ROW_ID))
+ assert(rowIdAttr.isDefined, "Cannot find row ID attr")
+ rowIdAttr.get +: expressions
+ } else {
+ expressions
+ }
+ AttributeSet.fromAttributeSets(usedExprs.map(_.references)) --
producedAttributes
+ }
+
+ override def simpleString(maxFields: Int): String = {
+ s"MergeRows${truncatedString(output, "[", ", ", "]", maxFields)}"
+ }
+
+ override protected def withNewChildInternal(newChild: LogicalPlan):
LogicalPlan = {
+ copy(child = newChild)
+ }
+}
+
+object MergeRows {
+ final val ROW_ID = "__row_id"
+
+ /**
+ * When a MERGE operation is rewritten, the target table is joined with the
source and each
+ * MATCHED/NOT MATCHED/NOT MATCHED BY SOURCE clause is converted into a
corresponding instruction
+ * on top of the joined plan. The purpose of an instruction is to derive an
output row based on a
+ * joined row.
+ *
+ * Instructions are valid expressions so that they will be properly
transformed by the analyzer
+ * and optimizer.
+ */
+ sealed trait Instruction extends Expression with Unevaluable {
+ def condition: Expression
+ def outputs: Seq[Seq[Expression]]
+ override def nullable: Boolean = false
+ // We return NullType here as only the `MergeRows` operator can contain
`Instruction`
+ // expressions and it doesn't care about the data type. Some external
optimizer rules may
+ // assume optimized plan is always resolved and Expression#dataType is
always available, so
+ // we can't just fail here.
+ override def dataType: DataType = NullType
+ }
+
+ case class Keep(condition: Expression, output: Seq[Expression]) extends
Instruction {
+ def children: Seq[Expression] = condition +: output
+ override def outputs: Seq[Seq[Expression]] = Seq(output)
+
+ override protected def withNewChildrenInternal(
+ newChildren: IndexedSeq[Expression]): Expression = {
+ copy(condition = newChildren.head, output = newChildren.tail)
+ }
+ }
+
+ case class Discard(condition: Expression) extends Instruction with
UnaryLike[Expression] {
+ override def outputs: Seq[Seq[Expression]] = Seq.empty
+ override def child: Expression = condition
+
+ override protected def withNewChildInternal(newChild: Expression):
Expression = {
+ copy(condition = newChild)
+ }
+ }
+
+ case class Split(condition: Expression, output: Seq[Expression],
otherOutput: Seq[Expression])
+ extends Instruction {
+
+ def children: Seq[Expression] = Seq(condition) ++ output ++ otherOutput
+ override def outputs: Seq[Seq[Expression]] = Seq(output, otherOutput)
+
+ override protected def withNewChildrenInternal(
+ newChildren: IndexedSeq[Expression]): Expression = {
+ val newCondition = newChildren.head
+ val newOutput = newChildren.slice(from = 1, until = output.size + 1)
+ val newOtherOutput = newChildren.takeRight(otherOutput.size)
+ copy(condition = newCondition, output = newOutput, otherOutput =
newOtherOutput)
+ }
+ }
+}
diff --git
a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
new file mode 100644
index 0000000000..b830b6b6fe
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet,
BasePredicate, Expression, Projection, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
+import org.apache.spark.sql.catalyst.plans.logical.MergeRows._
+import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.roaringbitmap.longlong.Roaring64Bitmap
+
+case class MergeRowsExec(
+ isSourceRowPresent: Expression,
+ isTargetRowPresent: Expression,
+ matchedInstructions: Seq[Instruction],
+ notMatchedInstructions: Seq[Instruction],
+ notMatchedBySourceInstructions: Seq[Instruction],
+ checkCardinality: Boolean,
+ output: Seq[Attribute],
+ child: SparkPlan)
+ extends UnaryExecNode {
+
+ @transient override lazy val producedAttributes: AttributeSet = {
+ AttributeSet(output.filterNot(attr => inputSet.contains(attr)))
+ }
+
+ @transient
+ override lazy val references: AttributeSet = {
+ val usedExprs = if (checkCardinality) {
+ val rowIdAttr = child.output.find(attr => conf.resolver(attr.name,
ROW_ID))
+ assert(rowIdAttr.isDefined, "Cannot find row ID attr")
+ rowIdAttr.get +: expressions
+ } else {
+ expressions
+ }
+ AttributeSet.fromAttributeSets(usedExprs.map(_.references)) --
producedAttributes
+ }
+
+ override def simpleString(maxFields: Int): String = {
+ s"MergeRowsExec${truncatedString(output, "[", ", ", "]", maxFields)}"
+ }
+
+ override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan
= {
+ copy(child = newChild)
+ }
+
+ override protected def doExecute(): RDD[InternalRow] = {
+ child.execute().mapPartitions(processPartition)
+ }
+
+ private def processPartition(rowIterator: Iterator[InternalRow]):
Iterator[InternalRow] = {
+ val isSourceRowPresentPred = createPredicate(isSourceRowPresent)
+ val isTargetRowPresentPred = createPredicate(isTargetRowPresent)
+
+ val matchedInstructionExecs = planInstructions(matchedInstructions)
+ val notMatchedInstructionExecs = planInstructions(notMatchedInstructions)
+ val notMatchedBySourceInstructionExecs =
planInstructions(notMatchedBySourceInstructions)
+
+ val cardinalityValidator = if (checkCardinality) {
+ val rowIdOrdinal = child.output.indexWhere(attr =>
conf.resolver(attr.name, ROW_ID))
+ assert(rowIdOrdinal != -1, "Cannot find row ID attr")
+ BitmapCardinalityValidator(rowIdOrdinal)
+ } else {
+ NoopCardinalityValidator
+ }
+
+ val mergeIterator = new MergeRowIterator(
+ rowIterator,
+ cardinalityValidator,
+ isTargetRowPresentPred,
+ isSourceRowPresentPred,
+ matchedInstructionExecs,
+ notMatchedInstructionExecs,
+ notMatchedBySourceInstructionExecs)
+
+ // null indicates a record must be discarded
+ mergeIterator.filter(_ != null)
+ }
+
+ private def createProjection(exprs: Seq[Expression]): UnsafeProjection = {
+ UnsafeProjection.create(exprs, child.output)
+ }
+
+ private def createPredicate(expr: Expression): BasePredicate = {
+ GeneratePredicate.generate(expr, child.output)
+ }
+
+ private def planInstructions(instructions: Seq[Instruction]):
Seq[InstructionExec] = {
+ instructions.map {
+ case Keep(cond, output) =>
+ KeepExec(createPredicate(cond), createProjection(output))
+
+ case Discard(cond) =>
+ DiscardExec(createPredicate(cond))
+
+ case Split(cond, output, otherOutput) =>
+ SplitExec(createPredicate(cond), createProjection(output),
createProjection(otherOutput))
+
+ case other =>
+ throw new RuntimeException("Unsupported instruction type: " +
other.getClass.getSimpleName)
+ }
+ }
+
+ sealed trait InstructionExec {
+ def condition: BasePredicate
+ }
+
+ case class KeepExec(condition: BasePredicate, projection: Projection)
extends InstructionExec {
+ def apply(row: InternalRow): InternalRow = projection.apply(row)
+ }
+
+ case class DiscardExec(condition: BasePredicate) extends InstructionExec
+
+ case class SplitExec(
+ condition: BasePredicate,
+ projection: Projection,
+ otherProjection: Projection)
+ extends InstructionExec {
+ def projectRow(row: InternalRow): InternalRow = projection.apply(row)
+
+ def projectExtraRow(row: InternalRow): InternalRow =
otherProjection.apply(row)
+ }
+
+ sealed trait CardinalityValidator {
+ def validate(row: InternalRow): Unit
+ }
+
+ object NoopCardinalityValidator extends CardinalityValidator {
+ def validate(row: InternalRow): Unit = {}
+ }
+
+ /**
+ * A simple cardinality validator that keeps track of seen row IDs in a
roaring bitmap. This
+ * validator assumes the target table is never broadcasted or replicated,
which guarantees matches
+ * for one target row are always co-located in the same partition.
+ *
+ * IDs are generated by
[[org.apache.spark.sql.catalyst.expressions.MonotonicallyIncreasingID]].
+ */
+ case class BitmapCardinalityValidator(rowIdOrdinal: Int) extends
CardinalityValidator {
+ // use Roaring64Bitmap as row IDs generated by MonotonicallyIncreasingID
are 64-bit integers
+ private val matchedRowIds = new Roaring64Bitmap()
+
+ override def validate(row: InternalRow): Unit = {
+ val currentRowId = row.getLong(rowIdOrdinal)
+ if (matchedRowIds.contains(currentRowId)) {
+ throw new RuntimeException("Should not happen: target row matched multiple source rows")
+ }
+ matchedRowIds.add(currentRowId)
+ }
+ }
+
+ /**
+ * An iterator that acts on joined target and source rows and computes
deletes, updates and
+ * inserts according to provided MERGE instructions.
+ *
+ * If a particular joined row should be discarded, this iterator returns
null.
+ */
+ class MergeRowIterator(
+ private val rowIterator: Iterator[InternalRow],
+ private val cardinalityValidator: CardinalityValidator,
+ private val isTargetRowPresentPred: BasePredicate,
+ private val isSourceRowPresentPred: BasePredicate,
+ private val matchedInstructions: Seq[InstructionExec],
+ private val notMatchedInstructions: Seq[InstructionExec],
+ private val notMatchedBySourceInstructions: Seq[InstructionExec])
+ extends Iterator[InternalRow] {
+
+ var cachedExtraRow: InternalRow = _
+
+ override def hasNext: Boolean = cachedExtraRow != null ||
rowIterator.hasNext
+
+ override def next(): InternalRow = {
+ if (cachedExtraRow != null) {
+ val extraRow = cachedExtraRow
+ cachedExtraRow = null
+ return extraRow
+ }
+
+ val row = rowIterator.next()
+
+ val isSourceRowPresent = isSourceRowPresentPred.eval(row)
+ val isTargetRowPresent = isTargetRowPresentPred.eval(row)
+
+ if (isTargetRowPresent && isSourceRowPresent) {
+ cardinalityValidator.validate(row)
+ applyInstructions(row, matchedInstructions)
+ } else if (isSourceRowPresent) {
+ applyInstructions(row, notMatchedInstructions)
+ } else if (isTargetRowPresent) {
+ applyInstructions(row, notMatchedBySourceInstructions)
+ } else {
+ null
+ }
+ }
+
+ private def applyInstructions(
+ row: InternalRow,
+ instructions: Seq[InstructionExec]): InternalRow = {
+
+ for (instruction <- instructions) {
+ if (instruction.condition.eval(row)) {
+ instruction match {
+ case keep: KeepExec =>
+ return keep.apply(row)
+
+ case _: DiscardExec =>
+ return null
+
+ case split: SplitExec =>
+ cachedExtraRow = split.projectExtraRow(row)
+ return split.projectRow(row)
+ }
+ }
+ }
+
+ null
+ }
+ }
+}
diff --git
a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/execution/OldCompatibleStrategy.scala
b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/execution/OldCompatibleStrategy.scala
new file mode 100644
index 0000000000..111ceba23e
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/execution/OldCompatibleStrategy.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.connector.catalog.PaimonLookupCatalog
+import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
+import org.apache.spark.sql.execution.datasources.v2.MergeRowsExec
+
+case class OldCompatibleStrategy(spark: SparkSession)
+ extends SparkStrategy
+ with PredicateHelper
+ with PaimonLookupCatalog {
+
+ protected lazy val catalogManager = spark.sessionState.catalogManager
+
+ override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+ case MergeRows(
+ isSourceRowPresent,
+ isTargetRowPresent,
+ matchedInstructions,
+ notMatchedInstructions,
+ notMatchedBySourceInstructions,
+ checkCardinality,
+ output,
+ child) =>
+ MergeRowsExec(
+ isSourceRowPresent,
+ isTargetRowPresent,
+ matchedInstructions,
+ notMatchedInstructions,
+ notMatchedBySourceInstructions,
+ checkCardinality,
+ output,
+ planLater(child)
+ ) :: Nil
+ case _ => Nil
+ }
+}
diff --git
a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/MergeRows.scala
b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/MergeRows.scala
new file mode 100644
index 0000000000..a6f8b8f134
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/MergeRows.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet,
Expression, Unevaluable}
+import org.apache.spark.sql.catalyst.plans.logical.MergeRows.{Instruction,
ROW_ID}
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.types.{DataType, NullType}
+
+case class MergeRows(
+ isSourceRowPresent: Expression,
+ isTargetRowPresent: Expression,
+ matchedInstructions: Seq[Instruction],
+ notMatchedInstructions: Seq[Instruction],
+ notMatchedBySourceInstructions: Seq[Instruction],
+ checkCardinality: Boolean,
+ output: Seq[Attribute],
+ child: LogicalPlan)
+ extends UnaryNode {
+
+ override lazy val producedAttributes: AttributeSet = {
+ AttributeSet(output.filterNot(attr => inputSet.contains(attr)))
+ }
+
+ @transient
+ override lazy val references: AttributeSet = {
+ val usedExprs = if (checkCardinality) {
+ val rowIdAttr = child.output.find(attr => conf.resolver(attr.name,
ROW_ID))
+ assert(rowIdAttr.isDefined, "Cannot find row ID attr")
+ rowIdAttr.get +: expressions
+ } else {
+ expressions
+ }
+ AttributeSet.fromAttributeSets(usedExprs.map(_.references)) --
producedAttributes
+ }
+
+ override def simpleString(maxFields: Int): String = {
+ s"MergeRows${truncatedString(output, "[", ", ", "]", maxFields)}"
+ }
+
+ override protected def withNewChildInternal(newChild: LogicalPlan):
LogicalPlan = {
+ copy(child = newChild)
+ }
+}
+
+object MergeRows {
+ final val ROW_ID = "__row_id"
+
+ /**
+ * When a MERGE operation is rewritten, the target table is joined with the
source and each
+ * MATCHED/NOT MATCHED/NOT MATCHED BY SOURCE clause is converted into a
corresponding instruction
+ * on top of the joined plan. The purpose of an instruction is to derive an
output row based on a
+ * joined row.
+ *
+ * Instructions are valid expressions so that they will be properly
transformed by the analyzer
+ * and optimizer.
+ */
+ sealed trait Instruction extends Expression with Unevaluable {
+ def condition: Expression
+ def outputs: Seq[Seq[Expression]]
+ override def nullable: Boolean = false
+ // We return NullType here as only the `MergeRows` operator can contain
`Instruction`
+ // expressions and it doesn't care about the data type. Some external
optimizer rules may
+ // assume optimized plan is always resolved and Expression#dataType is
always available, so
+ // we can't just fail here.
+ override def dataType: DataType = NullType
+ }
+
+ case class Keep(condition: Expression, output: Seq[Expression]) extends
Instruction {
+ def children: Seq[Expression] = condition +: output
+ override def outputs: Seq[Seq[Expression]] = Seq(output)
+
+ override protected def withNewChildrenInternal(
+ newChildren: IndexedSeq[Expression]): Expression = {
+ copy(condition = newChildren.head, output = newChildren.tail)
+ }
+ }
+
+ case class Discard(condition: Expression) extends Instruction with
UnaryLike[Expression] {
+ override def outputs: Seq[Seq[Expression]] = Seq.empty
+ override def child: Expression = condition
+
+ override protected def withNewChildInternal(newChild: Expression):
Expression = {
+ copy(condition = newChild)
+ }
+ }
+
+ case class Split(condition: Expression, output: Seq[Expression],
otherOutput: Seq[Expression])
+ extends Instruction {
+
+ def children: Seq[Expression] = Seq(condition) ++ output ++ otherOutput
+ override def outputs: Seq[Seq[Expression]] = Seq(output, otherOutput)
+
+ override protected def withNewChildrenInternal(
+ newChildren: IndexedSeq[Expression]): Expression = {
+ val newCondition = newChildren.head
+ val newOutput = newChildren.slice(from = 1, until = output.size + 1)
+ val newOtherOutput = newChildren.takeRight(otherOutput.size)
+ copy(condition = newCondition, output = newOutput, otherOutput =
newOtherOutput)
+ }
+ }
+}
diff --git
a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
new file mode 100644
index 0000000000..b830b6b6fe
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet,
BasePredicate, Expression, Projection, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
+import org.apache.spark.sql.catalyst.plans.logical.MergeRows._
+import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.roaringbitmap.longlong.Roaring64Bitmap
+
+case class MergeRowsExec(
+ isSourceRowPresent: Expression,
+ isTargetRowPresent: Expression,
+ matchedInstructions: Seq[Instruction],
+ notMatchedInstructions: Seq[Instruction],
+ notMatchedBySourceInstructions: Seq[Instruction],
+ checkCardinality: Boolean,
+ output: Seq[Attribute],
+ child: SparkPlan)
+ extends UnaryExecNode {
+
+ @transient override lazy val producedAttributes: AttributeSet = {
+ AttributeSet(output.filterNot(attr => inputSet.contains(attr)))
+ }
+
+ @transient
+ override lazy val references: AttributeSet = {
+ val usedExprs = if (checkCardinality) {
+ val rowIdAttr = child.output.find(attr => conf.resolver(attr.name,
ROW_ID))
+ assert(rowIdAttr.isDefined, "Cannot find row ID attr")
+ rowIdAttr.get +: expressions
+ } else {
+ expressions
+ }
+ AttributeSet.fromAttributeSets(usedExprs.map(_.references)) --
producedAttributes
+ }
+
+ override def simpleString(maxFields: Int): String = {
+ s"MergeRowsExec${truncatedString(output, "[", ", ", "]", maxFields)}"
+ }
+
+ override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan
= {
+ copy(child = newChild)
+ }
+
+ override protected def doExecute(): RDD[InternalRow] = {
+ child.execute().mapPartitions(processPartition)
+ }
+
+ private def processPartition(rowIterator: Iterator[InternalRow]):
Iterator[InternalRow] = {
+ val isSourceRowPresentPred = createPredicate(isSourceRowPresent)
+ val isTargetRowPresentPred = createPredicate(isTargetRowPresent)
+
+ val matchedInstructionExecs = planInstructions(matchedInstructions)
+ val notMatchedInstructionExecs = planInstructions(notMatchedInstructions)
+ val notMatchedBySourceInstructionExecs =
planInstructions(notMatchedBySourceInstructions)
+
+ val cardinalityValidator = if (checkCardinality) {
+ val rowIdOrdinal = child.output.indexWhere(attr =>
conf.resolver(attr.name, ROW_ID))
+ assert(rowIdOrdinal != -1, "Cannot find row ID attr")
+ BitmapCardinalityValidator(rowIdOrdinal)
+ } else {
+ NoopCardinalityValidator
+ }
+
+ val mergeIterator = new MergeRowIterator(
+ rowIterator,
+ cardinalityValidator,
+ isTargetRowPresentPred,
+ isSourceRowPresentPred,
+ matchedInstructionExecs,
+ notMatchedInstructionExecs,
+ notMatchedBySourceInstructionExecs)
+
+ // null indicates a record must be discarded
+ mergeIterator.filter(_ != null)
+ }
+
+ private def createProjection(exprs: Seq[Expression]): UnsafeProjection = {
+ UnsafeProjection.create(exprs, child.output)
+ }
+
+ private def createPredicate(expr: Expression): BasePredicate = {
+ GeneratePredicate.generate(expr, child.output)
+ }
+
+ private def planInstructions(instructions: Seq[Instruction]):
Seq[InstructionExec] = {
+ instructions.map {
+ case Keep(cond, output) =>
+ KeepExec(createPredicate(cond), createProjection(output))
+
+ case Discard(cond) =>
+ DiscardExec(createPredicate(cond))
+
+ case Split(cond, output, otherOutput) =>
+ SplitExec(createPredicate(cond), createProjection(output),
createProjection(otherOutput))
+
+ case other =>
+ throw new RuntimeException("Unsupported instruction type: " +
other.getClass.getSimpleName)
+ }
+ }
+
+ sealed trait InstructionExec {
+ def condition: BasePredicate
+ }
+
+ case class KeepExec(condition: BasePredicate, projection: Projection)
extends InstructionExec {
+ def apply(row: InternalRow): InternalRow = projection.apply(row)
+ }
+
+ case class DiscardExec(condition: BasePredicate) extends InstructionExec
+
+ case class SplitExec(
+ condition: BasePredicate,
+ projection: Projection,
+ otherProjection: Projection)
+ extends InstructionExec {
+ def projectRow(row: InternalRow): InternalRow = projection.apply(row)
+
+ def projectExtraRow(row: InternalRow): InternalRow =
otherProjection.apply(row)
+ }
+
+ sealed trait CardinalityValidator {
+ def validate(row: InternalRow): Unit
+ }
+
+ object NoopCardinalityValidator extends CardinalityValidator {
+ def validate(row: InternalRow): Unit = {}
+ }
+
+ /**
+ * A simple cardinality validator that keeps track of seen row IDs in a
roaring bitmap. This
+ * validator assumes the target table is never broadcasted or replicated,
which guarantees matches
+ * for one target row are always co-located in the same partition.
+ *
+ * IDs are generated by
[[org.apache.spark.sql.catalyst.expressions.MonotonicallyIncreasingID]].
+ */
+ case class BitmapCardinalityValidator(rowIdOrdinal: Int) extends
CardinalityValidator {
+ // use Roaring64Bitmap as row IDs generated by MonotonicallyIncreasingID
are 64-bit integers
+ private val matchedRowIds = new Roaring64Bitmap()
+
+ override def validate(row: InternalRow): Unit = {
+ val currentRowId = row.getLong(rowIdOrdinal)
+ if (matchedRowIds.contains(currentRowId)) {
+ throw new RuntimeException("Should not happen: target row matched multiple source rows")
+ }
+ matchedRowIds.add(currentRowId)
+ }
+ }
+
+ /**
+ * An iterator that acts on joined target and source rows and computes
deletes, updates and
+ * inserts according to provided MERGE instructions.
+ *
+ * If a particular joined row should be discarded, this iterator returns
null.
+ */
+ class MergeRowIterator(
+ private val rowIterator: Iterator[InternalRow],
+ private val cardinalityValidator: CardinalityValidator,
+ private val isTargetRowPresentPred: BasePredicate,
+ private val isSourceRowPresentPred: BasePredicate,
+ private val matchedInstructions: Seq[InstructionExec],
+ private val notMatchedInstructions: Seq[InstructionExec],
+ private val notMatchedBySourceInstructions: Seq[InstructionExec])
+ extends Iterator[InternalRow] {
+
+ var cachedExtraRow: InternalRow = _
+
+ override def hasNext: Boolean = cachedExtraRow != null ||
rowIterator.hasNext
+
+ override def next(): InternalRow = {
+ if (cachedExtraRow != null) {
+ val extraRow = cachedExtraRow
+ cachedExtraRow = null
+ return extraRow
+ }
+
+ val row = rowIterator.next()
+
+ val isSourceRowPresent = isSourceRowPresentPred.eval(row)
+ val isTargetRowPresent = isTargetRowPresentPred.eval(row)
+
+ if (isTargetRowPresent && isSourceRowPresent) {
+ cardinalityValidator.validate(row)
+ applyInstructions(row, matchedInstructions)
+ } else if (isSourceRowPresent) {
+ applyInstructions(row, notMatchedInstructions)
+ } else if (isTargetRowPresent) {
+ applyInstructions(row, notMatchedBySourceInstructions)
+ } else {
+ null
+ }
+ }
+
+ private def applyInstructions(
+ row: InternalRow,
+ instructions: Seq[InstructionExec]): InternalRow = {
+
+ for (instruction <- instructions) {
+ if (instruction.condition.eval(row)) {
+ instruction match {
+ case keep: KeepExec =>
+ return keep.apply(row)
+
+ case _: DiscardExec =>
+ return null
+
+ case split: SplitExec =>
+ cachedExtraRow = split.projectExtraRow(row)
+ return split.projectRow(row)
+ }
+ }
+ }
+
+ null
+ }
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/OldCompatibleStrategy.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/OldCompatibleStrategy.scala
new file mode 100644
index 0000000000..7f6fcc6c63
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/OldCompatibleStrategy.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.connector.catalog.PaimonLookupCatalog
+import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
+
+case class OldCompatibleStrategy(spark: SparkSession)
+ extends SparkStrategy
+ with PredicateHelper
+ with PaimonLookupCatalog {
+
+ protected lazy val catalogManager = spark.sessionState.catalogManager
+
+ override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+ case _ => Nil
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
index 8e363c5026..b6e29b8a77 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
@@ -22,7 +22,7 @@ import
org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis, PaimonDeleteTa
import
org.apache.paimon.spark.catalyst.optimizer.{EvalSubqueriesForDeleteTable,
MergePaimonScalarSubqueries}
import
org.apache.paimon.spark.catalyst.plans.logical.PaimonTableValuedFunctions
import org.apache.paimon.spark.commands.BucketExpression
-import org.apache.paimon.spark.execution.PaimonStrategy
+import org.apache.paimon.spark.execution.{OldCompatibleStrategy,
PaimonStrategy}
import
org.apache.paimon.spark.execution.adaptive.DisableUnnecessaryPaimonBucketedScan
import org.apache.spark.sql.SparkSessionExtensions
@@ -70,6 +70,8 @@ class PaimonSparkSessionExtensions extends
(SparkSessionExtensions => Unit) {
// planner extensions
extensions.injectPlannerStrategy(spark => PaimonStrategy(spark))
+ // planner strategy for compatibility with older Spark versions
+ extensions.injectPlannerStrategy(spark => OldCompatibleStrategy(spark))
// query stage preparation
extensions.injectQueryStagePrepRule(_ =>
DisableUnnecessaryPaimonBucketedScan)
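
For completeness, a minimal sketch of enabling the extension class above (which now also injects OldCompatibleStrategy) when creating a session; the local master is illustrative and Paimon catalog configuration is omitted.

    import org.apache.spark.sql.SparkSession

    // Register the Paimon session extensions via the standard Spark property.
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .config(
        "spark.sql.extensions",
        "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
      .getOrCreate()
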
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala
index 043adef310..da43a2e91b 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala
@@ -199,119 +199,109 @@ abstract class RowLineageTestBase extends
PaimonSparkTestBase {
}
test("Data Evolution: insert into table with data-evolution") {
- if (gteqSpark3_5) {
- withTable("s", "t") {
- sql("CREATE TABLE s (id INT, b INT)")
- sql("INSERT INTO s VALUES (1, 11), (2, 22)")
+ withTable("s", "t") {
+ sql("CREATE TABLE s (id INT, b INT)")
+ sql("INSERT INTO s VALUES (1, 11), (2, 22)")
- sql(
- "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES
('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
- sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c
FROM range(2, 4)")
+ sql(
+ "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES
('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
+ sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c
FROM range(2, 4)")
- sql("""
- |MERGE INTO t
- |USING s
- |ON t.id = s.id
- |WHEN NOT MATCHED THEN INSERT (id, b, c) VALUES (id, b, 11)
- |""".stripMargin)
+ sql("""
+ |MERGE INTO t
+ |USING s
+ |ON t.id = s.id
+ |WHEN NOT MATCHED THEN INSERT (id, b, c) VALUES (id, b, 11)
+ |""".stripMargin)
- checkAnswer(
- sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
- Seq(Row(1, 11, 11, 2, 2), Row(2, 2, 2, 0, 1), Row(3, 3, 3, 1, 1))
- )
- }
+ checkAnswer(
+ sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+ Seq(Row(1, 11, 11, 2, 2), Row(2, 2, 2, 0, 1), Row(3, 3, 3, 1, 1))
+ )
}
}
test("Data Evolution: merge into table with data-evolution") {
- if (gteqSpark3_5) {
- withTable("s", "t") {
- sql("CREATE TABLE s (id INT, b INT)")
- sql("INSERT INTO s VALUES (1, 11), (2, 22)")
+ withTable("s", "t") {
+ sql("CREATE TABLE s (id INT, b INT)")
+ sql("INSERT INTO s VALUES (1, 11), (2, 22)")
- sql(
- "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES
('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
- sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c
FROM range(2, 4)")
-
- sql("""
- |MERGE INTO t
- |USING s
- |ON t.id = s.id
- |WHEN MATCHED THEN UPDATE SET t.b = s.b
- |WHEN NOT MATCHED THEN INSERT (id, b, c) VALUES (id, b, 11)
- |""".stripMargin)
- checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(3)))
- checkAnswer(
- sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
- Seq(Row(1, 11, 11, 2, 2), Row(2, 22, 2, 0, 2), Row(3, 3, 3, 1, 2))
- )
- }
+ sql(
+ "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES
('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
+ sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c
FROM range(2, 4)")
+
+ sql("""
+ |MERGE INTO t
+ |USING s
+ |ON t.id = s.id
+ |WHEN MATCHED THEN UPDATE SET t.b = s.b
+ |WHEN NOT MATCHED THEN INSERT (id, b, c) VALUES (id, b, 11)
+ |""".stripMargin)
+ checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(3)))
+ checkAnswer(
+ sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+ Seq(Row(1, 11, 11, 2, 2), Row(2, 22, 2, 0, 2), Row(3, 3, 3, 1, 2))
+ )
}
}
test("Data Evolution: merge into table with data-evolution complex") {
- if (gteqSpark3_5) {
- withTable("source", "target") {
- sql("CREATE TABLE source (a INT, b INT, c STRING)")
- sql(
- "INSERT INTO source VALUES (1, 100, 'c11'), (3, 300, 'c33'), (5,
500, 'c55'), (7, 700, 'c77'), (9, 900, 'c99')")
-
- sql(
- "CREATE TABLE target (a INT, b INT, c STRING) TBLPROPERTIES
('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
- sql(
- "INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2'), (3, 30,
'c3'), (4, 40, 'c4'), (5, 50, 'c5')")
+ withTable("source", "target") {
+ sql("CREATE TABLE source (a INT, b INT, c STRING)")
+ sql(
+ "INSERT INTO source VALUES (1, 100, 'c11'), (3, 300, 'c33'), (5, 500,
'c55'), (7, 700, 'c77'), (9, 900, 'c99')")
- sql(s"""
- |MERGE INTO target
- |USING source
- |ON target.a = source.a
- |WHEN MATCHED AND target.a = 5 THEN UPDATE SET b = source.b +
target.b
- |WHEN MATCHED AND source.c > 'c2' THEN UPDATE SET b = source.b,
c = source.c
- |WHEN NOT MATCHED AND c > 'c9' THEN INSERT (a, b, c) VALUES (a,
b * 1.1, c)
- |WHEN NOT MATCHED THEN INSERT *
- |""".stripMargin)
- checkAnswer(
- sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM target ORDER BY a"),
- Seq(
- Row(1, 10, "c1", 0, 2),
- Row(2, 20, "c2", 1, 2),
- Row(3, 300, "c33", 2, 2),
- Row(4, 40, "c4", 3, 2),
- Row(5, 550, "c5", 4, 2),
- Row(7, 700, "c77", 5, 2),
- Row(9, 990, "c99", 6, 2))
- )
- }
+ sql(
+ "CREATE TABLE target (a INT, b INT, c STRING) TBLPROPERTIES
('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
+ sql(
+ "INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2'), (3, 30,
'c3'), (4, 40, 'c4'), (5, 50, 'c5')")
+
+ sql(s"""
+ |MERGE INTO target
+ |USING source
+ |ON target.a = source.a
+ |WHEN MATCHED AND target.a = 5 THEN UPDATE SET b = source.b +
target.b
+ |WHEN MATCHED AND source.c > 'c2' THEN UPDATE SET b = source.b, c
= source.c
+ |WHEN NOT MATCHED AND c > 'c9' THEN INSERT (a, b, c) VALUES (a, b
* 1.1, c)
+ |WHEN NOT MATCHED THEN INSERT *
+ |""".stripMargin)
+ checkAnswer(
+ sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM target ORDER BY a"),
+ Seq(
+ Row(1, 10, "c1", 0, 2),
+ Row(2, 20, "c2", 1, 2),
+ Row(3, 300, "c33", 2, 2),
+ Row(4, 40, "c4", 3, 2),
+ Row(5, 550, "c5", 4, 2),
+ Row(7, 700, "c77", 5, 2),
+ Row(9, 990, "c99", 6, 2))
+ )
}
}
test("Data Evolution: update table throws exception") {
- if (gteqSpark3_5) {
- withTable("t") {
- sql(
- "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES
('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
- sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c
FROM range(2, 4)")
- assert(
- intercept[RuntimeException] {
- sql("UPDATE t SET b = 22")
- }.getMessage
- .contains("Update operation is not supported when data evolution
is enabled yet."))
- }
+ withTable("t") {
+ sql(
+ "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES
('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
+ sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c
FROM range(2, 4)")
+ assert(
+ intercept[RuntimeException] {
+ sql("UPDATE t SET b = 22")
+ }.getMessage
+ .contains("Update operation is not supported when data evolution is
enabled yet."))
}
}
test("Data Evolution: delete table throws exception") {
- if (gteqSpark3_5) {
- withTable("t") {
- sql(
- "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES
('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
- sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c
FROM range(2, 4)")
- assert(
- intercept[RuntimeException] {
- sql("DELETE FROM t WHERE id = 2")
- }.getMessage
- .contains("Delete operation is not supported when data evolution
is enabled yet."))
- }
+ withTable("t") {
+ sql(
+ "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES
('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
+ sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c
FROM range(2, 4)")
+ assert(
+ intercept[RuntimeException] {
+ sql("DELETE FROM t WHERE id = 2")
+ }.getMessage
+ .contains("Delete operation is not supported when data evolution is
enabled yet."))
}
}