This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9de4a149a2 [core] Row-tracking row should keep their row_id and sequence_number in compaction (#5991)
9de4a149a2 is described below

commit 9de4a149a29b17fcb5eeabffe98eeffb0b5415c1
Author: YeJunHao <[email protected]>
AuthorDate: Thu Jul 31 13:31:37 2025 +0800

    [core] Row-tracking row should keep their row_id and sequence_number in compaction (#5991)
---
 .../org/apache/paimon/table/SpecialFields.java     | 20 ++++++
 .../org/apache/paimon/AppendOnlyFileStore.java     |  7 +-
 .../paimon/operation/BaseAppendFileStoreWrite.java | 33 ++++++----
 .../paimon/operation/FileStoreCommitImpl.java      |  3 +
 .../paimon/table/system/RowLineageTable.java       |  7 +-
 .../org/apache/paimon/flink/AppendTableITCase.java | 75 ++++++++++++++++++++++
 6 files changed, 127 insertions(+), 18 deletions(-)

diff --git 
a/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java 
b/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java
index 1a594e5d15..68e1fc86b6 100644
--- a/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java
+++ b/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java
@@ -20,8 +20,11 @@ package org.apache.paimon.table;
 
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.VarCharType;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -135,4 +138,21 @@ public class SpecialFields {
                 + mapFieldId * STRUCTURED_TYPE_FIELD_DEPTH_LIMIT
                 + depth;
     }
+
+    public static RowType rowTypeWithRowLineage(RowType rowType) {
+        List<DataField> fieldsWithRowLineage = new 
ArrayList<>(rowType.getFields());
+
+        fieldsWithRowLineage.forEach(
+                f -> {
+                    if (ROW_ID.name().equals(f.name()) || 
SEQUENCE_NUMBER.name().equals(f.name())) {
+                        throw new IllegalArgumentException(
+                                "Row lineage field name '"
+                                        + f.name()
+                                        + "' conflicts with existing field 
names.");
+                    }
+                });
+        fieldsWithRowLineage.add(SpecialFields.ROW_ID);
+        fieldsWithRowLineage.add(SpecialFields.SEQUENCE_NUMBER);
+        return new RowType(fieldsWithRowLineage);
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index c81a44d426..e25ec5319d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -33,6 +33,7 @@ import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.CatalogEnvironment;
+import org.apache.paimon.table.SpecialFields;
 import org.apache.paimon.types.RowType;
 
 import javax.annotation.Nullable;
@@ -96,9 +97,13 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
                         ? 
DeletionVectorsMaintainer.factory(newIndexFileHandler())
                         : null;
         if (bucketMode() == BucketMode.BUCKET_UNAWARE) {
+            RawFileSplitRead readForCompact = newRead();
+            if (options.rowTrackingEnabled()) {
+                
readForCompact.withReadType(SpecialFields.rowTypeWithRowLineage(rowType));
+            }
             return new AppendFileStoreWrite(
                     fileIO,
-                    newRead(),
+                    readForCompact,
                     schema.id(),
                     rowType,
                     partitionType,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
index 27e1e21b90..d7b0943188 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
@@ -35,6 +35,7 @@ import org.apache.paimon.io.RowDataRollingFileWriter;
 import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.statistics.SimpleColStatsCollector;
+import org.apache.paimon.table.SpecialFields;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.CommitIncrement;
 import org.apache.paimon.utils.ExceptionUtils;
@@ -43,7 +44,6 @@ import org.apache.paimon.utils.IOExceptionSupplier;
 import org.apache.paimon.utils.LongCounter;
 import org.apache.paimon.utils.RecordWriter;
 import org.apache.paimon.utils.SnapshotManager;
-import org.apache.paimon.utils.StatsCollectorFactories;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,6 +59,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
 
 import static org.apache.paimon.format.FileFormat.fileFormat;
+import static 
org.apache.paimon.utils.StatsCollectorFactories.createStatsFactories;
 
 /** {@link FileStoreWrite} for {@link AppendOnlyFileStore}. */
 public abstract class BaseAppendFileStoreWrite extends 
MemoryFileStoreWrite<InternalRow>
@@ -67,7 +68,7 @@ public abstract class BaseAppendFileStoreWrite extends 
MemoryFileStoreWrite<Inte
     private static final Logger LOG = 
LoggerFactory.getLogger(BaseAppendFileStoreWrite.class);
 
     private final FileIO fileIO;
-    private final RawFileSplitRead read;
+    private final RawFileSplitRead readForCompact;
     private final long schemaId;
     private final RowType rowType;
     private final FileFormat fileFormat;
@@ -79,7 +80,7 @@ public abstract class BaseAppendFileStoreWrite extends 
MemoryFileStoreWrite<Inte
 
     public BaseAppendFileStoreWrite(
             FileIO fileIO,
-            RawFileSplitRead read,
+            RawFileSplitRead readForCompact,
             long schemaId,
             RowType rowType,
             RowType partitionType,
@@ -91,15 +92,14 @@ public abstract class BaseAppendFileStoreWrite extends 
MemoryFileStoreWrite<Inte
             String tableName) {
         super(snapshotManager, scan, options, partitionType, null, 
dvMaintainerFactory, tableName);
         this.fileIO = fileIO;
-        this.read = read;
+        this.readForCompact = readForCompact;
         this.schemaId = schemaId;
         this.rowType = rowType;
         this.fileFormat = fileFormat(options);
         this.pathFactory = pathFactory;
 
         this.statsCollectors =
-                StatsCollectorFactories.createStatsFactories(
-                        options.statsMode(), options, rowType.getFieldNames());
+                createStatsFactories(options.statsMode(), options, 
rowType.getFieldNames());
         this.fileIndexOptions = options.indexColumnsOptions();
     }
 
@@ -153,10 +153,17 @@ public abstract class BaseAppendFileStoreWrite extends 
MemoryFileStoreWrite<Inte
         if (toCompact.isEmpty()) {
             return Collections.emptyList();
         }
+        RowType writeSchema =
+                options.rowTrackingEnabled()
+                        ? SpecialFields.rowTypeWithRowLineage(rowType)
+                        : rowType;
         Exception collectedExceptions = null;
         RowDataRollingFileWriter rewriter =
                 createRollingFileWriter(
-                        partition, bucket, new 
LongCounter(toCompact.get(0).minSequenceNumber()));
+                        partition,
+                        bucket,
+                        new LongCounter(toCompact.get(0).minSequenceNumber()),
+                        writeSchema);
         List<IOExceptionSupplier<DeletionVector>> dvFactories = null;
         if (dvFactory != null) {
             dvFactories = new ArrayList<>(toCompact.size());
@@ -182,17 +189,20 @@ public abstract class BaseAppendFileStoreWrite extends 
MemoryFileStoreWrite<Inte
     }
 
     private RowDataRollingFileWriter createRollingFileWriter(
-            BinaryRow partition, int bucket, LongCounter seqNumCounter) {
+            BinaryRow partition, int bucket, LongCounter seqNumCounter, 
RowType writeSchema) {
         return new RowDataRollingFileWriter(
                 fileIO,
                 schemaId,
                 fileFormat,
                 options.targetFileSize(false),
-                rowType,
+                writeSchema,
                 pathFactory.createDataFilePathFactory(partition, bucket),
                 seqNumCounter,
                 options.fileCompression(),
-                statsCollectors,
+                writeSchema.equals(rowType)
+                        ? statsCollectors
+                        : createStatsFactories(
+                                options.statsMode(), options, 
writeSchema.getFieldNames()),
                 fileIndexOptions,
                 FileSource.COMPACT,
                 options.asyncFileWrite(),
@@ -205,7 +215,8 @@ public abstract class BaseAppendFileStoreWrite extends 
MemoryFileStoreWrite<Inte
             List<DataFileMeta> files,
             @Nullable List<IOExceptionSupplier<DeletionVector>> dvFactories)
             throws IOException {
-        return new RecordReaderIterator<>(read.createReader(partition, bucket, 
files, dvFactories));
+        return new RecordReaderIterator<>(
+                readForCompact.createReader(partition, bucket, files, 
dvFactories));
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 132c88d694..54888abe72 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -1183,6 +1183,9 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                 long rowCount = entry.file().rowCount();
                 rowIdAssigned.add(entry.assignFirstRowId(start));
                 start += rowCount;
+            } else {
+                // for compact file, do not assign first row id.
+                rowIdAssigned.add(entry);
             }
         }
         return start;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/RowLineageTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/RowLineageTable.java
index c428df607f..6be13e8e7e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/RowLineageTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/RowLineageTable.java
@@ -36,7 +36,6 @@ import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.StreamDataTableScan;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
-import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.BranchManager;
 import org.apache.paimon.utils.ChangelogManager;
@@ -44,7 +43,6 @@ import org.apache.paimon.utils.SimpleFileReader;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -103,10 +101,7 @@ public class RowLineageTable implements DataTable, 
ReadonlyTable {
 
     @Override
     public RowType rowType() {
-        List<DataField> fields = new 
ArrayList<>(wrapped.rowType().getFields());
-        fields.add(SpecialFields.ROW_ID);
-        fields.add(SpecialFields.SEQUENCE_NUMBER);
-        return new RowType(fields);
+        return SpecialFields.rowTypeWithRowLineage(wrapped.rowType());
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendTableITCase.java
index 4afe48a4a9..75c55c8f88 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendTableITCase.java
@@ -107,6 +107,81 @@ public class AppendTableITCase extends CatalogITCaseBase {
         assertThat(rows).containsExactlyInAnyOrder(Row.of(1, "AAA"), Row.of(2, 
"BBB"));
     }
 
+    @Test
+    public void testCompactionWithRowLineage() throws Exception {
+        batchSql("ALTER TABLE append_table_lineage SET 
('compaction.max.file-num' = '4')");
+
+        assertExecuteExpected(
+                "INSERT INTO append_table_lineage VALUES (1, 'AAA'), (2, 
'BBB')",
+                1L,
+                Snapshot.CommitKind.APPEND,
+                "append_table_lineage");
+        assertExecuteExpected(
+                "INSERT INTO append_table_lineage VALUES (3, 'CCC'), (4, 
'DDD')",
+                2L,
+                Snapshot.CommitKind.APPEND,
+                "append_table_lineage");
+        assertExecuteExpected(
+                "INSERT INTO append_table_lineage VALUES (1, 'AAA'), (2, 
'BBB'), (3, 'CCC'), (4, 'DDD')",
+                3L,
+                Snapshot.CommitKind.APPEND,
+                "append_table_lineage");
+        assertExecuteExpected(
+                "INSERT INTO append_table_lineage VALUES (5, 'EEE'), (6, 
'FFF')",
+                4L,
+                Snapshot.CommitKind.APPEND,
+                "append_table_lineage");
+        assertExecuteExpected(
+                "INSERT INTO append_table_lineage VALUES (7, 'HHH'), (8, 
'III')",
+                5L,
+                Snapshot.CommitKind.APPEND,
+                "append_table_lineage");
+        assertExecuteExpected(
+                "INSERT INTO append_table_lineage VALUES (9, 'JJJ'), (10, 
'KKK')",
+                6L,
+                Snapshot.CommitKind.APPEND,
+                "append_table_lineage");
+        assertExecuteExpected(
+                "INSERT INTO append_table_lineage VALUES (11, 'LLL'), (12, 
'MMM')",
+                7L,
+                Snapshot.CommitKind.APPEND,
+                "append_table_lineage");
+        assertExecuteExpected(
+                "INSERT INTO append_table_lineage VALUES (13, 'NNN'), (14, 
'OOO')",
+                8L,
+                Snapshot.CommitKind.APPEND,
+                "append_table_lineage");
+
+        List<Row> originRowsWithId2 = batchSql("SELECT * FROM 
append_table_lineage$row_lineage");
+        batchSql("call sys.compact('default.append_table_lineage')");
+        waitCompactSnapshot(60000L, "append_table_lineage");
+        List<Row> files = batchSql("SELECT * FROM append_table_lineage$files");
+        assertThat(files.size()).isEqualTo(1);
+        List<Row> rowsAfter2 = batchSql("SELECT * FROM 
append_table_lineage$row_lineage");
+        
assertThat(originRowsWithId2).containsExactlyInAnyOrderElementsOf(rowsAfter2);
+
+        assertThat(rowsAfter2)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, "AAA", 0L, 1L),
+                        Row.of(2, "BBB", 1L, 1L),
+                        Row.of(3, "CCC", 2L, 2L),
+                        Row.of(4, "DDD", 3L, 2L),
+                        Row.of(1, "AAA", 4L, 3L),
+                        Row.of(2, "BBB", 5L, 3L),
+                        Row.of(3, "CCC", 6L, 3L),
+                        Row.of(4, "DDD", 7L, 3L),
+                        Row.of(5, "EEE", 8L, 4L),
+                        Row.of(6, "FFF", 9L, 4L),
+                        Row.of(7, "HHH", 10L, 5L),
+                        Row.of(8, "III", 11L, 5L),
+                        Row.of(9, "JJJ", 12L, 6L),
+                        Row.of(10, "KKK", 13L, 6L),
+                        Row.of(11, "LLL", 14L, 7L),
+                        Row.of(12, "MMM", 15L, 7L),
+                        Row.of(13, "NNN", 16L, 8L),
+                        Row.of(14, "OOO", 17L, 8L));
+    }
+
     @Test
     public void testSkipDedup() {
         batchSql("INSERT INTO append_table VALUES (1, 'AAA'), (1, 'AAA'), (2, 
'BBB'), (3, 'BBB')");

Reply via email to