This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7bbaf5e1a3 [spark] Merge into clause should not write 'null' row id to file (#6925)
7bbaf5e1a3 is described below
commit 7bbaf5e1a32db495f09511a3084f25f73f5cd24d
Author: YeJunHao <[email protected]>
AuthorDate: Mon Dec 29 20:27:27 2025 +0800
[spark] Merge into clause should not write 'null' row id to file (#6925)
---
.../java/org/apache/paimon/table/SpecialFields.java | 10 ++++++++--
.../paimon/operation/BaseAppendFileStoreWrite.java | 5 ++---
.../apache/paimon/operation/FileStoreCommitImpl.java | 7 ++++++-
.../paimon/spark/commands/MergeIntoPaimonTable.scala | 16 +++++++++++-----
.../paimon/spark/commands/PaimonSparkWriter.scala | 2 +-
.../paimon/spark/sql/RowTrackingTestBase.scala | 20 ++++++++++++++++++--
6 files changed, 46 insertions(+), 14 deletions(-)
diff --git
a/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java
b/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java
index 182682637e..b994706a3b 100644
--- a/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java
+++ b/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java
@@ -143,13 +143,18 @@ public class SpecialFields {
return rowTypeWithRowTracking(rowType, false);
}
+ public static RowType rowTypeWithRowTracking(RowType rowType, boolean
sequenceNumberNullable) {
+ return rowTypeWithRowTracking(rowType, true, sequenceNumberNullable);
+ }
+
/**
* Add row tracking fields to rowType.
*
* @param sequenceNumberNullable sequence number is not null for user, but
is nullable when read
* and write
*/
- public static RowType rowTypeWithRowTracking(RowType rowType, boolean
sequenceNumberNullable) {
+ public static RowType rowTypeWithRowTracking(
+ RowType rowType, boolean rowIdNullable, boolean
sequenceNumberNullable) {
List<DataField> fieldsWithRowTracking = new
ArrayList<>(rowType.getFields());
fieldsWithRowTracking.forEach(
@@ -161,7 +166,8 @@ public class SpecialFields {
+ "' conflicts with existing field
names.");
}
});
- fieldsWithRowTracking.add(SpecialFields.ROW_ID);
+ fieldsWithRowTracking.add(
+ rowIdNullable ? SpecialFields.ROW_ID :
SpecialFields.ROW_ID.copy(false));
fieldsWithRowTracking.add(
sequenceNumberNullable
? SpecialFields.SEQUENCE_NUMBER.copy(true)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
index 08f9dd120f..6dee947308 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
@@ -151,9 +151,8 @@ public abstract class BaseAppendFileStoreWrite extends
MemoryFileStoreWrite<Inte
List<String> fullNames = rowType.getFieldNames();
this.writeCols = writeType.getFieldNames();
// optimize writeCols to null in following cases:
- // 1. writeType contains all columns
- // 2. writeType contains all columns and append _ROW_ID cols
- if (writeCols.size() >= fullCount && writeCols.subList(0,
fullCount).equals(fullNames)) {
+ // writeType contains all columns (without _ROW_ID and
_SEQUENCE_NUMBER)
+ if (writeCols.equals(fullNames)) {
writeCols = null;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 1355bb4a3a..b446078932 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -62,6 +62,7 @@ import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
@@ -1145,8 +1146,12 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
checkArgument(
entry.file().fileSource().isPresent(),
"This is a bug, file source field for row-tracking table
must present.");
+ boolean containsRowId =
+ entry.file().writeCols() != null
+ &&
entry.file().writeCols().contains(SpecialFields.ROW_ID.name());
if (entry.file().fileSource().get().equals(FileSource.APPEND)
- && entry.file().firstRowId() == null) {
+ && entry.file().firstRowId() == null
+ && !containsRowId) {
if (isBlobFile(entry.file().fileName())) {
if (blobStart >= start) {
throw new IllegalStateException(
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
index d956a9472f..491e980d0b 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
@@ -190,14 +190,20 @@ case class MergeIntoPaimonTable(
filesToRewrittenDS.union(filesToReadDS),
writeRowTracking = writeRowTracking).drop(ROW_KIND_COL)
- val finalWriter = if (writeRowTracking) {
- writer.withRowTracking()
+ val rowTrackingNotNull = col(ROW_ID_COLUMN).isNotNull
+ val rowTrackingNull = col(ROW_ID_COLUMN).isNull
+ val addCommitMessageBuilder = Seq.newBuilder[CommitMessage]
+ if (writeRowTracking) {
+ val rowTrackingWriter = writer.withRowTracking()
+ addCommitMessageBuilder ++=
rowTrackingWriter.write(toWriteDS.filter(rowTrackingNotNull))
+ addCommitMessageBuilder ++= writer.write(
+ toWriteDS.filter(rowTrackingNull).drop(ROW_ID_COLUMN,
SEQUENCE_NUMBER_COLUMN))
} else {
- writer
+ addCommitMessageBuilder ++= writer.write(toWriteDS)
}
- val addCommitMessage = finalWriter.write(toWriteDS)
- val deletedCommitMessage = buildDeletedCommitMessage(filesToRewritten)
+ val addCommitMessage = addCommitMessageBuilder.result()
+ val deletedCommitMessage = buildDeletedCommitMessage(filesToRewritten)
addCommitMessage ++ deletedCommitMessage
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index ca5608ba0e..907fe62348 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -70,7 +70,7 @@ case class PaimonSparkWriter(
private val writeType = {
if (writeRowTracking) {
- SpecialFields.rowTypeWithRowTracking(table.rowType(), true)
+ SpecialFields.rowTypeWithRowTracking(table.rowType(), false, true)
} else {
table.rowType()
}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
index 204ca7fcd4..e0bda544ab 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
@@ -145,6 +145,22 @@ abstract class RowTrackingTestBase extends
PaimonSparkTestBase {
}
}
+ test("Row Tracking: update") {
+ withTable("s", "t") {
+ spark.sql("CREATE TABLE t (id INT, data INT) TBLPROPERTIES
('row-tracking.enabled' = 'true')")
+ spark.sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data
FROM range(1, 4)")
+
+ spark.sql("UPDATE t SET data = 22 WHERE id = 2")
+
+ spark.sql("INSERT INTO t VALUES (4, 4), (5, 5)")
+
+ checkAnswer(
+ spark.sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t"),
+ Seq(Row(1, 1, 0, 1), Row(2, 22, 1, 2), Row(3, 3, 2, 1), Row(4, 4, 3,
3), Row(5, 5, 4, 3))
+ )
+ }
+ }
+
test("Row Tracking: merge into table") {
withTable("s", "t") {
sql("CREATE TABLE s (id INT, b INT) TBLPROPERTIES
('row-tracking.enabled' = 'true')")
@@ -557,8 +573,8 @@ abstract class RowTrackingTestBase extends
PaimonSparkTestBase {
Row(2, 200, "c2", 1, 2),
Row(3, 300, "c33", 2, 2),
Row(5, 550, "c5", 4, 2),
- Row(7, 700, "c77", 9, 2),
- Row(9, 990, "c99", 10, 2))
+ Row(7, 700, "c77", 5, 2),
+ Row(9, 990, "c99", 6, 2))
)
}
}