This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c151e6c27c [spark] Support DELETE for row tracking table (#6086)
c151e6c27c is described below
commit c151e6c27c0d681108a671c12e17756e71c0fc6f
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Aug 18 17:35:13 2025 +0800
[spark] Support DELETE for row tracking table (#6086)
---
docs/content/append-table/row-tracking.md | 22 +++++++++++++++++++---
.../commands/DeleteFromPaimonTableCommand.scala | 7 +++++--
.../spark/commands/MergeIntoPaimonTable.scala | 4 ++--
.../apache/paimon/spark/util/ScanPlanHelper.scala | 4 ++--
.../paimon/spark/sql/RowLineageTestBase.scala | 13 +++++++++++++
.../paimon/spark/util/ScanPlanHelperTest.scala | 6 +++---
6 files changed, 44 insertions(+), 12 deletions(-)
diff --git a/docs/content/append-table/row-tracking.md b/docs/content/append-table/row-tracking.md
index fd8ffb9a72..5277653001 100644
--- a/docs/content/append-table/row-tracking.md
+++ b/docs/content/append-table/row-tracking.md
@@ -29,7 +29,7 @@ under the License.
## What is row tracking
Row tracking allows Paimon to track row-level lineage in a Paimon append table. Once enabled on a Paimon table, two more hidden columns will be added to the table schema:
-- `_ROW_ID`: BIGINT, this is a unique identifier for each row in the table. It is used to track the lineage of the row and can be used to identify the row in case of updates or merge into.
+- `_ROW_ID`: BIGINT, this is a unique identifier for each row in the table. It is used to track the lineage of the row and can be used to identify the row in case of update, merge into or delete.
- `_SEQUENCE_NUMBER`: BIGINT, this is field indicates which `version` of this record is. It actually is the snapshot-id of the snapshot that this row belongs to. It is used to track the lineage of the row version.
## Enable row tracking
@@ -46,7 +46,7 @@ WITH ('row-tracking.enabled' = 'true');
```
Notice that:
- Row tracking is only supported for unaware append tables, not for primary key tables. Which means you can't define `bucket` and `bucket-key` for the table.
-- Only spark support update and merge into operations on row-tracking tables, Flink SQL does not support these operations yet.
+- Only Spark supports update, merge into and delete operations on row-tracking tables; Flink SQL does not support these operations yet.
- This function is experimental, this line will be removed after being stable.
## How to use row tracking
@@ -92,7 +92,7 @@ You can also merge into the table, suppose you have a source table `s` that cont
MERGE INTO t USING s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET t.data = s.data
-WHEN NOT MATCHED THEN INSERT *
+WHEN NOT MATCHED THEN INSERT *;
```
You will get:
@@ -106,6 +106,22 @@ You will get:
+---+---------------+-------+----------------+
```
+You can also delete from the table:
+
+```sql
+DELETE FROM t WHERE id = 11;
+```
+
+You will get:
+```text
++---+---------------+-------+----------------+
+| id| data|_ROW_ID|_SEQUENCE_NUMBER|
++---+---------------+-------+----------------+
+| 22| new-data-merge| 1| 3|
+| 33| c| 2| 3|
++---+---------------+-------+----------------+
+```
+
## Spec
`_ROW_ID` and `_SEQUENCE_NUMBER` fields follows the following rules:
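The documented flow above can be tried end to end from a Spark session. Below is a minimal spark-shell style sketch, assuming the current catalog is a Paimon catalog (as in the project's Spark tests); the table name `t` and the `DELETE FROM t WHERE id = 11` statement mirror the docs, while the schema and column values are illustrative:

```scala
// Minimal sketch, assuming a configured Paimon catalog; schema and values are illustrative.
spark.sql("CREATE TABLE t (id INT, data STRING) TBLPROPERTIES ('row-tracking.enabled' = 'true')")
spark.sql("INSERT INTO t VALUES (11, 'a'), (22, 'b'), (33, 'c')")
spark.sql("DELETE FROM t WHERE id = 11")
// The lineage columns are hidden, so select them explicitly alongside *.
spark.sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id").show()
```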
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index b6045b5219..d6a30f9097 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -147,10 +147,13 @@ case class DeleteFromPaimonTableCommand(
// Step4: build a dataframe that contains the unchanged data, and write out them.
val toRewriteScanRelation = Filter(Not(condition), newRelation)
- val data = createDataset(sparkSession, toRewriteScanRelation)
+ var data = createDataset(sparkSession, toRewriteScanRelation)
+ if (coreOptions.rowTrackingEnabled()) {
+ data = selectWithRowLineage(data)
+ }
// only write new files, should have no compaction
- val addCommitMessage = dvSafeWriter.writeOnly().write(data)
+    val addCommitMessage = dvSafeWriter.writeOnly().withRowLineage().write(data)
// Step5: convert the deleted files that need to be written to commit message.
val deletedCommitMessage = buildDeletedCommitMessage(touchedFiles)
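Taken together, the change above makes DELETE on a row-tracking table rewrite the surviving rows together with their lineage columns instead of dropping them. A rough illustration of the "Step4" shape in public DataFrame API terms follows, with `df` and `deletePredicate` as placeholders; it is a sketch of the idea, not the command's actual code path, which goes through `createDataset`, `selectWithRowLineage` and `dvSafeWriter`:

```scala
// Illustrative placeholders: `df` is the data scanned from the touched files,
// `deletePredicate` is the DELETE condition handled by DeleteFromPaimonTableCommand.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{expr, not}

def unchangedRows(df: DataFrame, deletePredicate: String): DataFrame =
  // Keep only the rows the DELETE does not remove; they are rewritten as new files
  // (write-only, no compaction). With row tracking enabled, the command re-selects
  // _ROW_ID and _SEQUENCE_NUMBER onto this dataset and writes with withRowLineage(),
  // so the surviving rows keep their lineage.
  df.filter(not(expr(deletePredicate)))
```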
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
index 6b170a39b4..24d94bc56c 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
@@ -107,7 +107,7 @@ case class MergeIntoPaimonTable(
createNewScanPlan(candidateDataSplits, relation, targetOnlyCondition))
val ds = constructChangedRows(
sparkSession,
- selectWithDvMetaCols(filteredDf),
+ selectWithDvMeta(filteredDf),
remainDeletedRow = true,
extraMetadataCols = dvMetaCols)
@@ -176,7 +176,7 @@ case class MergeIntoPaimonTable(
var filesToRewrittenDS =
createDataset(sparkSession, filesToRewrittenScan).withColumn(FILE_TOUCHED_COL, lit(true))
if (writeRowLineage) {
- filesToRewrittenDS = selectWithRowLineageMetaCols(filesToRewrittenDS)
+ filesToRewrittenDS = selectWithRowLineage(filesToRewrittenDS)
}
var filesToReadDS =
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala
index 437a563ca8..5f8e260254 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala
@@ -51,11 +51,11 @@ trait ScanPlanHelper extends SQLConfHelper {
}
}
- def selectWithDvMetaCols(data: DataFrame): DataFrame = {
+ def selectWithDvMeta(data: DataFrame): DataFrame = {
selectWithAdditionalCols(data, DV_META_COLUMNS)
}
- def selectWithRowLineageMetaCols(data: DataFrame): DataFrame = {
+ def selectWithRowLineage(data: DataFrame): DataFrame = {
selectWithAdditionalCols(data, ROW_LINEAGE_META_COLUMNS)
}
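Both renamed helpers delegate to `selectWithAdditionalCols`, which appends the requested metadata columns only when they are not already part of the projection, as the test changes below exercise. A standalone sketch of that behaviour under a hypothetical name, using plain DataFrame columns rather than Paimon's metadata column resolution:

```scala
// Hypothetical sketch, not Paimon's selectWithAdditionalCols: append each requested
// meta column only if the DataFrame does not already select it, so applying it twice,
// or to a frame that already selects _ROW_ID, adds nothing extra.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

def selectWithAdditionalColsSketch(data: DataFrame, metaCols: Seq[String]): DataFrame = {
  val missing = metaCols.filterNot(data.columns.contains)
  data.select((data.columns ++ missing).map(col): _*)
}

// e.g. selectWithAdditionalColsSketch(df, Seq("_ROW_ID", "_SEQUENCE_NUMBER"))
```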
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala
index d82f343056..1753cfb654 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala
@@ -64,6 +64,19 @@ abstract class RowLineageTestBase extends PaimonSparkTestBase {
}
}
+ test("Row Lineage: delete table") {
+ withTable("t") {
+ sql("CREATE TABLE t (id INT, data INT) TBLPROPERTIES
('row-tracking.enabled' = 'true')")
+
+ sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM
range(1, 4)")
+ sql("DELETE FROM t WHERE id = 2")
+ checkAnswer(
+ sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+ Seq(Row(1, 1, 0, 1), Row(3, 3, 2, 1))
+ )
+ }
+ }
+
test("Row Lineage: update table") {
withTable("t") {
sql("CREATE TABLE t (id INT, data INT) TBLPROPERTIES
('row-tracking.enabled' = 'true')")
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/util/ScanPlanHelperTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/util/ScanPlanHelperTest.scala
index 007b95e10b..dcbf12e273 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/util/ScanPlanHelperTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/util/ScanPlanHelperTest.scala
@@ -39,16 +39,16 @@ class ScanPlanHelperTest extends PaimonSparkTestBase with ScanPlanHelper {
checkAnswer(newDf, Seq(Row(11, "a"), Row(22, "b")))
// select df with row lineage meta cols
-    checkAnswer(selectWithRowLineageMetaCols(newDf), Seq(Row(11, "a", 0, 1), Row(22, "b", 1, 1)))
+    checkAnswer(selectWithRowLineage(newDf), Seq(Row(11, "a", 0, 1), Row(22, "b", 1, 1)))
// select with row lineage meta cols twice should not add new more meta cols
checkAnswer(
- selectWithRowLineageMetaCols(selectWithRowLineageMetaCols(newDf)),
+ selectWithRowLineage(selectWithRowLineage(newDf)),
Seq(Row(11, "a", 0, 1), Row(22, "b", 1, 1)))
// select df already contains meta cols with row lineage
checkAnswer(
- selectWithRowLineageMetaCols(newDf.select("_ROW_ID", "id")),
+ selectWithRowLineage(newDf.select("_ROW_ID", "id")),
Seq(Row(0, 11, 1), Row(1, 22, 1)))
}
}