This is an automated email from the ASF dual-hosted git repository.

aitozi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8255e3376c [spark] Eliminate the union stage when merging into without notMatchedActions (#5195)
8255e3376c is described below

commit 8255e3376ccd4218084ce01c3b0c52bad8750e32
Author: WenjunMin <[email protected]>
AuthorDate: Mon Mar 10 22:51:25 2025 +0800

    [spark] Eliminate the union stage when merging into without notMatchedActions (#5195)
---
 .../spark/commands/MergeIntoPaimonTable.scala      | 41 ++++++++++++----------
 1 file changed, 23 insertions(+), 18 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
index 8a70ca384d..a1c2abcc52 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
@@ -144,29 +144,38 @@ case class MergeIntoPaimonTable(
       }
     } else {
       val touchedFilePathsSet = mutable.Set.empty[String]
+      val intersectionFilePaths = mutable.Set.empty[String]
+
       def hasUpdate(actions: Seq[MergeAction]): Boolean = {
         actions.exists {
           case _: UpdateAction | _: DeleteAction => true
           case _ => false
         }
       }
-      if (hasUpdate(matchedActions) || notMatchedActions.nonEmpty) {
-        touchedFilePathsSet ++= findTouchedFiles(
-          targetDS.alias("_left").join(sourceDS, toColumn(mergeCondition), 
"inner"),
+
+      def findTouchedFiles0(joinType: String): Array[String] = {
+        findTouchedFiles(
+          targetDS.alias("_left").join(sourceDS, toColumn(mergeCondition), 
joinType),
           sparkSession,
-          "_left." + FILE_PATH_COLUMN
-        )
+          "_left." + FILE_PATH_COLUMN)
       }
+
+      if (hasUpdate(matchedActions)) {
+        touchedFilePathsSet ++= findTouchedFiles0("inner")
+      } else if (notMatchedActions.nonEmpty) {
+        intersectionFilePaths ++= findTouchedFiles0("inner")
+      }
+
       if (hasUpdate(notMatchedBySourceActions)) {
-        touchedFilePathsSet ++= findTouchedFiles(
-          targetDS.alias("_left").join(sourceDS, toColumn(mergeCondition), 
"left_anti"),
-          sparkSession,
-          "_left." + FILE_PATH_COLUMN)
+        touchedFilePathsSet ++= findTouchedFiles0("left_anti")
       }
 
-      val targetFilePaths: Array[String] = findTouchedFiles(targetDS, 
sparkSession)
       val touchedFilePaths: Array[String] = touchedFilePathsSet.toArray
-      val unTouchedFilePaths = 
targetFilePaths.filterNot(touchedFilePaths.contains)
+      val unTouchedFilePaths = if (notMatchedActions.nonEmpty) {
+        intersectionFilePaths.diff(touchedFilePathsSet).toArray
+      } else {
+        Array[String]()
+      }
 
       val (touchedFiles, touchedFileRelation) =
         createNewRelation(touchedFilePaths, dataFilePathToMeta, relation)
@@ -177,13 +186,9 @@ case class MergeIntoPaimonTable(
       // modified and was from touched file, it should be kept too.
       val touchedDsWithFileTouchedCol = createDataset(sparkSession, 
touchedFileRelation)
         .withColumn(FILE_TOUCHED_COL, lit(true))
-      val targetDSWithFileTouchedCol = if (notMatchedBySourceActions.nonEmpty) 
{
-        touchedDsWithFileTouchedCol.union(
-          createDataset(sparkSession, unTouchedFileRelation)
-            .withColumn(FILE_TOUCHED_COL, lit(false)))
-      } else {
-        touchedDsWithFileTouchedCol
-      }
+      val targetDSWithFileTouchedCol = touchedDsWithFileTouchedCol.union(
+        createDataset(sparkSession, unTouchedFileRelation)
+          .withColumn(FILE_TOUCHED_COL, lit(false)))
 
       val toWriteDS =
         constructChangedRows(sparkSession, 
targetDSWithFileTouchedCol).drop(ROW_KIND_COL)

Reply via email to