This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c151e6c27c [spark] Support DELETE for row tracking table (#6086)
c151e6c27c is described below

commit c151e6c27c0d681108a671c12e17756e71c0fc6f
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Aug 18 17:35:13 2025 +0800

    [spark] Support DELETE for row tracking table (#6086)
---
 docs/content/append-table/row-tracking.md          | 22 +++++++++++++++++++---
 .../commands/DeleteFromPaimonTableCommand.scala    |  7 +++++--
 .../spark/commands/MergeIntoPaimonTable.scala      |  4 ++--
 .../apache/paimon/spark/util/ScanPlanHelper.scala  |  4 ++--
 .../paimon/spark/sql/RowLineageTestBase.scala      | 13 +++++++++++++
 .../paimon/spark/util/ScanPlanHelperTest.scala     |  6 +++---
 6 files changed, 44 insertions(+), 12 deletions(-)

diff --git a/docs/content/append-table/row-tracking.md b/docs/content/append-table/row-tracking.md
index fd8ffb9a72..5277653001 100644
--- a/docs/content/append-table/row-tracking.md
+++ b/docs/content/append-table/row-tracking.md
@@ -29,7 +29,7 @@ under the License.
 ## What is row tracking
 
 Row tracking allows Paimon to track row-level lineage in a Paimon append table. Once enabled on a Paimon table, two more hidden columns will be added to the table schema:
-- `_ROW_ID`: BIGINT, this is a unique identifier for each row in the table. It is used to track the lineage of the row and can be used to identify the row in case of updates or merge into.
+- `_ROW_ID`: BIGINT, this is a unique identifier for each row in the table. It is used to track the lineage of the row and can be used to identify the row in case of update, merge into or delete.
 - `_SEQUENCE_NUMBER`: BIGINT, this is field indicates which `version` of this record is. It actually is the snapshot-id of the snapshot that this row belongs to. It is used to track the lineage of the row version.
 
 ## Enable row tracking
@@ -46,7 +46,7 @@ WITH ('row-tracking.enabled' = 'true');
 ```
 Notice that:
 - Row tracking is only supported for unaware append tables, not for primary key tables. Which means you can't define `bucket` and `bucket-key` for the table.
-- Only spark support update and merge into operations on row-tracking tables, Flink SQL does not support these operations yet.
+- Only spark support update, merge into and delete operations on row-tracking tables, Flink SQL does not support these operations yet.
 - This function is experimental, this line will be removed after being stable.
 
 ## How to use row tracking
@@ -92,7 +92,7 @@ You can also merge into the table, suppose you have a source table `s` that cont
 MERGE INTO t USING s
 ON t.id = s.id
 WHEN MATCHED THEN UPDATE SET t.data = s.data
-WHEN NOT MATCHED THEN INSERT *
+WHEN NOT MATCHED THEN INSERT *;
 ```
 
 You will get:
@@ -106,6 +106,22 @@ You will get:
 +---+---------------+-------+----------------+
 ```
 
+You can also delete from the table:
+
+```sql
+DELETE FROM t WHERE id = 11;
+```
+
+You will get:
+```text
++---+---------------+-------+----------------+
+| id|           data|_ROW_ID|_SEQUENCE_NUMBER|
++---+---------------+-------+----------------+
+| 22| new-data-merge|      1|               3|
+| 33|              c|      2|               3|
++---+---------------+-------+----------------+
+```
+
 ## Spec
 
 `_ROW_ID` and `_SEQUENCE_NUMBER` fields follows the following rules:
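
The documented flow, end to end, as a minimal Spark sketch (assuming a SparkSession whose current catalog is a Paimon catalog; the table name and values mirror the new test added in this commit):

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: configuring the Paimon catalog/extensions is assumed and not shown here.
val spark: SparkSession = SparkSession.builder().getOrCreate()

// Row tracking must be enabled when the table is created.
spark.sql("CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')")
spark.sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM range(1, 4)")

// DELETE is now supported on row-tracking tables.
spark.sql("DELETE FROM t WHERE id = 2")

// Surviving rows keep their hidden lineage columns; per the new test this
// returns (1, 1, 0, 1) and (3, 3, 2, 1).
spark.sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id").show()
```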
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index b6045b5219..d6a30f9097 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -147,10 +147,13 @@ case class DeleteFromPaimonTableCommand(
 
       // Step4: build a dataframe that contains the unchanged data, and write out them.
       val toRewriteScanRelation = Filter(Not(condition), newRelation)
-      val data = createDataset(sparkSession, toRewriteScanRelation)
+      var data = createDataset(sparkSession, toRewriteScanRelation)
+      if (coreOptions.rowTrackingEnabled()) {
+        data = selectWithRowLineage(data)
+      }
 
       // only write new files, should have no compaction
-      val addCommitMessage = dvSafeWriter.writeOnly().write(data)
+      val addCommitMessage = dvSafeWriter.writeOnly().withRowLineage().write(data)
 
       // Step5: convert the deleted files that need to be written to commit message.
       val deletedCommitMessage = buildDeletedCommitMessage(touchedFiles)
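
In this delete path, the surviving (unchanged) rows are rewritten into new files; when row tracking is enabled they are first re-selected together with the hidden lineage columns so the rewrite keeps the original `_ROW_ID`/`_SEQUENCE_NUMBER` values instead of minting new ones. A rough standalone sketch of that idea (the function name is illustrative, and it assumes the scanned plan can resolve `_ROW_ID` and `_SEQUENCE_NUMBER` as metadata columns):

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.col

// Illustrative only: keep rows that do NOT match the delete condition and,
// when row tracking is on, carry the hidden lineage columns into the rewrite.
def survivorsToRewrite(scanned: DataFrame, deleteCond: Column, rowTrackingEnabled: Boolean): DataFrame = {
  val survivors = scanned.filter(!deleteCond)
  if (!rowTrackingEnabled) survivors
  else survivors.select(col("*"), col("_ROW_ID"), col("_SEQUENCE_NUMBER"))
}
```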
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
index 6b170a39b4..24d94bc56c 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
@@ -107,7 +107,7 @@ case class MergeIntoPaimonTable(
         createNewScanPlan(candidateDataSplits, relation, targetOnlyCondition))
       val ds = constructChangedRows(
         sparkSession,
-        selectWithDvMetaCols(filteredDf),
+        selectWithDvMeta(filteredDf),
         remainDeletedRow = true,
         extraMetadataCols = dvMetaCols)
 
@@ -176,7 +176,7 @@ case class MergeIntoPaimonTable(
       var filesToRewrittenDS =
         createDataset(sparkSession, filesToRewrittenScan).withColumn(FILE_TOUCHED_COL, lit(true))
       if (writeRowLineage) {
-        filesToRewrittenDS = selectWithRowLineageMetaCols(filesToRewrittenDS)
+        filesToRewrittenDS = selectWithRowLineage(filesToRewrittenDS)
       }
 
       var filesToReadDS =
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala
index 437a563ca8..5f8e260254 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala
@@ -51,11 +51,11 @@ trait ScanPlanHelper extends SQLConfHelper {
     }
   }
 
-  def selectWithDvMetaCols(data: DataFrame): DataFrame = {
+  def selectWithDvMeta(data: DataFrame): DataFrame = {
     selectWithAdditionalCols(data, DV_META_COLUMNS)
   }
 
-  def selectWithRowLineageMetaCols(data: DataFrame): DataFrame = {
+  def selectWithRowLineage(data: DataFrame): DataFrame = {
     selectWithAdditionalCols(data, ROW_LINEAGE_META_COLUMNS)
   }
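
The renamed helpers behave as before: they project the current columns plus the requested meta columns, and re-applying them adds nothing new (the updated ScanPlanHelperTest below checks exactly that). A rough standalone equivalent, purely illustrative since the real helper resolves these as metadata columns on the Paimon scan plan:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Illustrative only: append the requested meta columns (e.g. _ROW_ID,
// _SEQUENCE_NUMBER) unless they are already in the projection, so calling
// the helper twice does not add duplicate columns.
def selectWithAdditionalColsSketch(data: DataFrame, metaCols: Seq[String]): DataFrame = {
  val toAdd = metaCols.filterNot(data.columns.contains)
  if (toAdd.isEmpty) data
  else data.select((data.columns ++ toAdd).map(col): _*)
}
```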
 
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala
index d82f343056..1753cfb654 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala
@@ -64,6 +64,19 @@ abstract class RowLineageTestBase extends PaimonSparkTestBase {
     }
   }
 
+  test("Row Lineage: delete table") {
+    withTable("t") {
+      sql("CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')")
+
+      sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM range(1, 4)")
+      sql("DELETE FROM t WHERE id = 2")
+      checkAnswer(
+        sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+        Seq(Row(1, 1, 0, 1), Row(3, 3, 2, 1))
+      )
+    }
+  }
+
   test("Row Lineage: update table") {
     withTable("t") {
       sql("CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')")
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/util/ScanPlanHelperTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/util/ScanPlanHelperTest.scala
index 007b95e10b..dcbf12e273 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/util/ScanPlanHelperTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/util/ScanPlanHelperTest.scala
@@ -39,16 +39,16 @@ class ScanPlanHelperTest extends PaimonSparkTestBase with ScanPlanHelper {
       checkAnswer(newDf, Seq(Row(11, "a"), Row(22, "b")))
 
       // select df with row lineage meta cols
-      checkAnswer(selectWithRowLineageMetaCols(newDf), Seq(Row(11, "a", 0, 1), Row(22, "b", 1, 1)))
+      checkAnswer(selectWithRowLineage(newDf), Seq(Row(11, "a", 0, 1), Row(22, "b", 1, 1)))
 
       // select with row lineage meta cols twice should not add new more meta cols
       checkAnswer(
-        selectWithRowLineageMetaCols(selectWithRowLineageMetaCols(newDf)),
+        selectWithRowLineage(selectWithRowLineage(newDf)),
         Seq(Row(11, "a", 0, 1), Row(22, "b", 1, 1)))
 
       // select df already contains meta cols with row lineage
       checkAnswer(
-        selectWithRowLineageMetaCols(newDf.select("_ROW_ID", "id")),
+        selectWithRowLineage(newDf.select("_ROW_ID", "id")),
         Seq(Row(0, 11, 1), Row(1, 22, 1)))
     }
   }
