This is an automated email from the ASF dual-hosted git repository.
biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8703b95a33 [spark] Support MergeInto on Paimon append-only table in spark V2 write (#7200)
8703b95a33 is described below
commit 8703b95a33e5d597dcb20a0e8e551152f05d2e66
Author: Kerwin Zhang <[email protected]>
AuthorDate: Thu Feb 5 15:41:01 2026 +0800
[spark] Support MergeInto on Paimon append-only table in spark V2 write (#7200)
---
.../spark/catalyst/analysis/PaimonMergeInto.scala | 23 ++++--
.../spark/catalyst/analysis/PaimonMergeInto.scala | 24 ++++--
.../paimon/spark/sql/MergeIntoTableTest.scala | 40 ++++++++++
.../analysis/AssignmentAlignmentHelper.scala | 15 ----
.../spark/catalyst/analysis/PaimonMergeInto.scala | 25 ++++--
.../catalyst/analysis/PaimonMergeIntoBase.scala | 88 ++++++++--------------
.../spark/catalyst/analysis/RowLevelHelper.scala | 11 ++-
.../spark/rowops/PaimonCopyOnWriteScan.scala | 38 +++++++---
.../paimon/spark/sql/MergeIntoTableTestBase.scala | 8 +-
9 files changed, 162 insertions(+), 110 deletions(-)
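
For context, a minimal end-to-end sketch of what this change enables, assuming an append-only (no primary key) table; the table and column names are illustrative, and the config key comes from the new tests below:

// Hypothetical usage sketch -- not part of this commit.
// Route MERGE INTO through the Spark V2 write path.
spark.conf.set("spark.paimon.write.use-v2-write", "true")

spark.sql("CREATE TABLE target (a INT, b INT, c STRING)") // append-only: no primary key
spark.sql("INSERT INTO target VALUES (1, 10, 'c1'), (2, 20, 'c2')")

spark.sql("""
  |MERGE INTO target
  |USING source
  |ON target.a = source.a
  |WHEN MATCHED THEN UPDATE SET *
  |WHEN NOT MATCHED THEN INSERT *
  |WHEN NOT MATCHED BY SOURCE THEN DELETE
  |""".stripMargin)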
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala
index b1af5a2d88..95ae7471da 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala
@@ -18,20 +18,27 @@
package org.apache.paimon.spark.catalyst.analysis
-import org.apache.paimon.spark.SparkTable
-import org.apache.paimon.spark.commands.MergeIntoPaimonTable
-
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{MergeAction, MergeIntoTable}
/** A post-hoc resolution rule for MergeInto. */
case class PaimonMergeInto(spark: SparkSession) extends PaimonMergeIntoBase {
- override def resolveNotMatchedBySourceActions(
- merge: MergeIntoTable,
- targetOutput: Seq[AttributeReference],
- dataEvolutionEnabled: Boolean): Seq[MergeAction] = {
+ /**
+ * Align all MergeActions in a MergeIntoTable based on the target table's output attributes.
+ * Returns a new MergeIntoTable with aligned matchedActions and notMatchedActions.
+ */
+ override def alignMergeIntoTable(
+ m: MergeIntoTable,
+ targetOutput: Seq[Attribute]): MergeIntoTable = {
+ m.copy(
+ matchedActions = m.matchedActions.map(alignMergeAction(_, targetOutput)),
+ notMatchedActions = m.notMatchedActions.map(alignMergeAction(_, targetOutput))
+ )
+ }
+
+ override def resolveNotMatchedBySourceActions(merge: MergeIntoTable): Seq[MergeAction] = {
Seq.empty
}
}
diff --git a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala
index b1af5a2d88..a92d13cc0b 100644
--- a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala
+++ b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala
@@ -18,20 +18,28 @@
package org.apache.paimon.spark.catalyst.analysis
-import org.apache.paimon.spark.SparkTable
-import org.apache.paimon.spark.commands.MergeIntoPaimonTable
-
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{MergeAction, MergeIntoTable}
/** A post-hoc resolution rule for MergeInto. */
case class PaimonMergeInto(spark: SparkSession) extends PaimonMergeIntoBase {
- override def resolveNotMatchedBySourceActions(
- merge: MergeIntoTable,
- targetOutput: Seq[AttributeReference],
- dataEvolutionEnabled: Boolean): Seq[MergeAction] = {
+ /**
+ * Align all MergeActions in a MergeIntoTable based on the target table's output attributes.
+ * Returns a new MergeIntoTable with aligned matchedActions and notMatchedActions.
+ */
+ override def alignMergeIntoTable(
+ m: MergeIntoTable,
+ targetOutput: Seq[Attribute]): MergeIntoTable = {
+ m.copy(
+ matchedActions = m.matchedActions.map(alignMergeAction(_, targetOutput)),
+ notMatchedActions = m.notMatchedActions.map(alignMergeAction(_, targetOutput))
+ )
+ }
+
+ override def resolveNotMatchedBySourceActions(merge: MergeIntoTable): Seq[MergeAction] = {
Seq.empty
}
+
}
diff --git a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
index c83ee54938..c8ae09a26b 100644
--- a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
+++ b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
@@ -61,3 +61,43 @@ class MergeIntoAppendNonBucketedTableTest
super.sparkConf.set("spark.paimon.write.use-v2-write", "false")
}
}
+
+class V2MergeIntoPrimaryKeyBucketedTableTest
+ extends MergeIntoTableTestBase
+ with MergeIntoPrimaryKeyTableTest
+ with MergeIntoNotMatchedBySourceTest
+ with PaimonPrimaryKeyBucketedTableTest {
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+ }
+}
+
+class V2MergeIntoPrimaryKeyNonBucketTableTest
+ extends MergeIntoTableTestBase
+ with MergeIntoPrimaryKeyTableTest
+ with MergeIntoNotMatchedBySourceTest
+ with PaimonPrimaryKeyNonBucketTableTest {
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+ }
+}
+
+class V2MergeIntoAppendBucketedTableTest
+ extends MergeIntoTableTestBase
+ with MergeIntoAppendTableTest
+ with MergeIntoNotMatchedBySourceTest
+ with PaimonAppendBucketedTableTest {
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+ }
+}
+
+class V2MergeIntoAppendNonBucketedTableTest
+ extends MergeIntoTableTestBase
+ with MergeIntoAppendTableTest
+ with MergeIntoNotMatchedBySourceTest
+ with PaimonAppendNonBucketTableTest {
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/AssignmentAlignmentHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/AssignmentAlignmentHelper.scala
index 13ce64b86b..2404f1f49f 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/AssignmentAlignmentHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/AssignmentAlignmentHelper.scala
@@ -89,21 +89,6 @@ trait AssignmentAlignmentHelper extends SQLConfHelper with ExpressionHelper {
}
}
- /**
- * Align all MergeActions in a MergeIntoTable based on the target table's output attributes.
- * Returns a new MergeIntoTable with aligned matchedActions, notMatchedActions, and
- * notMatchedBySourceActions.
- */
- protected def alignMergeIntoTable(
- m: MergeIntoTable,
- targetOutput: Seq[Attribute]): MergeIntoTable = {
- m.copy(
- matchedActions = m.matchedActions.map(alignMergeAction(_, targetOutput)),
- notMatchedActions = m.notMatchedActions.map(alignMergeAction(_, targetOutput)),
- notMatchedBySourceActions = m.notMatchedBySourceActions.map(alignMergeAction(_, targetOutput))
- )
- }
-
private def recursiveAlignUpdates(
targetAttrs: Seq[NamedExpression],
updates: Seq[AttrUpdate],
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala
index d6023c2f69..45916be761 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala
@@ -19,17 +19,28 @@
package org.apache.paimon.spark.catalyst.analysis
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{MergeAction, MergeIntoTable}
/** A post-hoc resolution rule for MergeInto. */
case class PaimonMergeInto(spark: SparkSession) extends PaimonMergeIntoBase {
- override def resolveNotMatchedBySourceActions(
- merge: MergeIntoTable,
- targetOutput: Seq[AttributeReference],
- dataEvolutionEnabled: Boolean): Seq[MergeAction] = {
- merge.notMatchedBySourceActions.map(
- checkAndAlignActionAssignment(_, targetOutput, dataEvolutionEnabled))
+ /**
+ * Align all MergeActions in a MergeIntoTable based on the target table's output attributes.
+ * Returns a new MergeIntoTable with aligned matchedActions, notMatchedActions, and
+ * notMatchedBySourceActions.
+ */
+ override def alignMergeIntoTable(
+ m: MergeIntoTable,
+ targetOutput: Seq[Attribute]): MergeIntoTable = {
+ m.copy(
+ matchedActions = m.matchedActions.map(alignMergeAction(_, targetOutput)),
+ notMatchedActions = m.notMatchedActions.map(alignMergeAction(_, targetOutput)),
+ notMatchedBySourceActions = m.notMatchedBySourceActions.map(alignMergeAction(_, targetOutput))
+ )
+ }
+
+ override def resolveNotMatchedBySourceActions(merge: MergeIntoTable): Seq[MergeAction] = {
+ merge.notMatchedBySourceActions
}
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
index f9a186e70b..84560543a2 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
@@ -23,7 +23,7 @@ import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
import org.apache.paimon.spark.commands.{MergeIntoPaimonDataEvolutionTable, MergeIntoPaimonTable}
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
@@ -62,64 +62,34 @@ trait PaimonMergeIntoBase
updateActions,
primaryKeys)
}
- val alignedMatchedActions =
- merge.matchedActions.map(
- checkAndAlignActionAssignment(_, targetOutput, dataEvolutionEnabled))
- val alignedNotMatchedActions =
- merge.notMatchedActions.map(
- checkAndAlignActionAssignment(_, targetOutput, dataEvolutionEnabled))
- val alignedNotMatchedBySourceActions =
- resolveNotMatchedBySourceActions(merge, targetOutput, dataEvolutionEnabled)
-
- if (dataEvolutionEnabled) {
- MergeIntoPaimonDataEvolutionTable(
- v2Table,
- merge.targetTable,
- merge.sourceTable,
- merge.mergeCondition,
- alignedMatchedActions,
- alignedNotMatchedActions,
- alignedNotMatchedBySourceActions
- )
- } else {
- MergeIntoPaimonTable(
- v2Table,
- merge.targetTable,
- merge.sourceTable,
- merge.mergeCondition,
- alignedMatchedActions,
- alignedNotMatchedActions,
- alignedNotMatchedBySourceActions
- )
- }
- }
- }
-
- def resolveNotMatchedBySourceActions(
- merge: MergeIntoTable,
- targetOutput: Seq[AttributeReference],
- dataEvolutionEnabled: Boolean): Seq[MergeAction]
- protected def checkAndAlignActionAssignment(
- action: MergeAction,
- targetOutput: Seq[AttributeReference],
- dataEvolutionEnabled: Boolean): MergeAction = {
- action match {
- case d @ DeleteAction(_) => d
- case u @ UpdateAction(_, assignments) =>
- u.copy(assignments = alignAssignments(targetOutput, assignments))
+ val alignedMergeIntoTable = alignMergeIntoTable(merge, targetOutput)
- case i @ InsertAction(_, assignments) =>
- i.copy(assignments = alignAssignments(targetOutput, assignments, isInsert = true))
-
- case _: UpdateStarAction =>
- throw new RuntimeException(s"UpdateStarAction should not be here.")
-
- case _: InsertStarAction =>
- throw new RuntimeException(s"InsertStarAction should not be here.")
-
- case _ =>
- throw new RuntimeException(s"Can't recognize this action: $action")
+ if (!shouldFallbackToV1MergeInto(alignedMergeIntoTable)) {
+ alignedMergeIntoTable
+ } else {
+ if (dataEvolutionEnabled) {
+ MergeIntoPaimonDataEvolutionTable(
+ v2Table,
+ merge.targetTable,
+ merge.sourceTable,
+ merge.mergeCondition,
+ alignedMergeIntoTable.matchedActions,
+ alignedMergeIntoTable.notMatchedActions,
+ resolveNotMatchedBySourceActions(alignedMergeIntoTable)
+ )
+ } else {
+ MergeIntoPaimonTable(
+ v2Table,
+ merge.targetTable,
+ merge.sourceTable,
+ merge.mergeCondition,
+ alignedMergeIntoTable.matchedActions,
+ alignedMergeIntoTable.notMatchedActions,
+ resolveNotMatchedBySourceActions(alignedMergeIntoTable)
+ )
+ }
+ }
}
}
@@ -156,4 +126,8 @@ trait PaimonMergeIntoBase
throw new RuntimeException("Can't update the primary key column in
update clause.")
}
}
+
+ def resolveNotMatchedBySourceActions(merge: MergeIntoTable): Seq[MergeAction]
+
+ def alignMergeIntoTable(m: MergeIntoTable, targetOutput: Seq[Attribute]): MergeIntoTable
}
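
For readers skimming the diff: "alignment" here means reordering and completing each action's assignments so they line up one-to-one with the target table's columns. A self-contained sketch of that idea, with hypothetical names (this is not the actual AssignmentAlignmentHelper logic):

// Illustrative only: align a SET clause against the target column order,
// defaulting columns the action does not mention to their current value.
case class Assignment(column: String, value: String)

def align(targetCols: Seq[String], assignments: Seq[Assignment]): Seq[Assignment] = {
  val byName = assignments.map(a => a.column.toLowerCase -> a).toMap
  targetCols.map(col => byName.getOrElse(col.toLowerCase, Assignment(col, col)))
}

// align(Seq("a", "b", "c"), Seq(Assignment("c", "'c9'")))
//   == Seq(Assignment("a", "a"), Assignment("b", "b"), Assignment("c", "'c9'"))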
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
index 8ec139b607..b3b0929227 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
@@ -24,7 +24,7 @@ import org.apache.paimon.table.{FileStoreTable, Table}
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, BinaryExpression, EqualTo, Expression, SubqueryExpression}
-import org.apache.spark.sql.catalyst.plans.logical.{Assignment, UpdateTable}
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, MergeIntoTable, UpdateTable}
trait RowLevelHelper extends SQLConfHelper with AssignmentAlignmentHelper {
@@ -102,4 +102,13 @@ trait RowLevelHelper extends SQLConfHelper with AssignmentAlignmentHelper {
!updateTable.rewritable ||
!updateTable.aligned
}
+
+ /** Determines if DataSourceV2 merge into is not supported for the given table. */
+ protected def shouldFallbackToV1MergeInto(m: MergeIntoTable): Boolean = {
+ val relation = PaimonRelation.getPaimonRelation(m.targetTable)
+ val table = relation.table.asInstanceOf[SparkTable]
+ shouldFallbackToV1(table) ||
+ !m.rewritable ||
+ !m.aligned
+ }
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonCopyOnWriteScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonCopyOnWriteScan.scala
index 11bd72e360..9b44094c23 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonCopyOnWriteScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonCopyOnWriteScan.scala
@@ -42,6 +42,9 @@ import scala.collection.mutable
/**
* Note: The [[pushedPartitionFilters]] and [[pushedDataFilters]] are intentionally set to empty
* because file-level filtering is handled through Spark's runtime V2 filtering mechanism.
+ *
+ * When Spark's runtime filter is not applied (e.g., when NOT MATCHED BY SOURCE is present in
+ * MergeInto), this scan will read all data from the table.
*/
case class PaimonCopyOnWriteScan(
table: FileStoreTable,
@@ -51,10 +54,29 @@ case class PaimonCopyOnWriteScan(
extends BaseScan
with SupportsRuntimeV2Filtering {
- override def inputSplits: Array[Split] = dataSplits.asInstanceOf[Array[Split]]
+ // Track whether filter() has been called
+ @volatile private var filterApplied: Boolean = false
+
+ private val filteredFileNames: mutable.Set[String] = mutable.Set[String]()
+
+ override def inputSplits: Array[Split] = {
+ loadSplits()
+ dataSplits.asInstanceOf[Array[Split]]
+ }
var dataSplits: Array[DataSplit] = Array()
+ private def loadSplits(): Unit = {
+ val snapshotReader = table.newSnapshotReader()
+ if (table.coreOptions().manifestDeleteFileDropStats()) {
+ snapshotReader.dropStats()
+ }
+ if (filterApplied) {
+ snapshotReader.withDataFileNameFilter(fileName => filteredFileNames.contains(fileName))
+ }
+ dataSplits = snapshotReader.read().splits().asScala.collect { case s: DataSplit => s }.toArray
+ }
+
def scannedFiles: Seq[SparkDataFileMeta] = {
dataSplits
.flatMap(dataSplit => convertToSparkDataFileMeta(dataSplit, dataSplit.totalBuckets()))
@@ -66,9 +88,9 @@ case class PaimonCopyOnWriteScan(
}
override def filter(predicates: Array[SparkPredicate]): Unit = {
- val filteredFileNames: mutable.Set[String] = mutable.Set[String]()
- val runtimefilters: Array[Filter] = predicates.flatMap(PaimonUtils.filterV2ToV1)
- for (filter <- runtimefilters) {
+ filterApplied = true
+ val runtimeFilters: Array[Filter] = predicates.flatMap(PaimonUtils.filterV2ToV1)
+ for (filter <- runtimeFilters) {
filter match {
case in: In if in.attribute.equalsIgnoreCase(FILE_PATH_COLUMN) =>
for (value <- in.values) {
@@ -78,14 +100,6 @@ case class PaimonCopyOnWriteScan(
case _ => logWarning("Unsupported runtime filter")
}
}
-
- val snapshotReader = table.newSnapshotReader()
- if (table.coreOptions().manifestDeleteFileDropStats()) {
- snapshotReader.dropStats()
- }
-
- snapshotReader.withDataFileNameFilter(fileName => filteredFileNames.contains(fileName))
- dataSplits = snapshotReader.read().splits().asScala.collect { case s: DataSplit => s }.toArray
}
}
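
The behavioral change above: splits are now materialized lazily in inputSplits(), and the file-name filter is only installed if filter() actually ran, so a MERGE without runtime filtering (e.g. one with NOT MATCHED BY SOURCE) scans the whole table instead of reading nothing. A self-contained sketch of this pattern, with illustrative names:

import scala.collection.mutable

// Hypothetical sketch of the "filter only if runtime filtering ran" pattern.
class LazyFilteredScan(allFiles: Seq[String]) {
  @volatile private var filterApplied = false
  private val keep = mutable.Set.empty[String]

  // Invoked by the runtime-filtering machinery -- possibly never.
  def filter(fileNames: Seq[String]): Unit = {
    filterApplied = true
    keep ++= fileNames
  }

  // Materialized on access: no runtime filter means read everything.
  def inputFiles: Seq[String] =
    if (filterApplied) allFiles.filter(keep.contains) else allFiles
}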
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
index ee4799f56c..28e1b63fd2 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
@@ -528,7 +528,7 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase with PaimonTab
createTable("target", "a INT, b INT, c STRING", Seq("a"))
spark.sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2')")
- val error = intercept[RuntimeException] {
+ val error = intercept[Exception] {
spark.sql(s"""
|MERGE INTO target
|USING source
@@ -539,7 +539,11 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase with PaimonTab
|THEN INSERT (a, b, c) values (a, b, c)
|""".stripMargin)
}.getMessage
- assert(error.contains("match more than one source rows"))
+ // V1 path: "match more than one source rows"
+ // V2 path: "MERGE_CARDINALITY_VIOLATION"
+ assert(
+ error.contains("match more than one source rows") ||
+ error.contains("MERGE_CARDINALITY_VIOLATION"))
}
}
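
The relaxed assertion covers both rewrite paths: the V1 command reports "match more than one source rows", while Spark's built-in V2 MERGE rewrite raises the MERGE_CARDINALITY_VIOLATION error class. For reference, the failing shape is a source with duplicate join keys; a sketch reusing the test's target schema (the "source" table setup is assumed):

// Two source rows with a = 1 match the same target row, so the MERGE
// must fail on either path. Illustrative only.
spark.sql("INSERT INTO source VALUES (1, 100, 'x'), (1, 200, 'y')")
spark.sql("""
  |MERGE INTO target
  |USING source
  |ON target.a = source.a
  |WHEN MATCHED THEN UPDATE SET *
  |WHEN NOT MATCHED THEN INSERT (a, b, c) VALUES (a, b, c)
  |""".stripMargin)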