This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new 42f12a20b5 [spark] keep the rowId unchanged when updating the row tracking and deletion vectors table. (#6756)
42f12a20b5 is described below
commit 42f12a20b5c011307d153572306814f112101ed7
Author: Xiao Zhu <[email protected]>
AuthorDate: Sat Dec 6 21:58:56 2025 +0800
    [spark] keep the rowId unchanged when updating the row tracking and deletion vectors table. (#6756)
---
.../spark/commands/UpdatePaimonTableCommand.scala | 35 +++++------
.../paimon/spark/sql/RowTrackingTestBase.scala | 72 ++++++++++++++--------
2 files changed, 63 insertions(+), 44 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
index 5d89b3d2f1..32565c1219 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
@@ -26,7 +26,7 @@ import org.apache.paimon.table.sink.CommitMessage
import org.apache.paimon.table.source.DataSplit
import org.apache.paimon.types.RowKind
-import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.{Column, Row, SparkSession}
import org.apache.spark.sql.PaimonUtils.createDataset
import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, If, Literal}
import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
@@ -127,32 +127,26 @@ case class UpdatePaimonTableCommand(
private def writeOnlyUpdatedData(
sparkSession: SparkSession,
touchedDataSplits: Array[DataSplit]): Seq[CommitMessage] = {
- val updateColumns = updateExpressions.zip(relation.output).map {
- case (update, origin) =>
- toColumn(update).as(origin.name, origin.metadata)
- }
+ val updateColumns = getUpdateColumns(sparkSession)
    val toUpdateScanRelation = createNewScanPlan(touchedDataSplits, relation, Some(condition))
    val data = createDataset(sparkSession, toUpdateScanRelation).select(updateColumns: _*)
- writer.write(data)
+ writer.withRowTracking().write(data)
}
private def writeUpdatedAndUnchangedData(
sparkSession: SparkSession,
toUpdateScanRelation: LogicalPlan): Seq[CommitMessage] = {
+ val updateColumns = getUpdateColumns(sparkSession)
- def rowIdCol = col(ROW_ID_COLUMN)
-
- def sequenceNumberCol = toColumn(
- optimizedIf(
- condition,
- Literal(null),
- toExpression(sparkSession, col(SEQUENCE_NUMBER_COLUMN))))
- .as(SEQUENCE_NUMBER_COLUMN)
+    val data = createDataset(sparkSession, toUpdateScanRelation).select(updateColumns: _*)
+ writer.withRowTracking().write(data)
+ }
+ private def getUpdateColumns(sparkSession: SparkSession): Seq[Column] = {
var updateColumns = updateExpressions.zip(relation.output).map {
case (_, origin) if origin.name == ROW_ID_COLUMN => rowIdCol
-      case (_, origin) if origin.name == SEQUENCE_NUMBER_COLUMN => sequenceNumberCol
+      case (_, origin) if origin.name == SEQUENCE_NUMBER_COLUMN => sequenceNumberCol(sparkSession)
case (update, origin) =>
val updated = optimizedIf(condition, update, origin)
toColumn(updated).as(origin.name, origin.metadata)
@@ -164,14 +158,19 @@ case class UpdatePaimonTableCommand(
updateColumns ++= Seq(rowIdCol)
}
if (!outputSet.exists(_.name == SEQUENCE_NUMBER_COLUMN)) {
- updateColumns ++= Seq(sequenceNumberCol)
+ updateColumns ++= Seq(sequenceNumberCol(sparkSession))
}
}
-    val data = createDataset(sparkSession, toUpdateScanRelation).select(updateColumns: _*)
- writer.withRowTracking().write(data)
+ updateColumns
}
+ private def rowIdCol = col(ROW_ID_COLUMN)
+
+ private def sequenceNumberCol(sparkSession: SparkSession) = toColumn(
+    optimizedIf(condition, Literal(null), toExpression(sparkSession, col(SEQUENCE_NUMBER_COLUMN))))
+ .as(SEQUENCE_NUMBER_COLUMN)
+
private def optimizedIf(
predicate: Expression,
trueValue: Expression,
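
Side note on the change above: the projection built by getUpdateColumns passes _ROW_ID through unchanged and nulls _SEQUENCE_NUMBER for rows matching the update condition, so the writer assigns a fresh sequence number on commit while row lineage is preserved. A minimal standalone sketch of that idea in plain Spark (not the Paimon API: the column names come from the diff, but RowTrackingUpdateSketch, the updates map, and when/otherwise standing in for optimizedIf are illustrative assumptions):

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, lit, when}

object RowTrackingUpdateSketch {
  val RowId = "_ROW_ID"            // row lineage id, must stay stable across updates
  val SeqNum = "_SEQUENCE_NUMBER"  // nulled for updated rows; the writer assigns a new one

  // Build the SELECT projection for an UPDATE over a row-tracking table.
  def updateColumns(df: DataFrame, cond: Column, updates: Map[String, Column]): Seq[Column] =
    df.columns.toSeq.map {
      case RowId  => col(RowId)  // pass through unchanged
      case SeqNum => when(cond, lit(null)).otherwise(col(SeqNum)).as(SeqNum)
      case name =>
        updates.get(name) match {
          case Some(u) => when(cond, u).otherwise(col(name)).as(name)  // conditional update
          case None    => col(name)
        }
    }
}

Usage would be df.select(RowTrackingUpdateSketch.updateColumns(df, cond, updates): _*), mirroring the select(updateColumns: _*) calls in the diff.
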
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
index aab69e9ea1..611a936b6f 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
@@ -73,43 +73,63 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase {
test("Row Tracking: delete table") {
withTable("t") {
+ // only enable row tracking
sql("CREATE TABLE t (id INT, data INT) TBLPROPERTIES
('row-tracking.enabled' = 'true')")
+ runAndCheckAnswer()
+ sql("DROP TABLE t")
- sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM
range(1, 4)")
- sql("DELETE FROM t WHERE id = 2")
- checkAnswer(
- sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
- Seq(Row(1, 1, 0, 1), Row(3, 3, 2, 1))
- )
- sql("DELETE FROM t WHERE _ROW_ID = 2")
- checkAnswer(
- sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
- Seq(Row(1, 1, 0, 1))
- )
+ // enable row tracking and deletion vectors
+ sql(
+ "CREATE TABLE t (id INT, data INT) TBLPROPERTIES
('row-tracking.enabled' = 'true', 'deletion-vectors.enabled' = 'true')")
+ runAndCheckAnswer()
+
+ def runAndCheckAnswer(): Unit = {
+ sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM
range(1, 4)")
+ sql("DELETE FROM t WHERE id = 2")
+ checkAnswer(
+ sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+ Seq(Row(1, 1, 0, 1), Row(3, 3, 2, 1))
+ )
+ sql("DELETE FROM t WHERE _ROW_ID = 2")
+ checkAnswer(
+ sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+ Seq(Row(1, 1, 0, 1))
+ )
+ }
}
}
test("Row Tracking: update table") {
withTable("t") {
+ // only enable row tracking
sql("CREATE TABLE t (id INT, data INT) TBLPROPERTIES
('row-tracking.enabled' = 'true')")
+ runAndCheckAnswer()
+ sql("DROP TABLE t")
- sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM
range(1, 4)")
- checkAnswer(
- sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
- Seq(Row(1, 1, 0, 1), Row(2, 2, 1, 1), Row(3, 3, 2, 1))
- )
+ // enable row tracking and deletion vectors
+ sql(
+ "CREATE TABLE t (id INT, data INT) TBLPROPERTIES
('row-tracking.enabled' = 'true', 'deletion-vectors.enabled' = 'true')")
+ runAndCheckAnswer()
- sql("UPDATE t SET data = 22 WHERE id = 2")
- checkAnswer(
- sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
- Seq(Row(1, 1, 0, 1), Row(2, 22, 1, 2), Row(3, 3, 2, 1))
- )
+ def runAndCheckAnswer(): Unit = {
+ sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM
range(1, 4)")
+ checkAnswer(
+ sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+ Seq(Row(1, 1, 0, 1), Row(2, 2, 1, 1), Row(3, 3, 2, 1))
+ )
- sql("UPDATE t SET data = 222 WHERE _ROW_ID = 1")
- checkAnswer(
- sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
- Seq(Row(1, 1, 0, 1), Row(2, 222, 1, 3), Row(3, 3, 2, 1))
- )
+ sql("UPDATE t SET data = 22 WHERE id = 2")
+ checkAnswer(
+ sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+ Seq(Row(1, 1, 0, 1), Row(2, 22, 1, 2), Row(3, 3, 2, 1))
+ )
+
+ sql("UPDATE t SET data = 222 WHERE _ROW_ID = 1")
+ checkAnswer(
+ sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+ Seq(Row(1, 1, 0, 1), Row(2, 222, 1, 3), Row(3, 3, 2, 1))
+ )
+ }
}
}
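
For reference, the behavior the new assertions pin down can be reproduced in a hypothetical spark-shell session (spark is an assumed SparkSession; table t created as in the tests above):

// Hypothetical session; assumes table t from the tests above.
spark.sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM range(1, 4)")
spark.sql("UPDATE t SET data = 22 WHERE id = 2")
// Expected per the checkAnswer rows above: id = 2 keeps _ROW_ID = 1 while its
// _SEQUENCE_NUMBER advances to 2; untouched rows keep both values.
spark.sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id").show()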