This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f32f02d3c8 [spark] Support MERGE for row lineage (#6045)
f32f02d3c8 is described below

commit f32f02d3c8c418b4a3e912446a22bd3381c41f47
Author: Zouxxyy <[email protected]>
AuthorDate: Sun Aug 10 21:37:43 2025 +0800

    [spark] Support MERGE for row lineage (#6045)
---
 docs/content/append-table/row-tracking.md          |  27 +++++-
 .../spark/commands/MergeIntoPaimonTable.scala      | 106 +++++++++++++--------
 paimon-spark/paimon-spark-ut/pom.xml               |  17 +++-
 .../paimon/spark/sql/RowLineageTestBase.scala      |  47 +++++++++
 4 files changed, 151 insertions(+), 46 deletions(-)

diff --git a/docs/content/append-table/row-tracking.md 
b/docs/content/append-table/row-tracking.md
index 3c9fe75cf5..16c9d76bcb 100644
--- a/docs/content/append-table/row-tracking.md
+++ b/docs/content/append-table/row-tracking.md
@@ -46,7 +46,7 @@ WITH ('row-tracking.enabled' = 'true');
 ```
 Notice that:
 - Row tracking is only supported for unaware append tables, not for primary 
key tables. Which means you can't define `bucket` and `bucket-key` for the 
table.
-- Only spark support update and merge into (not yet) operations on 
row-tracking tables, Flink SQL does not support these operations yet.
+- Only Spark supports update and merge into operations on row-tracking tables; 
Flink SQL does not support these operations yet.
 - This function is experimental, this line will be removed after being stable.
 
 ## How to use row tracking
@@ -62,7 +62,7 @@ You can select the row lineage meta column with the following 
sql in spark:
 SELECT id, data, _ROW_ID, _SEQUENCE_NUMBER FROM t;
 ```
 You will get the following result:
-```sql
+```text
 +---+----+-------+----------------+
 | id|data|_ROW_ID|_SEQUENCE_NUMBER|
 +---+----+-------+----------------+
@@ -78,13 +78,32 @@ SELECT id, data, _ROW_ID, _SEQUENCE_NUMBER FROM t;
 ```
 
 You will get:
-```sql
+```text
 +---+---------------+-------+----------------+
 | id|           data|_ROW_ID|_SEQUENCE_NUMBER|
 +---+---------------+-------+----------------+
 | 22|              b|      1|               1|
 | 11|new-data-update|      0|               2|
-                 +---+---------------+-------+----------------+
++---+---------------+-------+----------------+
+```
+
+You can also merge into the table. Suppose you have a source table `s` that 
contains (22, 'new-data-merge') and (33, 'c'):
+```sql
+MERGE INTO t USING s
+ON t.id = s.id
+WHEN MATCHED THEN UPDATE SET t.data = s.data
+WHEN NOT MATCHED THEN INSERT *
+```
+
+You will get:
+```text
++---+---------------+-------+----------------+
+| id|           data|_ROW_ID|_SEQUENCE_NUMBER|
++---+---------------+-------+----------------+
+| 11|new-data-update|      0|               2|
+| 22| new-data-merge|      1|               3|
+| 33|              c|      2|               3|
++---+---------------+-------+----------------+
 ```
 
 ## Spec
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
index 73a1e681ca..d54639310b 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
@@ -24,7 +24,7 @@ import 
org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
 import org.apache.paimon.spark.schema.{PaimonMetadataColumn, 
SparkSystemColumns}
 import org.apache.paimon.spark.schema.PaimonMetadataColumn.{FILE_PATH, 
FILE_PATH_COLUMN, ROW_INDEX, ROW_INDEX_COLUMN}
 import org.apache.paimon.spark.util.{EncoderUtils, SparkRowUtils}
-import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.table.{FileStoreTable, SpecialFields}
 import org.apache.paimon.table.sink.CommitMessage
 import org.apache.paimon.types.RowKind
 
@@ -114,14 +114,10 @@ case class MergeIntoPaimonTable(
         sparkSession,
         filteredRelation,
         remainDeletedRow = true,
-        metadataCols = metadataCols)
+        extraMetadataCols = metadataCols)
 
       ds.cache()
       try {
-        val rowKindAttribute = ds.queryExecution.analyzed.output
-          .find(attr => sparkSession.sessionState.conf.resolver(attr.name, 
ROW_KIND_COL))
-          .getOrElse(throw new RuntimeException("Can not find _row_kind_ 
column."))
-
         // Step3: filter rows that should be marked as DELETED in Deletion 
Vector mode.
         val dvDS = ds.where(
           s"$ROW_KIND_COL = ${RowKind.DELETE.toByteValue} or $ROW_KIND_COL = 
${RowKind.UPDATE_AFTER.toByteValue}")
@@ -141,8 +137,10 @@ case class MergeIntoPaimonTable(
         ds.unpersist()
       }
     } else {
-      val touchedFilePathsSet = mutable.Set.empty[String]
-      val intersectionFilePaths = mutable.Set.empty[String]
+      // Files need to be rewritten
+      val filePathsToRewritten = mutable.Set.empty[String]
+      // Files need to be read, but not rewritten
+      val filePathsToRead = mutable.Set.empty[String]
 
       def hasUpdate(actions: Seq[MergeAction]): Boolean = {
         actions.exists {
@@ -159,39 +157,44 @@ case class MergeIntoPaimonTable(
       }
 
       if (hasUpdate(matchedActions)) {
-        touchedFilePathsSet ++= findTouchedFiles0("inner")
+        filePathsToRewritten ++= findTouchedFiles0("inner")
       } else if (notMatchedActions.nonEmpty) {
-        intersectionFilePaths ++= findTouchedFiles0("inner")
+        filePathsToRead ++= findTouchedFiles0("inner")
       }
 
       if (hasUpdate(notMatchedBySourceActions)) {
-        touchedFilePathsSet ++= findTouchedFiles0("left_anti")
-      }
-
-      val touchedFilePaths: Array[String] = touchedFilePathsSet.toArray
-      val unTouchedFilePaths = if (notMatchedActions.nonEmpty) {
-        intersectionFilePaths.diff(touchedFilePathsSet).toArray
-      } else {
-        Array[String]()
+        val noMatchedBySourceFilePaths = findTouchedFiles0("left_anti")
+        filePathsToRewritten ++= noMatchedBySourceFilePaths
+        filePathsToRead --= noMatchedBySourceFilePaths
       }
 
-      val (touchedFiles, touchedFileRelation) =
-        createNewRelation(touchedFilePaths, dataFilePathToMeta, relation)
+      val (filesToRewritten, touchedFileRelation) =
+        createNewRelation(filePathsToRewritten.toArray, dataFilePathToMeta, 
relation)
       val (_, unTouchedFileRelation) =
-        createNewRelation(unTouchedFilePaths, dataFilePathToMeta, relation)
+        createNewRelation(filePathsToRead.toArray, dataFilePathToMeta, 
relation)
 
       // Add FILE_TOUCHED_COL to mark the row as coming from the touched file, 
if the row has not been
       // modified and was from touched file, it should be kept too.
-      val touchedDsWithFileTouchedCol = createDataset(sparkSession, 
touchedFileRelation)
+      val targetDSWithFileTouchedCol = createDataset(sparkSession, 
touchedFileRelation)
         .withColumn(FILE_TOUCHED_COL, lit(true))
-      val targetDSWithFileTouchedCol = touchedDsWithFileTouchedCol.union(
-        createDataset(sparkSession, unTouchedFileRelation)
+        .union(createDataset(sparkSession, unTouchedFileRelation)
           .withColumn(FILE_TOUCHED_COL, lit(false)))
 
-      val toWriteDS =
-        constructChangedRows(sparkSession, 
targetDSWithFileTouchedCol).drop(ROW_KIND_COL)
-      val addCommitMessage = dvSafeWriter.write(toWriteDS)
-      val deletedCommitMessage = buildDeletedCommitMessage(touchedFiles)
+      // If no files need to be rewritten, no need to write row lineage
+      val writeRowLineage = coreOptions.rowTrackingEnabled() && 
filesToRewritten.nonEmpty
+
+      val toWriteDS = constructChangedRows(
+        sparkSession,
+        targetDSWithFileTouchedCol,
+        writeRowLineage = writeRowLineage).drop(ROW_KIND_COL)
+
+      val writer = if (writeRowLineage) {
+        dvSafeWriter.withRowLineage()
+      } else {
+        dvSafeWriter
+      }
+      val addCommitMessage = writer.write(toWriteDS)
+      val deletedCommitMessage = buildDeletedCommitMessage(filesToRewritten)
 
       addCommitMessage ++ deletedCommitMessage
     }
@@ -203,7 +206,8 @@ case class MergeIntoPaimonTable(
       targetDataset: Dataset[Row],
       remainDeletedRow: Boolean = false,
       deletionVectorEnabled: Boolean = false,
-      metadataCols: Seq[PaimonMetadataColumn] = Seq.empty): Dataset[Row] = {
+      extraMetadataCols: Seq[PaimonMetadataColumn] = Seq.empty,
+      writeRowLineage: Boolean = false): Dataset[Row] = {
     val targetDS = targetDataset
       .withColumn(TARGET_ROW_COL, lit(true))
 
@@ -217,7 +221,6 @@ case class MergeIntoPaimonTable(
       resolveExpressions(sparkSession)(exprs, joinedPlan)
     }
 
-    val targetOutput = filteredTargetPlan.output
     val targetRowNotMatched = resolveOnJoinedPlan(
       Seq(toExpression(sparkSession, col(SOURCE_ROW_COL).isNull))).head
     val sourceRowNotMatched = resolveOnJoinedPlan(
@@ -225,17 +228,35 @@ case class MergeIntoPaimonTable(
     val matchedExprs = matchedActions.map(_.condition.getOrElse(TrueLiteral))
     val notMatchedExprs = 
notMatchedActions.map(_.condition.getOrElse(TrueLiteral))
     val notMatchedBySourceExprs = 
notMatchedBySourceActions.map(_.condition.getOrElse(TrueLiteral))
-    val noopOutput = targetOutput :+ Alias(Literal(NOOP_ROW_KIND_VALUE), 
ROW_KIND_COL)()
-    val keepOutput = targetOutput :+ 
Alias(Literal(RowKind.INSERT.toByteValue), ROW_KIND_COL)()
 
     val resolver = sparkSession.sessionState.conf.resolver
-    val metadataAttributes = metadataCols.flatMap {
-      metadataCol => joinedPlan.output.find(attr => resolver(metadataCol.name, 
attr.name))
+    def attribute(name: String) = joinedPlan.output.find(attr => 
resolver(name, attr.name))
+    val extraMetadataAttributes =
+      extraMetadataCols.flatMap(metadataCol => attribute(metadataCol.name))
+    val (rowIdAttr, sequenceNumberAttr) = if (writeRowLineage) {
+      (
+        attribute(SpecialFields.ROW_ID.name()).get,
+        attribute(SpecialFields.SEQUENCE_NUMBER.name()).get)
+    } else {
+      (null, null)
+    }
+
+    val targetOutput = if (writeRowLineage) {
+      filteredTargetPlan.output ++ Seq(rowIdAttr, sequenceNumberAttr)
+    } else {
+      filteredTargetPlan.output
     }
+    val noopOutput = targetOutput :+ Alias(Literal(NOOP_ROW_KIND_VALUE), 
ROW_KIND_COL)()
+    val keepOutput = targetOutput :+ 
Alias(Literal(RowKind.INSERT.toByteValue), ROW_KIND_COL)()
+
     def processMergeActions(actions: Seq[MergeAction]): Seq[Seq[Expression]] = 
{
       val columnExprs = actions.map {
         case UpdateAction(_, assignments) =>
-          assignments.map(_.value) :+ Literal(RowKind.UPDATE_AFTER.toByteValue)
+          var exprs = assignments.map(_.value)
+          if (writeRowLineage) {
+            exprs ++= Seq(rowIdAttr, Literal(null))
+          }
+          exprs :+ Literal(RowKind.UPDATE_AFTER.toByteValue)
         case DeleteAction(_) =>
           if (remainDeletedRow || deletionVectorEnabled) {
             targetOutput :+ Literal(RowKind.DELETE.toByteValue)
@@ -245,17 +266,26 @@ case class MergeIntoPaimonTable(
             noopOutput
           }
         case InsertAction(_, assignments) =>
-          assignments.map(_.value) :+ Literal(RowKind.INSERT.toByteValue)
+          var exprs = assignments.map(_.value)
+          if (writeRowLineage) {
+            exprs ++= Seq(rowIdAttr, sequenceNumberAttr)
+          }
+          exprs :+ Literal(RowKind.INSERT.toByteValue)
       }
-      columnExprs.map(exprs => exprs ++ metadataAttributes)
+
+      columnExprs.map(exprs => exprs ++ extraMetadataAttributes)
     }
 
     val matchedOutputs = processMergeActions(matchedActions)
     val notMatchedBySourceOutputs = 
processMergeActions(notMatchedBySourceActions)
     val notMatchedOutputs = processMergeActions(notMatchedActions)
     val outputFields = mutable.ArrayBuffer(tableSchema.fields: _*)
+    if (writeRowLineage) {
+      outputFields += PaimonMetadataColumn.ROW_ID.toStructField
+      outputFields += PaimonMetadataColumn.SEQUENCE_NUMBER.toStructField
+    }
     outputFields += StructField(ROW_KIND_COL, ByteType)
-    outputFields ++= metadataCols.map(_.toStructField)
+    outputFields ++= extraMetadataCols.map(_.toStructField)
     val outputSchema = StructType(outputFields.toSeq)
 
     val joinedRowEncoder = EncoderUtils.encode(joinedPlan.schema)
diff --git a/paimon-spark/paimon-spark-ut/pom.xml 
b/paimon-spark/paimon-spark-ut/pom.xml
index 3fb7070a81..d1ceafae23 100644
--- a/paimon-spark/paimon-spark-ut/pom.xml
+++ b/paimon-spark/paimon-spark-ut/pom.xml
@@ -40,17 +40,19 @@ under the License.
     <dependencies>
 
         <dependency>
-            <groupId>org.codehaus.janino</groupId>
-            <artifactId>janino</artifactId>
-            <version>${janino.test.version}</version>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-common</artifactId>
+            <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+
         <dependency>
             <groupId>org.apache.paimon</groupId>
-            <artifactId>paimon-common</artifactId>
+            <artifactId>paimon-core</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+
         <dependency>
             <groupId>org.apache.paimon</groupId>
             <artifactId>${paimon-sparkx-common}</artifactId>
@@ -218,6 +220,13 @@ under the License.
             </exclusions>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.codehaus.janino</groupId>
+            <artifactId>janino</artifactId>
+            <version>${janino.test.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala
index 6f76a95fdb..113f6e3ad3 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala
@@ -81,4 +81,51 @@ abstract class RowLineageTestBase extends 
PaimonSparkTestBase {
       )
     }
   }
+
+  test("Row Lineage: merge into table") {
+    withTable("s", "t") {
+      sql("CREATE TABLE s (id INT, b INT) TBLPROPERTIES 
('row-tracking.enabled' = 'true')")
+      sql("INSERT INTO s VALUES (1, 11), (2, 22)")
+
+      sql("CREATE TABLE t (id INT, b INT) TBLPROPERTIES 
('row-tracking.enabled' = 'true')")
+      sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b FROM 
range(2, 4)")
+      checkAnswer(
+        sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+        Seq(Row(2, 2, 0, 1), Row(3, 3, 1, 1))
+      )
+
+      sql("""
+            |MERGE INTO t
+            |USING s
+            |ON t.id = s.id
+            |WHEN MATCHED THEN UPDATE SET t.b = s.b
+            |WHEN NOT MATCHED THEN INSERT *
+            |""".stripMargin)
+      checkAnswer(
+        sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+        Seq(Row(1, 11, 2, 2), Row(2, 22, 0, 2), Row(3, 3, 1, 1))
+      )
+    }
+  }
+
+  test("Row Lineage: merge into table with only insert") {
+    withTable("s", "t") {
+      sql("CREATE TABLE s (id INT, b INT) TBLPROPERTIES 
('row-tracking.enabled' = 'true')")
+      sql("INSERT INTO s VALUES (1, 11), (2, 22)")
+
+      sql("CREATE TABLE t (id INT, b INT) TBLPROPERTIES 
('row-tracking.enabled' = 'true')")
+      sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b FROM 
range(2, 4)")
+
+      sql("""
+            |MERGE INTO t
+            |USING s
+            |ON t.id = s.id
+            |WHEN NOT MATCHED THEN INSERT *
+            |""".stripMargin)
+      checkAnswer(
+        sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+        Seq(Row(1, 11, 2, 2), Row(2, 2, 0, 1), Row(3, 3, 1, 1))
+      )
+    }
+  }
 }

Reply via email to