This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f32f02d3c8 [spark] Support MERGE for row lineage (#6045)
f32f02d3c8 is described below
commit f32f02d3c8c418b4a3e912446a22bd3381c41f47
Author: Zouxxyy <[email protected]>
AuthorDate: Sun Aug 10 21:37:43 2025 +0800
[spark] Support MERGE for row lineage (#6045)
---
docs/content/append-table/row-tracking.md | 27 +++++-
.../spark/commands/MergeIntoPaimonTable.scala | 106 +++++++++++++--------
paimon-spark/paimon-spark-ut/pom.xml | 17 +++-
.../paimon/spark/sql/RowLineageTestBase.scala | 47 +++++++++
4 files changed, 151 insertions(+), 46 deletions(-)
diff --git a/docs/content/append-table/row-tracking.md b/docs/content/append-table/row-tracking.md
index 3c9fe75cf5..16c9d76bcb 100644
--- a/docs/content/append-table/row-tracking.md
+++ b/docs/content/append-table/row-tracking.md
@@ -46,7 +46,7 @@ WITH ('row-tracking.enabled' = 'true');
```
Notice that:
- Row tracking is only supported for unaware append tables, not for primary key tables. Which means you can't define `bucket` and `bucket-key` for the table.
-- Only spark support update and merge into (not yet) operations on row-tracking tables, Flink SQL does not support these operations yet.
+- Only Spark supports update and merge into operations on row-tracking tables; Flink SQL does not support these operations yet.
- This function is experimental, this line will be removed after being stable.
## How to use row tracking
@@ -62,7 +62,7 @@ You can select the row lineage meta column with the following sql in spark:
SELECT id, data, _ROW_ID, _SEQUENCE_NUMBER FROM t;
```
You will get the following result:
-```sql
+```text
+---+----+-------+----------------+
| id|data|_ROW_ID|_SEQUENCE_NUMBER|
+---+----+-------+----------------+
@@ -78,13 +78,32 @@ SELECT id, data, _ROW_ID, _SEQUENCE_NUMBER FROM t;
```
You will get:
-```sql
+```text
+---+---------------+-------+----------------+
| id| data|_ROW_ID|_SEQUENCE_NUMBER|
+---+---------------+-------+----------------+
| 22| b| 1| 1|
| 11|new-data-update| 0| 2|
- +---+---------------+-------+----------------+
++---+---------------+-------+----------------+
+```
+
+You can also merge into the table. Suppose you have a source table `s` that contains (22, 'new-data-merge') and (33, 'c'):
+```sql
+MERGE INTO t USING s
+ON t.id = s.id
+WHEN MATCHED THEN UPDATE SET t.data = s.data
+WHEN NOT MATCHED THEN INSERT *
+```
+
+You will get:
+```text
++---+---------------+-------+----------------+
+| id| data|_ROW_ID|_SEQUENCE_NUMBER|
++---+---------------+-------+----------------+
+| 11|new-data-update| 0| 2|
+| 22| new-data-merge| 1| 3|
+| 33| c| 2| 3|
++---+---------------+-------+----------------+
```
## Spec
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
index 73a1e681ca..d54639310b 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
@@ -24,7 +24,7 @@ import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
import org.apache.paimon.spark.schema.{PaimonMetadataColumn, SparkSystemColumns}
import org.apache.paimon.spark.schema.PaimonMetadataColumn.{FILE_PATH, FILE_PATH_COLUMN, ROW_INDEX, ROW_INDEX_COLUMN}
import org.apache.paimon.spark.util.{EncoderUtils, SparkRowUtils}
-import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.table.{FileStoreTable, SpecialFields}
import org.apache.paimon.table.sink.CommitMessage
import org.apache.paimon.types.RowKind
@@ -114,14 +114,10 @@ case class MergeIntoPaimonTable(
sparkSession,
filteredRelation,
remainDeletedRow = true,
- metadataCols = metadataCols)
+ extraMetadataCols = metadataCols)
ds.cache()
try {
- val rowKindAttribute = ds.queryExecution.analyzed.output
- .find(attr => sparkSession.sessionState.conf.resolver(attr.name, ROW_KIND_COL))
- .getOrElse(throw new RuntimeException("Can not find _row_kind_ column."))
-
// Step3: filter rows that should be marked as DELETED in Deletion Vector mode.
val dvDS = ds.where(
s"$ROW_KIND_COL = ${RowKind.DELETE.toByteValue} or $ROW_KIND_COL = ${RowKind.UPDATE_AFTER.toByteValue}")
@@ -141,8 +137,10 @@ case class MergeIntoPaimonTable(
ds.unpersist()
}
} else {
- val touchedFilePathsSet = mutable.Set.empty[String]
- val intersectionFilePaths = mutable.Set.empty[String]
+ // Files that need to be rewritten
+ val filePathsToRewritten = mutable.Set.empty[String]
+ // Files that need to be read, but not rewritten
+ val filePathsToRead = mutable.Set.empty[String]
def hasUpdate(actions: Seq[MergeAction]): Boolean = {
actions.exists {
@@ -159,39 +157,44 @@ case class MergeIntoPaimonTable(
}
if (hasUpdate(matchedActions)) {
- touchedFilePathsSet ++= findTouchedFiles0("inner")
+ filePathsToRewritten ++= findTouchedFiles0("inner")
} else if (notMatchedActions.nonEmpty) {
- intersectionFilePaths ++= findTouchedFiles0("inner")
+ filePathsToRead ++= findTouchedFiles0("inner")
}
if (hasUpdate(notMatchedBySourceActions)) {
- touchedFilePathsSet ++= findTouchedFiles0("left_anti")
- }
-
- val touchedFilePaths: Array[String] = touchedFilePathsSet.toArray
- val unTouchedFilePaths = if (notMatchedActions.nonEmpty) {
- intersectionFilePaths.diff(touchedFilePathsSet).toArray
- } else {
- Array[String]()
+ val noMatchedBySourceFilePaths = findTouchedFiles0("left_anti")
+ filePathsToRewritten ++= noMatchedBySourceFilePaths
+ filePathsToRead --= noMatchedBySourceFilePaths
}
- val (touchedFiles, touchedFileRelation) =
- createNewRelation(touchedFilePaths, dataFilePathToMeta, relation)
+ val (filesToRewritten, touchedFileRelation) =
+ createNewRelation(filePathsToRewritten.toArray, dataFilePathToMeta, relation)
val (_, unTouchedFileRelation) =
- createNewRelation(unTouchedFilePaths, dataFilePathToMeta, relation)
+ createNewRelation(filePathsToRead.toArray, dataFilePathToMeta, relation)
// Add FILE_TOUCHED_COL to mark the row as coming from the touched file, if the row has not been modified and was from touched file, it should be kept too.
- val touchedDsWithFileTouchedCol = createDataset(sparkSession, touchedFileRelation)
+ val targetDSWithFileTouchedCol = createDataset(sparkSession, touchedFileRelation)
.withColumn(FILE_TOUCHED_COL, lit(true))
- val targetDSWithFileTouchedCol = touchedDsWithFileTouchedCol.union(
- createDataset(sparkSession, unTouchedFileRelation)
+ .union(createDataset(sparkSession, unTouchedFileRelation)
.withColumn(FILE_TOUCHED_COL, lit(false)))
- val toWriteDS =
- constructChangedRows(sparkSession, targetDSWithFileTouchedCol).drop(ROW_KIND_COL)
- val addCommitMessage = dvSafeWriter.write(toWriteDS)
- val deletedCommitMessage = buildDeletedCommitMessage(touchedFiles)
+ // If no files need to be rewritten, no need to write row lineage
+ val writeRowLineage = coreOptions.rowTrackingEnabled() && filesToRewritten.nonEmpty
+
+ val toWriteDS = constructChangedRows(
+ sparkSession,
+ targetDSWithFileTouchedCol,
+ writeRowLineage = writeRowLineage).drop(ROW_KIND_COL)
+
+ val writer = if (writeRowLineage) {
+ dvSafeWriter.withRowLineage()
+ } else {
+ dvSafeWriter
+ }
+ val addCommitMessage = writer.write(toWriteDS)
+ val deletedCommitMessage = buildDeletedCommitMessage(filesToRewritten)
addCommitMessage ++ deletedCommitMessage
}
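
The hunk above reduces to a small classification rule: files hit by an update-style matched (or not-matched-by-source) action must be rewritten, files scanned only to evaluate NOT MATCHED inserts are read but left in place, and any file promoted to the rewrite set is dropped from the read-only set; row lineage is then written only when row tracking is enabled and the rewrite set is non-empty. A minimal, self-contained Scala sketch of that rule, using hypothetical, simplified names rather than the Paimon API:

```scala
import scala.collection.mutable

// Hypothetical sketch of the rewrite/read-only split used above: files touched
// by an update-style action are rewritten, files scanned only for NOT MATCHED
// inserts are merely read, and a file that lands in the rewrite set is removed
// from the read-only set.
object MergeFileClassificationSketch {

  final case class Classification(toRewrite: Set[String], toReadOnly: Set[String])

  def classify(
      matchedUpdateFiles: Seq[String],      // files hit by MATCHED update/delete actions
      notMatchedScanFiles: Seq[String],     // files scanned only for NOT MATCHED inserts
      notMatchedBySourceFiles: Seq[String]  // files hit by NOT MATCHED BY SOURCE updates
  ): Classification = {
    val rewrite = mutable.Set.empty[String]
    val readOnly = mutable.Set.empty[String]

    rewrite ++= matchedUpdateFiles
    readOnly ++= notMatchedScanFiles
    // A file promoted to "rewrite" must no longer be counted as read-only.
    rewrite ++= notMatchedBySourceFiles
    readOnly --= rewrite

    Classification(rewrite.toSet, readOnly.toSet)
  }

  def main(args: Array[String]): Unit = {
    val c = classify(Seq("f1"), Seq("f1", "f2"), Seq("f3"))
    // f1 and f3 are rewritten; f2 is only read.
    println(c)
  }
}
```
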
@@ -203,7 +206,8 @@ case class MergeIntoPaimonTable(
targetDataset: Dataset[Row],
remainDeletedRow: Boolean = false,
deletionVectorEnabled: Boolean = false,
- metadataCols: Seq[PaimonMetadataColumn] = Seq.empty): Dataset[Row] = {
+ extraMetadataCols: Seq[PaimonMetadataColumn] = Seq.empty,
+ writeRowLineage: Boolean = false): Dataset[Row] = {
val targetDS = targetDataset
.withColumn(TARGET_ROW_COL, lit(true))
@@ -217,7 +221,6 @@ case class MergeIntoPaimonTable(
resolveExpressions(sparkSession)(exprs, joinedPlan)
}
- val targetOutput = filteredTargetPlan.output
val targetRowNotMatched = resolveOnJoinedPlan(
Seq(toExpression(sparkSession, col(SOURCE_ROW_COL).isNull))).head
val sourceRowNotMatched = resolveOnJoinedPlan(
@@ -225,17 +228,35 @@ case class MergeIntoPaimonTable(
val matchedExprs = matchedActions.map(_.condition.getOrElse(TrueLiteral))
val notMatchedExprs = notMatchedActions.map(_.condition.getOrElse(TrueLiteral))
val notMatchedBySourceExprs = notMatchedBySourceActions.map(_.condition.getOrElse(TrueLiteral))
- val noopOutput = targetOutput :+ Alias(Literal(NOOP_ROW_KIND_VALUE), ROW_KIND_COL)()
- val keepOutput = targetOutput :+ Alias(Literal(RowKind.INSERT.toByteValue), ROW_KIND_COL)()
val resolver = sparkSession.sessionState.conf.resolver
- val metadataAttributes = metadataCols.flatMap {
- metadataCol => joinedPlan.output.find(attr => resolver(metadataCol.name, attr.name))
+ def attribute(name: String) = joinedPlan.output.find(attr => resolver(name, attr.name))
+ val extraMetadataAttributes =
+ extraMetadataCols.flatMap(metadataCol => attribute(metadataCol.name))
+ val (rowIdAttr, sequenceNumberAttr) = if (writeRowLineage) {
+ (
+ attribute(SpecialFields.ROW_ID.name()).get,
+ attribute(SpecialFields.SEQUENCE_NUMBER.name()).get)
+ } else {
+ (null, null)
+ }
+
+ val targetOutput = if (writeRowLineage) {
+ filteredTargetPlan.output ++ Seq(rowIdAttr, sequenceNumberAttr)
+ } else {
+ filteredTargetPlan.output
}
+ val noopOutput = targetOutput :+ Alias(Literal(NOOP_ROW_KIND_VALUE), ROW_KIND_COL)()
+ val keepOutput = targetOutput :+ Alias(Literal(RowKind.INSERT.toByteValue), ROW_KIND_COL)()
+
def processMergeActions(actions: Seq[MergeAction]): Seq[Seq[Expression]] = {
val columnExprs = actions.map {
case UpdateAction(_, assignments) =>
- assignments.map(_.value) :+ Literal(RowKind.UPDATE_AFTER.toByteValue)
+ var exprs = assignments.map(_.value)
+ if (writeRowLineage) {
+ exprs ++= Seq(rowIdAttr, Literal(null))
+ }
+ exprs :+ Literal(RowKind.UPDATE_AFTER.toByteValue)
case DeleteAction(_) =>
if (remainDeletedRow || deletionVectorEnabled) {
targetOutput :+ Literal(RowKind.DELETE.toByteValue)
@@ -245,17 +266,26 @@ case class MergeIntoPaimonTable(
noopOutput
}
case InsertAction(_, assignments) =>
- assignments.map(_.value) :+ Literal(RowKind.INSERT.toByteValue)
+ var exprs = assignments.map(_.value)
+ if (writeRowLineage) {
+ exprs ++= Seq(rowIdAttr, sequenceNumberAttr)
+ }
+ exprs :+ Literal(RowKind.INSERT.toByteValue)
}
- columnExprs.map(exprs => exprs ++ metadataAttributes)
+
+ columnExprs.map(exprs => exprs ++ extraMetadataAttributes)
}
val matchedOutputs = processMergeActions(matchedActions)
val notMatchedBySourceOutputs = processMergeActions(notMatchedBySourceActions)
val notMatchedOutputs = processMergeActions(notMatchedActions)
val outputFields = mutable.ArrayBuffer(tableSchema.fields: _*)
+ if (writeRowLineage) {
+ outputFields += PaimonMetadataColumn.ROW_ID.toStructField
+ outputFields += PaimonMetadataColumn.SEQUENCE_NUMBER.toStructField
+ }
outputFields += StructField(ROW_KIND_COL, ByteType)
- outputFields ++= metadataCols.map(_.toStructField)
+ outputFields ++= extraMetadataCols.map(_.toStructField)
val outputSchema = StructType(outputFields.toSeq)
val joinedRowEncoder = EncoderUtils.encode(joinedPlan.schema)
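
The constructChangedRows changes above come down to widening the output row: the table fields come first, the `_ROW_ID` and `_SEQUENCE_NUMBER` lineage columns are appended only when `writeRowLineage` is true, then the row-kind byte and any extra metadata columns. A minimal Scala sketch of that schema assembly, with assumed LongType for the lineage columns (not the actual PaimonMetadataColumn definitions):

```scala
import org.apache.spark.sql.types.{ByteType, LongType, StructField, StructType}
import scala.collection.mutable

// Hypothetical sketch of the output schema assembly: table fields, then the
// row-lineage columns (only when lineage is written), then the row-kind byte
// and any extra metadata columns.
object MergeOutputSchemaSketch {

  def outputSchema(
      tableFields: Seq[StructField],
      writeRowLineage: Boolean,
      extraMetadataFields: Seq[StructField]): StructType = {
    val fields = mutable.ArrayBuffer(tableFields: _*)
    if (writeRowLineage) {
      // Assumed types for the lineage meta columns exposed as _ROW_ID / _SEQUENCE_NUMBER.
      fields += StructField("_ROW_ID", LongType)
      fields += StructField("_SEQUENCE_NUMBER", LongType)
    }
    fields += StructField("_row_kind_", ByteType) // ROW_KIND_COL
    fields ++= extraMetadataFields
    StructType(fields.toSeq)
  }
}
```

In the diff itself, the UpdateAction branch keeps the existing `rowIdAttr` and nulls the sequence number, presumably so a fresh one is assigned on write, while the InsertAction branch forwards both lineage attributes from the join output.
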
diff --git a/paimon-spark/paimon-spark-ut/pom.xml b/paimon-spark/paimon-spark-ut/pom.xml
index 3fb7070a81..d1ceafae23 100644
--- a/paimon-spark/paimon-spark-ut/pom.xml
+++ b/paimon-spark/paimon-spark-ut/pom.xml
@@ -40,17 +40,19 @@ under the License.
<dependencies>
<dependency>
- <groupId>org.codehaus.janino</groupId>
- <artifactId>janino</artifactId>
- <version>${janino.test.version}</version>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-common</artifactId>
+ <version>${project.version}</version>
<scope>test</scope>
</dependency>
+
<dependency>
<groupId>org.apache.paimon</groupId>
- <artifactId>paimon-common</artifactId>
+ <artifactId>paimon-core</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>${paimon-sparkx-common}</artifactId>
@@ -218,6 +220,13 @@ under the License.
</exclusions>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.codehaus.janino</groupId>
+ <artifactId>janino</artifactId>
+ <version>${janino.test.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala
index 6f76a95fdb..113f6e3ad3 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala
@@ -81,4 +81,51 @@ abstract class RowLineageTestBase extends PaimonSparkTestBase {
)
}
}
+
+ test("Row Lineage: merge into table") {
+ withTable("s", "t") {
+ sql("CREATE TABLE s (id INT, b INT) TBLPROPERTIES
('row-tracking.enabled' = 'true')")
+ sql("INSERT INTO s VALUES (1, 11), (2, 22)")
+
+ sql("CREATE TABLE t (id INT, b INT) TBLPROPERTIES
('row-tracking.enabled' = 'true')")
+ sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b FROM
range(2, 4)")
+ checkAnswer(
+ sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+ Seq(Row(2, 2, 0, 1), Row(3, 3, 1, 1))
+ )
+
+ sql("""
+ |MERGE INTO t
+ |USING s
+ |ON t.id = s.id
+ |WHEN MATCHED THEN UPDATE SET t.b = s.b
+ |WHEN NOT MATCHED THEN INSERT *
+ |""".stripMargin)
+ checkAnswer(
+ sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+ Seq(Row(1, 11, 2, 2), Row(2, 22, 0, 2), Row(3, 3, 1, 1))
+ )
+ }
+ }
+
+ test("Row Lineage: merge into table with only insert") {
+ withTable("s", "t") {
+ sql("CREATE TABLE s (id INT, b INT) TBLPROPERTIES
('row-tracking.enabled' = 'true')")
+ sql("INSERT INTO s VALUES (1, 11), (2, 22)")
+
+ sql("CREATE TABLE t (id INT, b INT) TBLPROPERTIES
('row-tracking.enabled' = 'true')")
+ sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b FROM
range(2, 4)")
+
+ sql("""
+ |MERGE INTO t
+ |USING s
+ |ON t.id = s.id
+ |WHEN NOT MATCHED THEN INSERT *
+ |""".stripMargin)
+ checkAnswer(
+ sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+ Seq(Row(1, 11, 2, 2), Row(2, 2, 0, 1), Row(3, 3, 1, 1))
+ )
+ }
+ }
}