This is an automated email from the ASF dual-hosted git repository.

biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 83410b8504 [spark] Eliminate the union stage when merging into without notMatchedBySource (#5137)
83410b8504 is described below

commit 83410b8504b6b31511aac69c1dad606fb784ac8f
Author: WenjunMin <[email protected]>
AuthorDate: Sun Mar 2 19:31:32 2025 +0800

    [spark] Eliminate the union stage when merging into without notMatchedBySource (#5137)
---
 .../apache/paimon/spark/commands/MergeIntoPaimonTable.scala | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
index 3df95917ab..8a70ca384d 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
@@ -150,7 +150,7 @@ case class MergeIntoPaimonTable(
           case _ => false
         }
       }
-      if (hasUpdate(matchedActions)) {
+      if (hasUpdate(matchedActions) || notMatchedActions.nonEmpty) {
         touchedFilePathsSet ++= findTouchedFiles(
           targetDS.alias("_left").join(sourceDS, toColumn(mergeCondition), 
"inner"),
           sparkSession,
@@ -175,10 +175,15 @@ case class MergeIntoPaimonTable(
 
       // Add FILE_TOUCHED_COL to mark the row as coming from the touched file, 
if the row has not been
       // modified and was from touched file, it should be kept too.
-      val targetDSWithFileTouchedCol = createDataset(sparkSession, 
touchedFileRelation)
+      val touchedDsWithFileTouchedCol = createDataset(sparkSession, 
touchedFileRelation)
         .withColumn(FILE_TOUCHED_COL, lit(true))
-        .union(createDataset(sparkSession, unTouchedFileRelation)
-          .withColumn(FILE_TOUCHED_COL, lit(false)))
+      val targetDSWithFileTouchedCol = if (notMatchedBySourceActions.nonEmpty) 
{
+        touchedDsWithFileTouchedCol.union(
+          createDataset(sparkSession, unTouchedFileRelation)
+            .withColumn(FILE_TOUCHED_COL, lit(false)))
+      } else {
+        touchedDsWithFileTouchedCol
+      }
 
       val toWriteDS =
         constructChangedRows(sparkSession, 
targetDSWithFileTouchedCol).drop(ROW_KIND_COL)

Reply via email to