This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bbaf5e1a3 [spark] Merge into clause should not write 'null' row id to file (#6925)
7bbaf5e1a3 is described below

commit 7bbaf5e1a32db495f09511a3084f25f73f5cd24d
Author: YeJunHao <[email protected]>
AuthorDate: Mon Dec 29 20:27:27 2025 +0800

    [spark] Merge into clause should not write 'null' row id to file (#6925)
---
 .../java/org/apache/paimon/table/SpecialFields.java  | 10 ++++++++--
 .../paimon/operation/BaseAppendFileStoreWrite.java   |  5 ++---
 .../apache/paimon/operation/FileStoreCommitImpl.java |  7 ++++++-
 .../paimon/spark/commands/MergeIntoPaimonTable.scala | 16 +++++++++++-----
 .../paimon/spark/commands/PaimonSparkWriter.scala    |  2 +-
 .../paimon/spark/sql/RowTrackingTestBase.scala       | 20 ++++++++++++++++++--
 6 files changed, 46 insertions(+), 14 deletions(-)

diff --git 
a/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java 
b/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java
index 182682637e..b994706a3b 100644
--- a/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java
+++ b/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java
@@ -143,13 +143,18 @@ public class SpecialFields {
         return rowTypeWithRowTracking(rowType, false);
     }
 
+    public static RowType rowTypeWithRowTracking(RowType rowType, boolean 
sequenceNumberNullable) {
+        return rowTypeWithRowTracking(rowType, true, sequenceNumberNullable);
+    }
+
     /**
      * Add row tracking fields to rowType.
      *
      * @param sequenceNumberNullable sequence number is not null for user, but 
is nullable when read
      *     and write
      */
-    public static RowType rowTypeWithRowTracking(RowType rowType, boolean 
sequenceNumberNullable) {
+    public static RowType rowTypeWithRowTracking(
+            RowType rowType, boolean rowIdNullable, boolean 
sequenceNumberNullable) {
         List<DataField> fieldsWithRowTracking = new 
ArrayList<>(rowType.getFields());
 
         fieldsWithRowTracking.forEach(
@@ -161,7 +166,8 @@ public class SpecialFields {
                                         + "' conflicts with existing field 
names.");
                     }
                 });
-        fieldsWithRowTracking.add(SpecialFields.ROW_ID);
+        fieldsWithRowTracking.add(
+                rowIdNullable ? SpecialFields.ROW_ID : 
SpecialFields.ROW_ID.copy(false));
         fieldsWithRowTracking.add(
                 sequenceNumberNullable
                         ? SpecialFields.SEQUENCE_NUMBER.copy(true)
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
index 08f9dd120f..6dee947308 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
@@ -151,9 +151,8 @@ public abstract class BaseAppendFileStoreWrite extends 
MemoryFileStoreWrite<Inte
         List<String> fullNames = rowType.getFieldNames();
         this.writeCols = writeType.getFieldNames();
         // optimize writeCols to null in following cases:
-        // 1. writeType contains all columns
-        // 2. writeType contains all columns and append _ROW_ID cols
-        if (writeCols.size() >= fullCount && writeCols.subList(0, 
fullCount).equals(fullNames)) {
+        // writeType contains all columns (without _ROW_ID and 
_SEQUENCE_NUMBER)
+        if (writeCols.equals(fullNames)) {
             writeCols = null;
         }
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 1355bb4a3a..b446078932 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -62,6 +62,7 @@ import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.stats.Statistics;
 import org.apache.paimon.stats.StatsFileHandler;
 import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.SpecialFields;
 import org.apache.paimon.table.sink.CommitCallback;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
@@ -1145,8 +1146,12 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
             checkArgument(
                     entry.file().fileSource().isPresent(),
                     "This is a bug, file source field for row-tracking table 
must present.");
+            boolean containsRowId =
+                    entry.file().writeCols() != null
+                            && 
entry.file().writeCols().contains(SpecialFields.ROW_ID.name());
             if (entry.file().fileSource().get().equals(FileSource.APPEND)
-                    && entry.file().firstRowId() == null) {
+                    && entry.file().firstRowId() == null
+                    && !containsRowId) {
                 if (isBlobFile(entry.file().fileName())) {
                     if (blobStart >= start) {
                         throw new IllegalStateException(
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
index d956a9472f..491e980d0b 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
@@ -190,14 +190,20 @@ case class MergeIntoPaimonTable(
         filesToRewrittenDS.union(filesToReadDS),
         writeRowTracking = writeRowTracking).drop(ROW_KIND_COL)
 
-      val finalWriter = if (writeRowTracking) {
-        writer.withRowTracking()
+      val rowTrackingNotNull = col(ROW_ID_COLUMN).isNotNull
+      val rowTrackingNull = col(ROW_ID_COLUMN).isNull
+      val addCommitMessageBuilder = Seq.newBuilder[CommitMessage]
+      if (writeRowTracking) {
+        val rowTrackingWriter = writer.withRowTracking()
+        addCommitMessageBuilder ++= 
rowTrackingWriter.write(toWriteDS.filter(rowTrackingNotNull))
+        addCommitMessageBuilder ++= writer.write(
+          toWriteDS.filter(rowTrackingNull).drop(ROW_ID_COLUMN, 
SEQUENCE_NUMBER_COLUMN))
       } else {
-        writer
+        addCommitMessageBuilder ++= writer.write(toWriteDS)
       }
-      val addCommitMessage = finalWriter.write(toWriteDS)
-      val deletedCommitMessage = buildDeletedCommitMessage(filesToRewritten)
 
+      val addCommitMessage = addCommitMessageBuilder.result()
+      val deletedCommitMessage = buildDeletedCommitMessage(filesToRewritten)
       addCommitMessage ++ deletedCommitMessage
     }
   }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index ca5608ba0e..907fe62348 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -70,7 +70,7 @@ case class PaimonSparkWriter(
 
   private val writeType = {
     if (writeRowTracking) {
-      SpecialFields.rowTypeWithRowTracking(table.rowType(), true)
+      SpecialFields.rowTypeWithRowTracking(table.rowType(), false, true)
     } else {
       table.rowType()
     }
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
index 204ca7fcd4..e0bda544ab 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
@@ -145,6 +145,22 @@ abstract class RowTrackingTestBase extends 
PaimonSparkTestBase {
     }
   }
 
+  test("Row Tracking: update") {
+    withTable("s", "t") {
+      spark.sql("CREATE TABLE t (id INT, data INT) TBLPROPERTIES 
('row-tracking.enabled' = 'true')")
+      spark.sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data 
FROM range(1, 4)")
+
+      spark.sql("UPDATE t SET data = 22 WHERE id = 2")
+
+      spark.sql("INSERT INTO t VALUES (4, 4), (5, 5)")
+
+      checkAnswer(
+        spark.sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t"),
+        Seq(Row(1, 1, 0, 1), Row(2, 22, 1, 2), Row(3, 3, 2, 1), Row(4, 4, 3, 
3), Row(5, 5, 4, 3))
+      )
+    }
+  }
+
   test("Row Tracking: merge into table") {
     withTable("s", "t") {
       sql("CREATE TABLE s (id INT, b INT) TBLPROPERTIES 
('row-tracking.enabled' = 'true')")
@@ -557,8 +573,8 @@ abstract class RowTrackingTestBase extends 
PaimonSparkTestBase {
             Row(2, 200, "c2", 1, 2),
             Row(3, 300, "c33", 2, 2),
             Row(5, 550, "c5", 4, 2),
-            Row(7, 700, "c77", 9, 2),
-            Row(9, 990, "c99", 10, 2))
+            Row(7, 700, "c77", 5, 2),
+            Row(9, 990, "c99", 6, 2))
         )
       }
     }

Reply via email to