This is an automated email from the ASF dual-hosted git repository.
biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 83410b8504 [spark] Eliminate the union stage when merging into without notMatchedBySource (#5137)
83410b8504 is described below
commit 83410b8504b6b31511aac69c1dad606fb784ac8f
Author: WenjunMin <[email protected]>
AuthorDate: Sun Mar 2 19:31:32 2025 +0800
[spark] Eliminate the union stage when merging into without notMatchedBySource (#5137)
---
.../apache/paimon/spark/commands/MergeIntoPaimonTable.scala | 13 +++++++++----
1 file changed, 9 insertions(+), 4 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
index 3df95917ab..8a70ca384d 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
@@ -150,7 +150,7 @@ case class MergeIntoPaimonTable(
case _ => false
}
}
- if (hasUpdate(matchedActions)) {
+ if (hasUpdate(matchedActions) || notMatchedActions.nonEmpty) {
touchedFilePathsSet ++= findTouchedFiles(
targetDS.alias("_left").join(sourceDS, toColumn(mergeCondition),
"inner"),
sparkSession,
@@ -175,10 +175,15 @@ case class MergeIntoPaimonTable(
// Add FILE_TOUCHED_COL to mark the row as coming from the touched file,
if the row has not been
// modified and was from touched file, it should be kept too.
- val targetDSWithFileTouchedCol = createDataset(sparkSession,
touchedFileRelation)
+ val touchedDsWithFileTouchedCol = createDataset(sparkSession,
touchedFileRelation)
.withColumn(FILE_TOUCHED_COL, lit(true))
- .union(createDataset(sparkSession, unTouchedFileRelation)
- .withColumn(FILE_TOUCHED_COL, lit(false)))
+ val targetDSWithFileTouchedCol = if (notMatchedBySourceActions.nonEmpty)
{
+ touchedDsWithFileTouchedCol.union(
+ createDataset(sparkSession, unTouchedFileRelation)
+ .withColumn(FILE_TOUCHED_COL, lit(false)))
+ } else {
+ touchedDsWithFileTouchedCol
+ }
val toWriteDS =
constructChangedRows(sparkSession,
targetDSWithFileTouchedCol).drop(ROW_KIND_COL)