This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 020ed14809 [core] Row lineage core support. (#5935)
020ed14809 is described below

commit 020ed14809a4f3136100f288ac7c2f458cfbb062
Author: YeJunHao <[email protected]>
AuthorDate: Mon Jul 28 21:57:21 2025 +0800

    [core] Row lineage core support. (#5935)
---
 .../shortcodes/generated/core_configuration.html   |   6 +
 .../main/java/org/apache/paimon/CoreOptions.java   |  10 +
 .../src/main/java/org/apache/paimon/Snapshot.java  |  28 ++-
 .../org/apache/paimon/table/SpecialFields.java     |  10 +-
 .../apache/paimon/casting/FallbackMappingRow.java  | 189 +++++++++++++++++++
 .../paimon/data/columnar/ColumnarRowIterator.java  |  54 ++++++
 .../paimon/casting/FallbackMappingRowTest.java     |  63 +++++++
 .../java/org/apache/paimon/AbstractFileStore.java  |   3 +-
 .../org/apache/paimon/AppendOnlyFileStore.java     |   3 +-
 .../src/main/java/org/apache/paimon/Changelog.java |   9 +-
 .../java/org/apache/paimon/KeyValueFileStore.java  |   3 +-
 .../java/org/apache/paimon/io/DataFileMeta.java    | 108 +++++++++--
 .../apache/paimon/io/DataFileMeta08Serializer.java |   1 +
 .../apache/paimon/io/DataFileMeta09Serializer.java |   1 +
 .../paimon/io/DataFileMeta10LegacySerializer.java  |   1 +
 ...er.java => DataFileMeta12LegacySerializer.java} |  13 +-
 .../apache/paimon/io/DataFileMetaSerializer.java   |   6 +-
 .../org/apache/paimon/io/DataFileRecordReader.java |  58 +++++-
 .../apache/paimon/io/KeyValueDataFileWriter.java   |   3 +-
 .../paimon/io/KeyValueFileReaderFactory.java       |   9 +-
 .../org/apache/paimon/io/RowDataFileWriter.java    |   3 +-
 .../org/apache/paimon/manifest/ManifestEntry.java  |  14 ++
 .../org/apache/paimon/migrate/FileMetaUtils.java   |   1 +
 .../paimon/operation/FileStoreCommitImpl.java      |  66 ++++++-
 .../apache/paimon/operation/RawFileSplitRead.java  |  19 +-
 .../org/apache/paimon/schema/SchemaValidation.java |  16 ++
 .../sink/CommitMessageLegacyV2Serializer.java      |   1 +
 .../paimon/table/sink/CommitMessageSerializer.java |  11 +-
 .../org/apache/paimon/table/source/DataSplit.java  |   8 +-
 .../org/apache/paimon/table/system/FilesTable.java |   4 +-
 .../paimon/table/system/RowLineageTable.java       | 201 +++++++++++++++++++++
 .../paimon/table/system/SystemTableLoader.java     |   2 +
 .../src/main/java/org/apache/paimon/tag/Tag.java   |   8 +-
 .../apache/paimon/utils/FormatReaderMapping.java   |  35 +++-
 .../append/AppendCompactCoordinatorTest.java       |   1 +
 .../apache/paimon/append/AppendOnlyWriterTest.java |   1 +
 .../paimon/crosspartition/IndexBootstrapTest.java  |   1 +
 .../paimon/io/DataFileTestDataGenerator.java       |   1 +
 .../org/apache/paimon/io/DataFileTestUtils.java    |   3 +
 ...festCommittableSerializerCompatibilityTest.java |  90 ++++++++-
 .../ManifestCommittableSerializerTest.java         |   1 +
 .../paimon/manifest/ManifestFileMetaTestBase.java  |   2 +
 .../org/apache/paimon/mergetree/LevelsTest.java    |   1 +
 .../mergetree/compact/IntervalPartitionTest.java   |   1 +
 .../mergetree/compact/UniversalCompactionTest.java |  17 +-
 .../paimon/operation/ExpireSnapshotsTest.java      |   4 +-
 .../paimon/table/source/SplitGeneratorTest.java    |   1 +
 .../org/apache/paimon/table/source/SplitTest.java  |  80 +++++++-
 .../org/apache/paimon/tag/TagAutoManagerTest.java  |   2 +
 .../test/java/org/apache/paimon/tag/TagTest.java   |   1 +
 .../paimon/utils/FormatReaderMappingTest.java      |   4 +-
 .../apache/paimon/utils/SnapshotManagerTest.java   |   5 +
 .../src/test/resources/compatibility/datasplit-v7  | Bin 0 -> 984 bytes
 .../compatibility/manifest-committable-v8          | Bin 0 -> 3522 bytes
 .../org/apache/paimon/flink/AppendTableITCase.java |  41 ++++-
 ...endPreCommitCompactCoordinatorOperatorTest.java |   1 +
 .../ChangelogCompactCoordinateOperatorTest.java    |   1 +
 .../ChangelogCompactSortOperatorTest.java          |   1 +
 .../ChangelogCompactTaskSerializerTest.java        |   1 +
 .../changelog/ChangelogCompactTaskTest.java        |   1 +
 .../sink/CompactionTaskSimpleSerializerTest.java   |   1 +
 .../source/FileStoreSourceSplitGeneratorTest.java  |   1 +
 .../source/FileStoreSourceSplitSerializerTest.java |   1 +
 .../flink/source/TestChangelogDataReadWrite.java   |   3 +-
 .../org/apache/paimon/spark/ScanHelperTest.scala   |   2 +
 65 files changed, 1163 insertions(+), 74 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index e079a253e1..77237c198e 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -875,6 +875,12 @@ This config option does not affect the default filesystem 
metastore.</td>
             <td>String</td>
             <td>Time field for record level expire. It supports the following 
types: `timestamps in seconds with INT`,`timestamps in seconds with BIGINT`, 
`timestamps in milliseconds with BIGINT` or `timestamp`.</td>
         </tr>
+        <tr>
+            <td><h5>row-tracking.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to enable unique row IDs for append tables.</td>
+        </tr>
         <tr>
             <td><h5>rowkind.field</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 64ea11a835..56f6daf40b 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1912,6 +1912,12 @@ public class CoreOptions implements Serializable {
                                     + "in 'sink.clustering.by-columns'. 
'order' is used for 1 column, 'zorder' for less than 5 columns, "
                                     + "and 'hilbert' for 5 or more columns.");
 
+    public static final ConfigOption<Boolean> ROW_TRACKING_ENABLED =
+            key("row-tracking.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether enable unique row id for append 
table.");
+
     public static final ConfigOption<Boolean> SNAPSHOT_IGNORE_EMPTY_COMMIT =
             key("snapshot.ignore-empty-commit")
                     .booleanType()
@@ -2864,6 +2870,10 @@ public class CoreOptions implements Serializable {
         return options.get(RECORD_LEVEL_TIME_FIELD);
     }
 
+    public boolean rowTrackingEnabled() {
+        return options.get(ROW_TRACKING_ENABLED);
+    }
+
     public boolean prepareCommitWaitCompaction() {
         if (!needLookup()) {
             return false;
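
For context, a minimal sketch (not part of this commit) of how the new option can be set and read programmatically; it assumes the existing org.apache.paimon.options.Options API and the CoreOptions(Options) constructor:

    import org.apache.paimon.CoreOptions;
    import org.apache.paimon.options.Options;

    public class RowTrackingOptionSketch {
        public static void main(String[] args) {
            Options options = new Options();
            // Equivalent to setting 'row-tracking.enabled' = 'true' in the table options.
            options.set(CoreOptions.ROW_TRACKING_ENABLED, true);
            CoreOptions coreOptions = new CoreOptions(options);
            System.out.println(coreOptions.rowTrackingEnabled()); // prints true
        }
    }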
diff --git a/paimon-api/src/main/java/org/apache/paimon/Snapshot.java 
b/paimon-api/src/main/java/org/apache/paimon/Snapshot.java
index 23d73b285d..31defa3bfb 100644
--- a/paimon-api/src/main/java/org/apache/paimon/Snapshot.java
+++ b/paimon-api/src/main/java/org/apache/paimon/Snapshot.java
@@ -88,6 +88,7 @@ public class Snapshot implements Serializable {
     protected static final String FIELD_WATERMARK = "watermark";
     protected static final String FIELD_STATISTICS = "statistics";
     protected static final String FIELD_PROPERTIES = "properties";
+    protected static final String FIELD_NEXT_ROW_ID = "nextRowId";
 
     // version of snapshot
     // null for paimon <= 0.2
@@ -203,6 +204,11 @@ public class Snapshot implements Serializable {
     @Nullable
     protected final Map<String, String> properties;
 
+    @Nullable
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @JsonProperty(FIELD_NEXT_ROW_ID)
+    protected final Long nextRowId;
+
     public Snapshot(
             long id,
             long schemaId,
@@ -223,7 +229,8 @@ public class Snapshot implements Serializable {
             @Nullable Long changelogRecordCount,
             @Nullable Long watermark,
             @Nullable String statistics,
-            @Nullable Map<String, String> properties) {
+            @Nullable Map<String, String> properties,
+            @Nullable Long nextRowId) {
         this(
                 CURRENT_VERSION,
                 id,
@@ -245,7 +252,8 @@ public class Snapshot implements Serializable {
                 changelogRecordCount,
                 watermark,
                 statistics,
-                properties);
+                properties,
+                nextRowId);
     }
 
     @JsonCreator
@@ -271,7 +279,8 @@ public class Snapshot implements Serializable {
             @JsonProperty(FIELD_CHANGELOG_RECORD_COUNT) @Nullable Long 
changelogRecordCount,
             @JsonProperty(FIELD_WATERMARK) @Nullable Long watermark,
             @JsonProperty(FIELD_STATISTICS) @Nullable String statistics,
-            @JsonProperty(FIELD_PROPERTIES) @Nullable Map<String, String> 
properties) {
+            @JsonProperty(FIELD_PROPERTIES) @Nullable Map<String, String> 
properties,
+            @JsonProperty(FIELD_NEXT_ROW_ID) @Nullable Long nextRowId) {
         this.version = version;
         this.id = id;
         this.schemaId = schemaId;
@@ -293,6 +302,7 @@ public class Snapshot implements Serializable {
         this.watermark = watermark;
         this.statistics = statistics;
         this.properties = properties;
+        this.nextRowId = nextRowId;
     }
 
     @JsonGetter(FIELD_VERSION)
@@ -413,6 +423,12 @@ public class Snapshot implements Serializable {
         return properties;
     }
 
+    @JsonGetter(FIELD_NEXT_ROW_ID)
+    @Nullable
+    public Long nextRowId() {
+        return nextRowId;
+    }
+
     public String toJson() {
         return JsonSerdeUtil.toJson(this);
     }
@@ -440,7 +456,8 @@ public class Snapshot implements Serializable {
                 changelogRecordCount,
                 watermark,
                 statistics,
-                properties);
+                properties,
+                nextRowId);
     }
 
     @Override
@@ -472,7 +489,8 @@ public class Snapshot implements Serializable {
                 && Objects.equals(changelogRecordCount, 
that.changelogRecordCount)
                 && Objects.equals(watermark, that.watermark)
                 && Objects.equals(statistics, that.statistics)
-                && Objects.equals(properties, that.properties);
+                && Objects.equals(properties, that.properties)
+                && Objects.equals(nextRowId, that.nextRowId);
     }
 
     /** Type of changes in this snapshot. */
diff --git 
a/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java 
b/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java
index 3288276a1f..1a594e5d15 100644
--- a/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java
+++ b/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java
@@ -87,8 +87,16 @@ public class SpecialFields {
             new DataField(
                     Integer.MAX_VALUE - 4, "rowkind", new 
VarCharType(VarCharType.MAX_LENGTH));
 
+    public static final DataField ROW_ID =
+            new DataField(Integer.MAX_VALUE - 5, "_ROW_ID", 
DataTypes.BIGINT());
+
     public static final Set<String> SYSTEM_FIELD_NAMES =
-            Stream.of(SEQUENCE_NUMBER.name(), VALUE_KIND.name(), LEVEL.name(), 
ROW_KIND.name())
+            Stream.of(
+                            SEQUENCE_NUMBER.name(),
+                            VALUE_KIND.name(),
+                            LEVEL.name(),
+                            ROW_KIND.name(),
+                            ROW_ID.name())
                     .collect(Collectors.toSet());
 
     public static boolean isSystemField(int fieldId) {
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/casting/FallbackMappingRow.java 
b/paimon-common/src/main/java/org/apache/paimon/casting/FallbackMappingRow.java
new file mode 100644
index 0000000000..2775c3ebfe
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/casting/FallbackMappingRow.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.casting;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.variant.Variant;
+import org.apache.paimon.types.RowKind;
+
+/** A row that falls back to another row for fields that are null in the main row, used to inject row lineage. */
+public class FallbackMappingRow implements InternalRow {
+
+    private InternalRow main;
+    private InternalRow fallbackRow;
+    private final int[] mappings;
+
+    public FallbackMappingRow(int[] mappings) {
+        this.mappings = mappings;
+    }
+
+    @Override
+    public int getFieldCount() {
+        return main.getFieldCount();
+    }
+
+    @Override
+    public RowKind getRowKind() {
+        return main.getRowKind();
+    }
+
+    @Override
+    public void setRowKind(RowKind kind) {
+        main.setRowKind(kind);
+    }
+
+    @Override
+    public boolean isNullAt(int pos) {
+        if (mappings[pos] == -1) {
+            return main.isNullAt(pos);
+        }
+        return main.isNullAt(pos) || fallbackRow.isNullAt(mappings[pos]);
+    }
+
+    @Override
+    public boolean getBoolean(int pos) {
+        if (mappings[pos] != -1 && main.isNullAt(pos)) {
+            return fallbackRow.getBoolean(mappings[pos]);
+        }
+        return main.getBoolean(pos);
+    }
+
+    @Override
+    public byte getByte(int pos) {
+        if (mappings[pos] != -1 && main.isNullAt(pos)) {
+            return fallbackRow.getByte(mappings[pos]);
+        }
+        return main.getByte(pos);
+    }
+
+    @Override
+    public short getShort(int pos) {
+        if (mappings[pos] != -1 && main.isNullAt(pos)) {
+            return fallbackRow.getShort(mappings[pos]);
+        }
+        return main.getShort(pos);
+    }
+
+    @Override
+    public int getInt(int pos) {
+        if (mappings[pos] != -1 && main.isNullAt(pos)) {
+            return fallbackRow.getInt(mappings[pos]);
+        }
+        return main.getInt(pos);
+    }
+
+    @Override
+    public long getLong(int pos) {
+        if (mappings[pos] != -1 && main.isNullAt(pos)) {
+            return fallbackRow.getLong(mappings[pos]);
+        }
+        return main.getLong(pos);
+    }
+
+    @Override
+    public float getFloat(int pos) {
+        if (mappings[pos] != -1 && main.isNullAt(pos)) {
+            return fallbackRow.getFloat(mappings[pos]);
+        }
+        return main.getFloat(pos);
+    }
+
+    @Override
+    public double getDouble(int pos) {
+        if (mappings[pos] != -1 && main.isNullAt(pos)) {
+            return fallbackRow.getDouble(mappings[pos]);
+        }
+        return main.getDouble(pos);
+    }
+
+    @Override
+    public BinaryString getString(int pos) {
+        if (mappings[pos] != -1 && main.isNullAt(pos)) {
+            return fallbackRow.getString(mappings[pos]);
+        }
+        return main.getString(pos);
+    }
+
+    @Override
+    public Decimal getDecimal(int pos, int precision, int scale) {
+        if (mappings[pos] != -1 && main.isNullAt(pos)) {
+            return fallbackRow.getDecimal(mappings[pos], precision, scale);
+        }
+        return main.getDecimal(pos, precision, scale);
+    }
+
+    @Override
+    public Timestamp getTimestamp(int pos, int precision) {
+        if (mappings[pos] != -1 && main.isNullAt(pos)) {
+            return fallbackRow.getTimestamp(mappings[pos], precision);
+        }
+        return main.getTimestamp(pos, precision);
+    }
+
+    @Override
+    public byte[] getBinary(int pos) {
+        if (mappings[pos] != -1 && main.isNullAt(pos)) {
+            return fallbackRow.getBinary(mappings[pos]);
+        }
+        return main.getBinary(pos);
+    }
+
+    @Override
+    public Variant getVariant(int pos) {
+        if (mappings[pos] != -1 && main.isNullAt(pos)) {
+            return fallbackRow.getVariant(mappings[pos]);
+        }
+        return main.getVariant(pos);
+    }
+
+    @Override
+    public InternalArray getArray(int pos) {
+        if (mappings[pos] != -1 && main.isNullAt(pos)) {
+            return fallbackRow.getArray(mappings[pos]);
+        }
+        return main.getArray(pos);
+    }
+
+    @Override
+    public InternalMap getMap(int pos) {
+        if (mappings[pos] != -1 && main.isNullAt(pos)) {
+            return fallbackRow.getMap(mappings[pos]);
+        }
+        return main.getMap(pos);
+    }
+
+    @Override
+    public InternalRow getRow(int pos, int numFields) {
+        if (mappings[pos] != -1 && main.isNullAt(pos)) {
+            return fallbackRow.getRow(mappings[pos], numFields);
+        }
+        return main.getRow(pos, numFields);
+    }
+
+    public FallbackMappingRow replace(InternalRow main, InternalRow 
rowLineage) {
+        this.main = main;
+        this.fallbackRow = rowLineage;
+        return this;
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
index 02bfe5912d..37f8d8edf2 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
@@ -23,12 +23,15 @@ import org.apache.paimon.data.PartitionInfo;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.reader.FileRecordIterator;
 import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.SpecialFields;
 import org.apache.paimon.utils.LongIterator;
 import org.apache.paimon.utils.RecyclableIterator;
 import org.apache.paimon.utils.VectorMappingUtils;
 
 import javax.annotation.Nullable;
 
+import java.util.Map;
+
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /**
@@ -120,4 +123,55 @@ public class ColumnarRowIterator extends 
RecyclableIterator<InternalRow>
         }
         return this;
     }
+
+    public ColumnarRowIterator assignRowLineage(
+            Long firstRowId, Long snapshotId, Map<String, Integer> meta) {
+        VectorizedColumnBatch vectorizedColumnBatch = row.batch();
+        ColumnVector[] vectors = vectorizedColumnBatch.columns;
+
+        if (meta.containsKey(SpecialFields.ROW_ID.name())) {
+            Integer index = meta.get(SpecialFields.ROW_ID.name());
+            final ColumnVector rowIdVector = vectors[index];
+            vectors[index] =
+                    new LongColumnVector() {
+                        @Override
+                        public long getLong(int i) {
+                            if (rowIdVector.isNullAt(i)) {
+                                return firstRowId + returnedPosition();
+                            } else {
+                                return ((LongColumnVector) 
rowIdVector).getLong(i);
+                            }
+                        }
+
+                        @Override
+                        public boolean isNullAt(int i) {
+                            return false;
+                        }
+                    };
+        }
+
+        if (meta.containsKey(SpecialFields.SEQUENCE_NUMBER.name())) {
+            Integer index = meta.get(SpecialFields.SEQUENCE_NUMBER.name());
+            final ColumnVector versionVector = vectors[index];
+            vectors[index] =
+                    new LongColumnVector() {
+                        @Override
+                        public long getLong(int i) {
+                            if (versionVector.isNullAt(i)) {
+                                return snapshotId;
+                            } else {
+                                return ((LongColumnVector) 
versionVector).getLong(i);
+                            }
+                        }
+
+                        @Override
+                        public boolean isNullAt(int i) {
+                            return false;
+                        }
+                    };
+        }
+
+        copy(vectors);
+        return this;
+    }
 }
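
A hedged usage sketch (not part of this commit) of the new assignRowLineage hook: the caller passes the projected positions of the _ROW_ID and _SEQUENCE_NUMBER columns so that null vector values fall back to firstRowId + position and the snapshot id. The iterator, firstRowId, snapshotId, and the positions 5/6 below are hypothetical; in DataFileRecordReader they come from the read schema and the file metadata.

    import org.apache.paimon.data.columnar.ColumnarRowIterator;
    import org.apache.paimon.table.SpecialFields;

    import java.util.HashMap;
    import java.util.Map;

    class RowLineageIteratorSketch {
        static ColumnarRowIterator withRowLineage(
                ColumnarRowIterator iterator, long firstRowId, long snapshotId) {
            Map<String, Integer> lineagePositions = new HashMap<>();
            lineagePositions.put(SpecialFields.ROW_ID.name(), 5); // hypothetical position
            lineagePositions.put(SpecialFields.SEQUENCE_NUMBER.name(), 6); // hypothetical position
            return iterator.assignRowLineage(firstRowId, snapshotId, lineagePositions);
        }
    }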
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/casting/FallbackMappingRowTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/casting/FallbackMappingRowTest.java
new file mode 100644
index 0000000000..cd5c293f6c
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/casting/FallbackMappingRowTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.casting;
+
+import org.apache.paimon.data.GenericRow;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link FallbackMappingRow}. */
+public class FallbackMappingRowTest {
+
+    @Test
+    public void testBasic() {
+        int[] project = new int[7];
+
+        GenericRow genericRow = new GenericRow(2);
+        genericRow.setField(0, 111L);
+        genericRow.setField(1, 222L);
+
+        project[0] = -1;
+        project[1] = -1;
+        project[2] = -1;
+        project[3] = -1;
+        project[4] = -1;
+        project[5] = 1;
+        project[6] = 0;
+
+        GenericRow mainRow = new GenericRow(7);
+        mainRow.setField(0, 0L);
+        mainRow.setField(1, 1L);
+        mainRow.setField(2, 2L);
+        mainRow.setField(3, 3L);
+        FallbackMappingRow fallbackMappingRow = new 
FallbackMappingRow(project);
+        fallbackMappingRow.replace(mainRow, genericRow);
+
+        assertThat(fallbackMappingRow.getFieldCount()).isEqualTo(7);
+        assertThat(fallbackMappingRow.getLong(6)).isEqualTo(111L);
+        assertThat(fallbackMappingRow.getLong(5)).isEqualTo(222L);
+        assertThat(fallbackMappingRow.getLong(0)).isEqualTo(0L);
+        assertThat(fallbackMappingRow.getLong(1)).isEqualTo(1L);
+        assertThat(fallbackMappingRow.getLong(2)).isEqualTo(2L);
+        assertThat(fallbackMappingRow.getLong(3)).isEqualTo(3L);
+        assertThat(fallbackMappingRow.isNullAt(4)).isEqualTo(true);
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 94dbcbda9b..d998b290dd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -304,7 +304,8 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
                 options.commitTimeout(),
                 options.commitMinRetryWait(),
                 options.commitMaxRetryWait(),
-                options.commitStrictModeLastSafeSnapshot().orElse(null));
+                options.commitStrictModeLastSafeSnapshot().orElse(null),
+                options.rowTrackingEnabled());
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index a277ce6530..c81a44d426 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -80,7 +80,8 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
                 rowType,
                 FileFormatDiscover.of(options),
                 pathFactory(),
-                options.fileIndexReadEnabled());
+                options.fileIndexReadEnabled(),
+                options.rowTrackingEnabled());
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/Changelog.java 
b/paimon-core/src/main/java/org/apache/paimon/Changelog.java
index c0865cdd34..a715ab8234 100644
--- a/paimon-core/src/main/java/org/apache/paimon/Changelog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/Changelog.java
@@ -63,7 +63,8 @@ public class Changelog extends Snapshot {
                 snapshot.changelogRecordCount(),
                 snapshot.watermark(),
                 snapshot.statistics(),
-                snapshot.properties);
+                snapshot.properties,
+                snapshot.nextRowId);
     }
 
     @JsonCreator
@@ -89,7 +90,8 @@ public class Changelog extends Snapshot {
             @JsonProperty(FIELD_CHANGELOG_RECORD_COUNT) @Nullable Long 
changelogRecordCount,
             @JsonProperty(FIELD_WATERMARK) @Nullable Long watermark,
             @JsonProperty(FIELD_STATISTICS) @Nullable String statistics,
-            @JsonProperty(FIELD_PROPERTIES) Map<String, String> properties) {
+            @JsonProperty(FIELD_PROPERTIES) Map<String, String> properties,
+            @JsonProperty(FIELD_NEXT_ROW_ID) @Nullable Long nextRowId) {
         super(
                 version,
                 id,
@@ -111,7 +113,8 @@ public class Changelog extends Snapshot {
                 changelogRecordCount,
                 watermark,
                 statistics,
-                properties);
+                properties,
+                nextRowId);
     }
 
     public static Changelog fromJson(String json) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index a230e10d69..f7a743ace1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -131,7 +131,8 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
                 valueType,
                 FileFormatDiscover.of(options),
                 pathFactory(),
-                options.fileIndexReadEnabled());
+                options.fileIndexReadEnabled(),
+                false);
     }
 
     public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index eb28759f93..8fa2aea2be 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -83,7 +83,8 @@ public class DataFileMeta {
                                     16,
                                     "_VALUE_STATS_COLS",
                                     
DataTypes.ARRAY(DataTypes.STRING().notNull())),
-                            new DataField(17, "_EXTERNAL_PATH", 
newStringType(true))));
+                            new DataField(17, "_EXTERNAL_PATH", 
newStringType(true)),
+                            new DataField(18, "_FIRST_ROW_ID", new 
BigIntType(true))));
 
     public static final BinaryRow EMPTY_MIN_KEY = EMPTY_ROW;
     public static final BinaryRow EMPTY_MAX_KEY = EMPTY_ROW;
@@ -100,6 +101,7 @@ public class DataFileMeta {
     private final SimpleStats keyStats;
     private final SimpleStats valueStats;
 
+    // For row-lineage tables, these are reassigned during commit
     private final long minSequenceNumber;
     private final long maxSequenceNumber;
     private final long schemaId;
@@ -124,6 +126,8 @@ public class DataFileMeta {
     /** external path of file, if it is null, it is in the default warehouse 
path. */
     private final @Nullable String externalPath;
 
+    private final @Nullable Long firstRowId;
+
     public static DataFileMeta forAppend(
             String fileName,
             long fileSize,
@@ -136,7 +140,8 @@ public class DataFileMeta {
             @Nullable byte[] embeddedIndex,
             @Nullable FileSource fileSource,
             @Nullable List<String> valueStatsCols,
-            @Nullable String externalPath) {
+            @Nullable String externalPath,
+            @Nullable Long firstRowId) {
         return new DataFileMeta(
                 fileName,
                 fileSize,
@@ -155,7 +160,8 @@ public class DataFileMeta {
                 embeddedIndex,
                 fileSource,
                 valueStatsCols,
-                externalPath);
+                externalPath,
+                firstRowId);
     }
 
     public DataFileMeta(
@@ -175,7 +181,8 @@ public class DataFileMeta {
             @Nullable byte[] embeddedIndex,
             @Nullable FileSource fileSource,
             @Nullable List<String> valueStatsCols,
-            @Nullable String externalPath) {
+            @Nullable String externalPath,
+            @Nullable Long firstRowId) {
         this(
                 fileName,
                 fileSize,
@@ -194,7 +201,8 @@ public class DataFileMeta {
                 embeddedIndex,
                 fileSource,
                 valueStatsCols,
-                externalPath);
+                externalPath,
+                firstRowId);
     }
 
     public DataFileMeta(
@@ -212,7 +220,8 @@ public class DataFileMeta {
             @Nullable Long deleteRowCount,
             @Nullable byte[] embeddedIndex,
             @Nullable FileSource fileSource,
-            @Nullable List<String> valueStatsCols) {
+            @Nullable List<String> valueStatsCols,
+            @Nullable Long firstRowId) {
         this(
                 fileName,
                 fileSize,
@@ -231,7 +240,8 @@ public class DataFileMeta {
                 embeddedIndex,
                 fileSource,
                 valueStatsCols,
-                null);
+                null,
+                firstRowId);
     }
 
     public DataFileMeta(
@@ -252,7 +262,8 @@ public class DataFileMeta {
             @Nullable byte[] embeddedIndex,
             @Nullable FileSource fileSource,
             @Nullable List<String> valueStatsCols,
-            @Nullable String externalPath) {
+            @Nullable String externalPath,
+            @Nullable Long firstRowId) {
         this.fileName = fileName;
         this.fileSize = fileSize;
 
@@ -275,6 +286,7 @@ public class DataFileMeta {
         this.fileSource = fileSource;
         this.valueStatsCols = valueStatsCols;
         this.externalPath = externalPath;
+        this.firstRowId = firstRowId;
     }
 
     public String fileName() {
@@ -387,6 +399,11 @@ public class DataFileMeta {
         return valueStatsCols;
     }
 
+    @Nullable
+    public Long firstRowId() {
+        return firstRowId;
+    }
+
     public DataFileMeta upgrade(int newLevel) {
         checkArgument(newLevel > this.level);
         return new DataFileMeta(
@@ -407,7 +424,8 @@ public class DataFileMeta {
                 embeddedIndex,
                 fileSource,
                 valueStatsCols,
-                externalPath);
+                externalPath,
+                firstRowId);
     }
 
     public DataFileMeta rename(String newFileName) {
@@ -430,7 +448,8 @@ public class DataFileMeta {
                 embeddedIndex,
                 fileSource,
                 valueStatsCols,
-                newExternalPath);
+                newExternalPath,
+                firstRowId);
     }
 
     public DataFileMeta copyWithoutStats() {
@@ -452,7 +471,54 @@ public class DataFileMeta {
                 embeddedIndex,
                 fileSource,
                 Collections.emptyList(),
-                externalPath);
+                externalPath,
+                firstRowId);
+    }
+
+    public DataFileMeta assignSequenceNumber(long minSequenceNumber, long 
maxSequenceNumber) {
+        return new DataFileMeta(
+                fileName,
+                fileSize,
+                rowCount,
+                minKey,
+                maxKey,
+                keyStats,
+                valueStats,
+                minSequenceNumber,
+                maxSequenceNumber,
+                schemaId,
+                level,
+                extraFiles,
+                creationTime,
+                deleteRowCount,
+                embeddedIndex,
+                fileSource,
+                valueStatsCols,
+                externalPath,
+                firstRowId);
+    }
+
+    public DataFileMeta assignFirstRowId(long firstRowId) {
+        return new DataFileMeta(
+                fileName,
+                fileSize,
+                rowCount,
+                minKey,
+                maxKey,
+                keyStats,
+                valueStats,
+                minSequenceNumber,
+                maxSequenceNumber,
+                schemaId,
+                level,
+                extraFiles,
+                creationTime,
+                deleteRowCount,
+                embeddedIndex,
+                fileSource,
+                valueStatsCols,
+                externalPath,
+                firstRowId);
     }
 
     public List<Path> collectFiles(DataFilePathFactory pathFactory) {
@@ -481,7 +547,8 @@ public class DataFileMeta {
                 embeddedIndex,
                 fileSource,
                 valueStatsCols,
-                externalPath);
+                externalPath,
+                firstRowId);
     }
 
     public DataFileMeta newExternalPath(String newExternalPath) {
@@ -503,7 +570,8 @@ public class DataFileMeta {
                 embeddedIndex,
                 fileSource,
                 valueStatsCols,
-                newExternalPath);
+                newExternalPath,
+                firstRowId);
     }
 
     public DataFileMeta copy(byte[] newEmbeddedIndex) {
@@ -525,7 +593,8 @@ public class DataFileMeta {
                 newEmbeddedIndex,
                 fileSource,
                 valueStatsCols,
-                externalPath);
+                externalPath,
+                firstRowId);
     }
 
     @Override
@@ -554,7 +623,8 @@ public class DataFileMeta {
                 && Objects.equals(deleteRowCount, that.deleteRowCount)
                 && Objects.equals(fileSource, that.fileSource)
                 && Objects.equals(valueStatsCols, that.valueStatsCols)
-                && Objects.equals(externalPath, that.externalPath);
+                && Objects.equals(externalPath, that.externalPath)
+                && Objects.equals(firstRowId, that.firstRowId);
     }
 
     @Override
@@ -577,7 +647,8 @@ public class DataFileMeta {
                 deleteRowCount,
                 fileSource,
                 valueStatsCols,
-                externalPath);
+                externalPath,
+                firstRowId);
     }
 
     @Override
@@ -587,7 +658,7 @@ public class DataFileMeta {
                         + "minKey: %s, maxKey: %s, keyStats: %s, valueStats: 
%s, "
                         + "minSequenceNumber: %d, maxSequenceNumber: %d, "
                         + "schemaId: %d, level: %d, extraFiles: %s, 
creationTime: %s, "
-                        + "deleteRowCount: %d, fileSource: %s, valueStatsCols: 
%s, externalPath: %s}",
+                        + "deleteRowCount: %d, fileSource: %s, valueStatsCols: 
%s, externalPath: %s, firstRowId: %s}",
                 fileName,
                 fileSize,
                 rowCount,
@@ -605,7 +676,8 @@ public class DataFileMeta {
                 deleteRowCount,
                 fileSource,
                 valueStatsCols,
-                externalPath);
+                externalPath,
+                firstRowId);
     }
 
     public static long getMaxSequenceNumber(List<DataFileMeta> fileMetas) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java
index e6c10f1534..029d2cca55 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java
@@ -134,6 +134,7 @@ public class DataFileMeta08Serializer implements 
Serializable {
                 row.isNullAt(14) ? null : row.getBinary(14),
                 null,
                 null,
+                null,
                 null);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java
index 36d1ad260f..5c238b86bf 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java
@@ -140,6 +140,7 @@ public class DataFileMeta09Serializer implements 
Serializable {
                 row.isNullAt(14) ? null : row.getBinary(14),
                 row.isNullAt(15) ? null : 
FileSource.fromByteValue(row.getByte(15)),
                 null,
+                null,
                 null);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java
 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java
index 518db7c658..03e4f1a189 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java
@@ -145,6 +145,7 @@ public class DataFileMeta10LegacySerializer implements 
Serializable {
                 row.isNullAt(14) ? null : row.getBinary(14),
                 row.isNullAt(15) ? null : 
FileSource.fromByteValue(row.getByte(15)),
                 row.isNullAt(16) ? null : 
fromStringArrayData(row.getArray(16)),
+                null,
                 null);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java
 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta12LegacySerializer.java
similarity index 93%
copy from 
paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java
copy to 
paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta12LegacySerializer.java
index 518db7c658..b372467d26 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta12LegacySerializer.java
@@ -46,8 +46,8 @@ import static 
org.apache.paimon.utils.SerializationUtils.newBytesType;
 import static org.apache.paimon.utils.SerializationUtils.newStringType;
 import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
 
-/** Serializer for {@link DataFileMeta} with 1.0 snapshot version. */
-public class DataFileMeta10LegacySerializer implements Serializable {
+/** Serializer for {@link DataFileMeta} with 1.2 snapshot version. */
+public class DataFileMeta12LegacySerializer implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
@@ -75,11 +75,12 @@ public class DataFileMeta10LegacySerializer implements 
Serializable {
                             new DataField(
                                     16,
                                     "_VALUE_STATS_COLS",
-                                    
DataTypes.ARRAY(DataTypes.STRING().notNull()))));
+                                    
DataTypes.ARRAY(DataTypes.STRING().notNull())),
+                            new DataField(17, "_EXTERNAL_PATH", 
newStringType(true))));
 
     protected final InternalRowSerializer rowSerializer;
 
-    public DataFileMeta10LegacySerializer() {
+    public DataFileMeta12LegacySerializer() {
         this.rowSerializer = InternalSerializers.create(SCHEMA);
     }
 
@@ -110,7 +111,8 @@ public class DataFileMeta10LegacySerializer implements 
Serializable {
                         meta.deleteRowCount().orElse(null),
                         meta.embeddedIndex(),
                         
meta.fileSource().map(FileSource::toByteValue).orElse(null),
-                        toStringArrayData(meta.valueStatsCols()));
+                        toStringArrayData(meta.valueStatsCols()),
+                        meta.externalPath().map(BinaryString::fromString).orElse(null));
         rowSerializer.serialize(row, target);
     }
 
@@ -145,6 +147,7 @@ public class DataFileMeta10LegacySerializer implements 
Serializable {
                 row.isNullAt(14) ? null : row.getBinary(14),
                 row.isNullAt(15) ? null : 
FileSource.fromByteValue(row.getByte(15)),
                 row.isNullAt(16) ? null : 
fromStringArrayData(row.getArray(16)),
+                row.isNullAt(17) ? null : row.getString(17).toString(),
                 null);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
index a316f897ff..7eac729586 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
@@ -59,7 +59,8 @@ public class DataFileMetaSerializer extends 
ObjectSerializer<DataFileMeta> {
                 meta.embeddedIndex(),
                 meta.fileSource().map(FileSource::toByteValue).orElse(null),
                 toStringArrayData(meta.valueStatsCols()),
-                
meta.externalPath().map(BinaryString::fromString).orElse(null));
+                meta.externalPath().map(BinaryString::fromString).orElse(null),
+                meta.firstRowId());
     }
 
     @Override
@@ -82,6 +83,7 @@ public class DataFileMetaSerializer extends 
ObjectSerializer<DataFileMeta> {
                 row.isNullAt(14) ? null : row.getBinary(14),
                 row.isNullAt(15) ? null : 
FileSource.fromByteValue(row.getByte(15)),
                 row.isNullAt(16) ? null : 
fromStringArrayData(row.getArray(16)),
-                row.isNullAt(17) ? null : row.getString(17).toString());
+                row.isNullAt(17) ? null : row.getString(17).toString(),
+                row.isNullAt(18) ? null : row.getLong(18));
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
index 16fad55a49..f7b283daf0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
@@ -21,34 +21,51 @@ package org.apache.paimon.io;
 import org.apache.paimon.PartitionSettedRow;
 import org.apache.paimon.casting.CastFieldGetter;
 import org.apache.paimon.casting.CastedRow;
+import org.apache.paimon.casting.FallbackMappingRow;
+import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.PartitionInfo;
 import org.apache.paimon.data.columnar.ColumnarRowIterator;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.reader.FileRecordIterator;
 import org.apache.paimon.reader.FileRecordReader;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileUtils;
 import org.apache.paimon.utils.ProjectedRow;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
 
 /** Reads {@link InternalRow} from data files. */
 public class DataFileRecordReader implements FileRecordReader<InternalRow> {
 
+    private final RowType tableRowType;
     private final FileRecordReader<InternalRow> reader;
     @Nullable private final int[] indexMapping;
     @Nullable private final PartitionInfo partitionInfo;
     @Nullable private final CastFieldGetter[] castMapping;
+    private final boolean rowLineageEnabled;
+    @Nullable private final Long firstRowId;
+    private final long maxSequenceNumber;
+    private final Map<String, Integer> systemFields;
 
     public DataFileRecordReader(
+            RowType tableRowType,
             FormatReaderFactory readerFactory,
             FormatReaderFactory.Context context,
             @Nullable int[] indexMapping,
             @Nullable CastFieldGetter[] castMapping,
-            @Nullable PartitionInfo partitionInfo)
+            @Nullable PartitionInfo partitionInfo,
+            boolean rowLineageEnabled,
+            @Nullable Long firstRowId,
+            long maxSequenceNumber,
+            Map<String, Integer> systemFields)
             throws IOException {
+        this.tableRowType = tableRowType;
         try {
             this.reader = readerFactory.createReader(context);
         } catch (Exception e) {
@@ -58,6 +75,10 @@ public class DataFileRecordReader implements 
FileRecordReader<InternalRow> {
         this.indexMapping = indexMapping;
         this.partitionInfo = partitionInfo;
         this.castMapping = castMapping;
+        this.rowLineageEnabled = rowLineageEnabled;
+        this.firstRowId = firstRowId;
+        this.maxSequenceNumber = maxSequenceNumber;
+        this.systemFields = systemFields;
     }
 
     @Nullable
@@ -70,6 +91,11 @@ public class DataFileRecordReader implements 
FileRecordReader<InternalRow> {
 
         if (iterator instanceof ColumnarRowIterator) {
             iterator = ((ColumnarRowIterator) iterator).mapping(partitionInfo, 
indexMapping);
+            if (rowLineageEnabled) {
+                iterator =
+                        ((ColumnarRowIterator) iterator)
+                                .assignRowLineage(firstRowId, 
maxSequenceNumber, systemFields);
+            }
         } else {
             if (partitionInfo != null) {
                 final PartitionSettedRow partitionSettedRow =
@@ -81,6 +107,36 @@ public class DataFileRecordReader implements 
FileRecordReader<InternalRow> {
                 final ProjectedRow projectedRow = 
ProjectedRow.from(indexMapping);
                 iterator = iterator.transform(projectedRow::replaceRow);
             }
+
+            if (rowLineageEnabled && !systemFields.isEmpty()) {
+                GenericRow lineageRow = new GenericRow(2);
+
+                int[] fallbackToLineageMappings = new 
int[tableRowType.getFieldCount()];
+                Arrays.fill(fallbackToLineageMappings, -1);
+
+                if (systemFields.containsKey(SpecialFields.ROW_ID.name())) {
+                    
fallbackToLineageMappings[systemFields.get(SpecialFields.ROW_ID.name())] = 0;
+                }
+                if 
(systemFields.containsKey(SpecialFields.SEQUENCE_NUMBER.name())) {
+                    fallbackToLineageMappings[
+                                    
systemFields.get(SpecialFields.SEQUENCE_NUMBER.name())] =
+                            1;
+                }
+
+                FallbackMappingRow fallbackMappingRow =
+                        new FallbackMappingRow(fallbackToLineageMappings);
+                final FileRecordIterator<InternalRow> iteratorInner = iterator;
+                iterator =
+                        iterator.transform(
+                                row -> {
+                                    if (firstRowId != null) {
+                                        lineageRow.setField(
+                                                0, 
iteratorInner.returnedPosition() + firstRowId);
+                                    }
+                                    lineageRow.setField(1, maxSequenceNumber);
+                                    return fallbackMappingRow.replace(row, 
lineageRow);
+                                });
+            }
         }
 
         if (castMapping != null) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
index 76f4ebd3a2..077b90df36 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
@@ -178,7 +178,8 @@ public abstract class KeyValueDataFileWriter
                 indexResult.embeddedIndexBytes(),
                 fileSource,
                 valueStatsPair.getKey(),
-                externalPath);
+                externalPath,
+                null);
     }
 
     abstract Pair<SimpleColStats[], SimpleColStats[]> 
fetchKeyValueStats(SimpleColStats[] rowStats);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index 14221d50be..33dadec1a5 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -127,6 +127,7 @@ public class KeyValueFileReaderFactory implements 
FileReaderFactory<KeyValue> {
         long fileSize = file.fileSize();
         FileRecordReader<InternalRow> fileRecordReader =
                 new DataFileRecordReader(
+                        schema.logicalRowType(),
                         formatReaderMapping.getReaderFactory(),
                         orcPoolSize == null
                                 ? new FormatReaderContext(fileIO, filePath, 
fileSize)
@@ -134,7 +135,11 @@ public class KeyValueFileReaderFactory implements 
FileReaderFactory<KeyValue> {
                                         fileIO, filePath, fileSize, 
orcPoolSize),
                         formatReaderMapping.getIndexMapping(),
                         formatReaderMapping.getCastMapping(),
-                        
PartitionUtils.create(formatReaderMapping.getPartitionPair(), partition));
+                        
PartitionUtils.create(formatReaderMapping.getPartitionPair(), partition),
+                        false,
+                        null,
+                        -1,
+                        Collections.emptyMap());
 
         Optional<DeletionVector> deletionVector = 
dvFactory.create(file.fileName());
         if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) {
@@ -267,7 +272,7 @@ public class KeyValueFileReaderFactory implements 
FileReaderFactory<KeyValue> {
                     finalReadKeyType,
                     readValueType,
                     new FormatReaderMapping.Builder(
-                            formatDiscover, readTableFields, fieldsExtractor, 
filters),
+                            formatDiscover, readTableFields, fieldsExtractor, 
filters, false),
                     pathFactory.createDataFilePathFactory(partition, bucket),
                     options.fileReaderAsyncThreshold().getBytes(),
                     partition,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
index 432845de32..b3f9d9ecff 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
@@ -116,6 +116,7 @@ public class RowDataFileWriter extends 
StatsCollectingSingleFileWriter<InternalR
                 indexResult.embeddedIndexBytes(),
                 fileSource,
                 statsPair.getKey(),
-                externalPath);
+                externalPath,
+                null);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
index 75ab52ac75..0de753f166 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
@@ -140,6 +140,20 @@ public class ManifestEntry implements FileEntry {
         return new ManifestEntry(kind, partition, bucket, totalBuckets, 
file.copyWithoutStats());
     }
 
+    public ManifestEntry assignSequenceNumber(long minSequenceNumber, long 
maxSequenceNumber) {
+        return new ManifestEntry(
+                kind,
+                partition,
+                bucket,
+                totalBuckets,
+                file.assignSequenceNumber(minSequenceNumber, 
maxSequenceNumber));
+    }
+
+    public ManifestEntry assignFirstRowId(long firstRowId) {
+        return new ManifestEntry(
+                kind, partition, bucket, totalBuckets, 
file.assignFirstRowId(firstRowId));
+    }
+
     @Override
     public boolean equals(Object o) {
         if (!(o instanceof ManifestEntry)) {
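
A hedged sketch (not part of this commit) of how assignFirstRowId could hand out contiguous row id ranges to delta files at commit time; assignRowLineageMeta in FileStoreCommitImpl is the real consumer, and the simplified loop below is only an illustration under that assumption:

    import org.apache.paimon.manifest.ManifestEntry;

    import java.util.List;

    class RowIdAssignmentSketch {
        /** Returns the next free row id, which would become the new snapshot's nextRowId. */
        static long assignRowIds(
                long firstRowIdStart, List<ManifestEntry> deltaFiles, List<ManifestEntry> out) {
            long next = firstRowIdStart;
            for (ManifestEntry entry : deltaFiles) {
                // Simplified: the real commit logic only assigns ids to newly added data files.
                out.add(entry.assignFirstRowId(next));
                next += entry.file().rowCount();
            }
            return next;
        }
    }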
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
index 42b349d7d1..8e10798acb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
@@ -218,6 +218,7 @@ public class FileMetaUtils {
                 null,
                 FileSource.APPEND,
                 null,
+                null,
                 null);
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 743cf36eeb..3a3d00263b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -29,6 +29,7 @@ import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.manifest.FileEntry;
 import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.IndexManifestFile;
 import org.apache.paimon.manifest.ManifestCommittable;
@@ -92,6 +93,7 @@ import static 
org.apache.paimon.manifest.ManifestEntry.recordCountDelete;
 import static 
org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;
 import static 
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
 import static 
org.apache.paimon.utils.InternalRowPartitionComputer.partToSimpleString;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /**
  * Default implementation of {@link FileStoreCommit}.
@@ -147,6 +149,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
     private final int commitMaxRetries;
     @Nullable private Long strictModeLastSafeSnapshot;
     private final InternalRowPartitionComputer partitionComputer;
+    private final boolean rowTrackingEnabled;
 
     private boolean ignoreEmptyCommit;
     private CommitMetrics commitMetrics;
@@ -182,7 +185,8 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
             long commitTimeout,
             long commitMinRetryWait,
             long commitMaxRetryWait,
-            @Nullable Long strictModeLastSafeSnapshot) {
+            @Nullable Long strictModeLastSafeSnapshot,
+            boolean rowTrackingEnabled) {
         this.snapshotCommit = snapshotCommit;
         this.fileIO = fileIO;
         this.schemaManager = schemaManager;
@@ -225,6 +229,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
         this.commitMetrics = null;
         this.statsFileHandler = statsFileHandler;
         this.bucketMode = bucketMode;
+        this.rowTrackingEnabled = rowTrackingEnabled;
     }
 
     @Override
@@ -247,7 +252,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         }
 
         for (int i = 1; i < committables.size(); i++) {
-            Preconditions.checkArgument(
+            checkArgument(
                     committables.get(i).identifier() > committables.get(i - 1).identifier(),
                     "Committables must be sorted according to identifiers before filtering. This is unexpected.");
         }
@@ -549,7 +554,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
 
     @Override
     public void dropPartitions(List<Map<String, String>> partitions, long commitIdentifier) {
-        Preconditions.checkArgument(!partitions.isEmpty(), "Partitions list cannot be empty.");
+        checkArgument(!partitions.isEmpty(), "Partitions list cannot be empty.");
 
         if (LOG.isDebugEnabled()) {
             LOG.debug(
@@ -917,6 +922,10 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         }
         long newSnapshotId =
                 latestSnapshot == null ? Snapshot.FIRST_SNAPSHOT_ID : latestSnapshot.id() + 1;
+        long firstRowIdStart =
+                latestSnapshot == null
+                        ? 0L
+                        : latestSnapshot.nextRowId() == null ? 0L : latestSnapshot.nextRowId();
 
         if (strictModeLastSafeSnapshot != null && strictModeLastSafeSnapshot >= 0) {
             for (long id = strictModeLastSafeSnapshot + 1; id < newSnapshotId; id++) {
@@ -989,6 +998,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         String indexManifest = null;
         List<ManifestFileMeta> mergeBeforeManifests = new ArrayList<>();
         List<ManifestFileMeta> mergeAfterManifests = new ArrayList<>();
+        long nextRowIdStart = firstRowIdStart;
         try {
             long previousTotalRecordCount = 0L;
             Long currentWatermark = watermark;
@@ -1024,6 +1034,17 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                             manifestReadParallelism);
             baseManifestList = manifestList.write(mergeAfterManifests);
 
+            if (rowTrackingEnabled) {
+                // assign the new snapshot id to delta files
+                List<ManifestEntry> snapshotAssigned = new ArrayList<>();
+                assignSnapshotId(newSnapshotId, deltaFiles, snapshotAssigned);
+                // assign row id for new files
+                List<ManifestEntry> rowIdAssigned = new ArrayList<>();
+                nextRowIdStart =
+                        assignRowLineageMeta(firstRowIdStart, snapshotAssigned, rowIdAssigned);
+                deltaFiles = rowIdAssigned;
+            }
+
             // the added records subtract the deleted records from
             long deltaRecordCount = recordCountAdd(deltaFiles) - recordCountDelete(deltaFiles);
             long totalRecordCount = previousTotalRecordCount + deltaRecordCount;
@@ -1083,7 +1104,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                             currentWatermark,
                             statsFileName,
                             // if empty properties, just set to null
-                            properties.isEmpty() ? null : properties);
+                            properties.isEmpty() ? null : properties,
+                            nextRowIdStart);
         } catch (Throwable e) {
             // fails when preparing for commit, we should clean up
             cleanUpReuseTmpManifests(
@@ -1133,10 +1155,41 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         if (strictModeLastSafeSnapshot != null) {
             strictModeLastSafeSnapshot = newSnapshot.id();
         }
-        commitCallbacks.forEach(callback -> callback.call(deltaFiles, indexFiles, newSnapshot));
+        final List<ManifestEntry> finalDeltaFiles = deltaFiles;
+        commitCallbacks.forEach(
+                callback -> callback.call(finalDeltaFiles, indexFiles, newSnapshot));
         return new SuccessResult();
     }
 
+    private long assignRowLineageMeta(
+            long firstRowIdStart,
+            List<ManifestEntry> deltaFiles,
+            List<ManifestEntry> rowIdAssigned) {
+        if (deltaFiles.isEmpty()) {
+            return firstRowIdStart;
+        }
+        // assign row id for new files
+        long start = firstRowIdStart;
+        for (ManifestEntry entry : deltaFiles) {
+            checkArgument(
+                    entry.file().fileSource().isPresent(),
+                    "This is a bug, file source field for row-tracking table 
must present.");
+            if (entry.file().fileSource().get().equals(FileSource.APPEND)) {
+                long rowCount = entry.file().rowCount();
+                rowIdAssigned.add(entry.assignFirstRowId(start));
+                start += rowCount;
+            }
+        }
+        return start;
+    }
+
+    private void assignSnapshotId(
+            long snapshotId, List<ManifestEntry> deltaFiles, List<ManifestEntry> snapshotAssigned) {
+        for (ManifestEntry entry : deltaFiles) {
+            snapshotAssigned.add(entry.assignSequenceNumber(snapshotId, snapshotId));
+        }
+    }
+
     public void compactManifest() {
         int retryCount = 0;
         long startMillis = System.currentTimeMillis();
@@ -1211,7 +1264,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                         0L,
                         latestSnapshot.watermark(),
                         latestSnapshot.statistics(),
-                        latestSnapshot.properties());
+                        latestSnapshot.properties(),
+                        latestSnapshot.nextRowId());
 
         return commitSnapshotImpl(newSnapshot, emptyList());
     }
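
The row id bookkeeping above is a running offset: the commit starts from the previous snapshot's nextRowId (or 0 when there is none), gives each newly appended delta file a firstRowId equal to the current offset, advances the offset by that file's row count, and records the final offset as the new snapshot's nextRowId. A minimal standalone sketch of that arithmetic, using a hypothetical FileMeta class rather than Paimon's ManifestEntry/DataFileMeta and omitting the handling of non-APPEND files:

    import java.util.ArrayList;
    import java.util.List;

    public class RowIdAssignmentSketch {

        /** Hypothetical stand-in for a delta file entry; only the fields needed here. */
        static final class FileMeta {
            final long rowCount;
            Long firstRowId; // assigned below

            FileMeta(long rowCount) {
                this.rowCount = rowCount;
            }
        }

        /** Assigns consecutive row id ranges and returns the snapshot's nextRowId. */
        static long assignRowIds(long firstRowIdStart, List<FileMeta> appendedFiles) {
            long next = firstRowIdStart;
            for (FileMeta file : appendedFiles) {
                file.firstRowId = next; // this file owns row ids [next, next + rowCount)
                next += file.rowCount;
            }
            return next;
        }

        public static void main(String[] args) {
            List<FileMeta> delta = new ArrayList<>();
            delta.add(new FileMeta(100));
            delta.add(new FileMeta(50));
            // Previous snapshot had nextRowId = 0 (or no nextRowId at all).
            long nextRowId = assignRowIds(0L, delta);
            System.out.println(nextRowId); // 150; the files start at row id 0 and 100
        }
    }
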
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
index a6690ef660..d25910dda3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -79,6 +79,7 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
     private final FileStorePathFactory pathFactory;
     private final Map<FormatKey, FormatReaderMapping> formatReaderMappings;
     private final boolean fileIndexReadEnabled;
+    private final boolean rowTrackingEnabled;
 
     private RowType readRowType;
     @Nullable private List<Predicate> filters;
@@ -90,7 +91,8 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
             RowType rowType,
             FileFormatDiscover formatDiscover,
             FileStorePathFactory pathFactory,
-            boolean fileIndexReadEnabled) {
+            boolean fileIndexReadEnabled,
+            boolean rowTrackingEnabled) {
         this.fileIO = fileIO;
         this.schemaManager = schemaManager;
         this.schema = schema;
@@ -98,6 +100,7 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
         this.pathFactory = pathFactory;
         this.formatReaderMappings = new HashMap<>();
         this.fileIndexReadEnabled = fileIndexReadEnabled;
+        this.rowTrackingEnabled = rowTrackingEnabled;
         this.readRowType = rowType;
     }
 
@@ -153,7 +156,12 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
 
         List<DataField> readTableFields = readRowType.getFields();
         Builder formatReaderMappingBuilder =
-                new Builder(formatDiscover, readTableFields, TableSchema::fields, filters);
+                new Builder(
+                        formatDiscover,
+                        readTableFields,
+                        TableSchema::fields,
+                        filters,
+                        rowTrackingEnabled);
 
         for (int i = 0; i < files.size(); i++) {
             DataFileMeta file = files.get(i);
@@ -235,11 +243,16 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
                         fileIO, dataFilePathFactory.toPath(file), file.fileSize(), selection);
         FileRecordReader<InternalRow> fileRecordReader =
                 new DataFileRecordReader(
+                        schema.logicalRowType(),
                         formatReaderMapping.getReaderFactory(),
                         formatReaderContext,
                         formatReaderMapping.getIndexMapping(),
                         formatReaderMapping.getCastMapping(),
-                        PartitionUtils.create(formatReaderMapping.getPartitionPair(), partition));
+                        PartitionUtils.create(formatReaderMapping.getPartitionPair(), partition),
+                        rowTrackingEnabled,
+                        file.firstRowId(),
+                        file.maxSequenceNumber(),
+                        formatReaderMapping.getSystemFields());
 
         if (fileIndexResult instanceof BitmapIndexResult) {
             fileRecordReader =
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 6c2d12e222..f0603d4d53 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -63,6 +63,7 @@ import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
 import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
 import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP;
 import static org.apache.paimon.CoreOptions.INCREMENTAL_TO_AUTO_TAG;
+import static org.apache.paimon.CoreOptions.PRIMARY_KEY;
 import static org.apache.paimon.CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS;
 import static org.apache.paimon.CoreOptions.SCAN_MODE;
 import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
@@ -236,6 +237,8 @@ public class SchemaValidation {
         }
 
         validateMergeFunctionFactory(schema);
+
+        validateRowLineage(schema, options);
     }
 
     public static void validateFallbackBranch(SchemaManager schemaManager, TableSchema schema) {
@@ -630,4 +633,17 @@ public class SchemaValidation {
             }
         }
     }
+
+    private static void validateRowLineage(TableSchema schema, CoreOptions options) {
+        if (options.rowTrackingEnabled()) {
+            checkArgument(
+                    options.bucket() == -1,
+                    "Cannot define %s for row lineage table, it only support 
bucket = -1",
+                    CoreOptions.BUCKET.key());
+            checkArgument(
+                    schema.primaryKeys().isEmpty(),
+                    "Cannot define %s for row lineage table.",
+                    PRIMARY_KEY.key());
+        }
+    }
 }
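
The net effect of the new check is that a row lineage table must be an unaware-bucket append table: 'bucket' must stay at its default of -1 and no primary key may be declared. A small illustrative sketch with Paimon's Schema builder (only an illustration of which definitions pass validateRowLineage when a table is created; the option keys mirror the ones used elsewhere in this change):

    import org.apache.paimon.schema.Schema;
    import org.apache.paimon.types.DataTypes;

    public class RowTrackingSchemaSketch {

        public static void main(String[] args) {
            // Accepted: unaware-bucket append table, no primary key.
            Schema accepted =
                    Schema.newBuilder()
                            .column("id", DataTypes.INT())
                            .column("data", DataTypes.STRING())
                            .option("row-tracking.enabled", "true")
                            .option("bucket", "-1")
                            .build();

            // Would be rejected when the table is created: a fixed bucket count
            // and a primary key cannot be combined with row tracking.
            Schema rejected =
                    Schema.newBuilder()
                            .column("id", DataTypes.INT())
                            .column("data", DataTypes.STRING())
                            .primaryKey("id")
                            .option("row-tracking.enabled", "true")
                            .option("bucket", "4")
                            .build();

            System.out.println(accepted);
            System.out.println(rejected);
        }
    }
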
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
index fe1bc6adb9..64bd218167 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
@@ -157,6 +157,7 @@ public class CommitMessageLegacyV2Serializer {
                     null,
                     null,
                     null,
+                    null,
                     null);
         }
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
index 4d91359ec0..47c69855cb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
@@ -27,6 +27,7 @@ import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFileMeta08Serializer;
 import org.apache.paimon.io.DataFileMeta09Serializer;
 import org.apache.paimon.io.DataFileMeta10LegacySerializer;
+import org.apache.paimon.io.DataFileMeta12LegacySerializer;
 import org.apache.paimon.io.DataFileMetaSerializer;
 import org.apache.paimon.io.DataIncrement;
 import org.apache.paimon.io.DataInputDeserializer;
@@ -48,11 +49,12 @@ import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
 /** {@link VersionedSerializer} for {@link CommitMessage}. */
 public class CommitMessageSerializer implements VersionedSerializer<CommitMessage> {
 
-    private static final int CURRENT_VERSION = 7;
+    private static final int CURRENT_VERSION = 8;
 
     private final DataFileMetaSerializer dataFileSerializer;
     private final IndexFileMetaSerializer indexEntrySerializer;
 
+    private DataFileMeta12LegacySerializer dataFileMeta12LegacySerializer;
     private DataFileMeta10LegacySerializer dataFileMeta10LegacySerializer;
     private DataFileMeta09Serializer dataFile09Serializer;
     private DataFileMeta08Serializer dataFile08Serializer;
@@ -142,8 +144,13 @@ public class CommitMessageSerializer implements VersionedSerializer<CommitMessag
 
     private IOExceptionSupplier<List<DataFileMeta>> fileDeserializer(
             int version, DataInputView view) {
-        if (version >= 6) {
+        if (version >= 8) {
             return () -> dataFileSerializer.deserializeList(view);
+        } else if (version == 6 || version == 7) {
+            if (dataFileMeta12LegacySerializer == null) {
+                dataFileMeta12LegacySerializer = new DataFileMeta12LegacySerializer();
+            }
+            return () -> dataFileMeta12LegacySerializer.deserializeList(view);
         } else if (version == 4 || version == 5) {
             if (dataFileMeta10LegacySerializer == null) {
                 dataFileMeta10LegacySerializer = new DataFileMeta10LegacySerializer();
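
The version bump follows the usual pattern in this serializer: the layout that shipped with versions 6 and 7 is frozen behind DataFileMeta12LegacySerializer and only instantiated when such an old commit message is actually replayed, while version 8 and later go through the current DataFileMetaSerializer. A generic sketch of that lazy version dispatch, using a hypothetical helper rather than Paimon's classes:

    import java.util.function.Supplier;

    /**
     * Hypothetical helper, not part of Paimon: picks a decoder by serialized version
     * and creates the legacy decoder lazily, mirroring the shape of fileDeserializer() above.
     */
    final class VersionDispatch<T> {

        private final int currentVersion;
        private final T current;
        private final Supplier<T> legacyFactory;
        private T legacy;

        VersionDispatch(int currentVersion, T current, Supplier<T> legacyFactory) {
            this.currentVersion = currentVersion;
            this.current = current;
            this.legacyFactory = legacyFactory;
        }

        T forVersion(int version) {
            if (version >= currentVersion) {
                return current;
            }
            if (legacy == null) {
                // old wire format: instantiate the frozen decoder only on demand
                legacy = legacyFactory.get();
            }
            return legacy;
        }
    }
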
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index f4491618d8..1feb72afa3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -24,6 +24,7 @@ import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFileMeta08Serializer;
 import org.apache.paimon.io.DataFileMeta09Serializer;
 import org.apache.paimon.io.DataFileMeta10LegacySerializer;
+import org.apache.paimon.io.DataFileMeta12LegacySerializer;
 import org.apache.paimon.io.DataFileMetaSerializer;
 import org.apache.paimon.io.DataInputView;
 import org.apache.paimon.io.DataInputViewStreamWrapper;
@@ -58,7 +59,7 @@ public class DataSplit implements Split {
 
     private static final long serialVersionUID = 7L;
     private static final long MAGIC = -2394839472490812314L;
-    private static final int VERSION = 6;
+    private static final int VERSION = 7;
 
     private long snapshotId = 0;
     private BinaryRow partition;
@@ -426,7 +427,10 @@ public class DataSplit implements Split {
         } else if (version == 3 || version == 4) {
             DataFileMeta10LegacySerializer serializer = new DataFileMeta10LegacySerializer();
             return serializer::deserialize;
-        } else if (version >= 5) {
+        } else if (version == 5 || version == 6) {
+            DataFileMeta12LegacySerializer serializer = new DataFileMeta12LegacySerializer();
+            return serializer::deserialize;
+        } else if (version >= 7) {
             DataFileMetaSerializer serializer = new DataFileMetaSerializer();
             return serializer::deserialize;
         } else {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
index 83bfbe831e..eb18e679f5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
@@ -464,7 +464,9 @@ public class FilesTable implements ReadonlyTable {
             lazyNullValueCounts = new TreeMap<>();
             lazyLowerValueBounds = new TreeMap<>();
             lazyUpperValueBounds = new TreeMap<>();
-            for (int i = 0; i < min.getFieldCount(); i++) {
+            int length =
+                    Math.min(min.getFieldCount(), simpleStatsEvolutions.tableDataFields().size());
+            for (int i = 0; i < length; i++) {
                 DataField field = simpleStatsEvolutions.tableDataFields().get(i);
                 String name = field.name();
                 DataType type = field.type();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/RowLineageTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/RowLineageTable.java
new file mode 100644
index 0000000000..c428df607f
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/RowLineageTable.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.consumer.ConsumerManager;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.DataTable;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.ReadonlyTable;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.BranchManager;
+import org.apache.paimon.utils.ChangelogManager;
+import org.apache.paimon.utils.SimpleFileReader;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
+
+/** A {@link Table} for reading the row lineage (row id and sequence number) of a table. */
+public class RowLineageTable implements DataTable, ReadonlyTable {
+
+    public static final String ROW_LINEAGE = "row_lineage";
+
+    private final FileStoreTable wrapped;
+
+    public RowLineageTable(FileStoreTable wrapped) {
+        this.wrapped = wrapped;
+
+        if (!coreOptions().rowTrackingEnabled()) {
+            throw new IllegalArgumentException(
+                    "Table "
+                            + wrapped.name()
+                            + " does not support row tracking, "
+                            + "please enable 'row-tracking.enabled' option to 
use 'with_metadata' table.");
+        }
+    }
+
+    @Override
+    public Optional<Snapshot> latestSnapshot() {
+        return wrapped.latestSnapshot();
+    }
+
+    @Override
+    public Snapshot snapshot(long snapshotId) {
+        return wrapped.snapshot(snapshotId);
+    }
+
+    @Override
+    public SimpleFileReader<ManifestFileMeta> manifestListReader() {
+        return wrapped.manifestListReader();
+    }
+
+    @Override
+    public SimpleFileReader<ManifestEntry> manifestFileReader() {
+        return wrapped.manifestFileReader();
+    }
+
+    @Override
+    public SimpleFileReader<IndexManifestEntry> indexManifestFileReader() {
+        return wrapped.indexManifestFileReader();
+    }
+
+    @Override
+    public String name() {
+        return wrapped.name() + SYSTEM_TABLE_SPLITTER + ROW_LINEAGE;
+    }
+
+    @Override
+    public RowType rowType() {
+        List<DataField> fields = new ArrayList<>(wrapped.rowType().getFields());
+        fields.add(SpecialFields.ROW_ID);
+        fields.add(SpecialFields.SEQUENCE_NUMBER);
+        return new RowType(fields);
+    }
+
+    @Override
+    public List<String> partitionKeys() {
+        return wrapped.partitionKeys();
+    }
+
+    @Override
+    public Map<String, String> options() {
+        return wrapped.options();
+    }
+
+    @Override
+    public List<String> primaryKeys() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public SnapshotReader newSnapshotReader() {
+        return wrapped.newSnapshotReader();
+    }
+
+    @Override
+    public DataTableScan newScan() {
+        return wrapped.newScan();
+    }
+
+    @Override
+    public StreamDataTableScan newStreamScan() {
+        return wrapped.newStreamScan();
+    }
+
+    @Override
+    public CoreOptions coreOptions() {
+        return wrapped.coreOptions();
+    }
+
+    @Override
+    public Path location() {
+        return wrapped.location();
+    }
+
+    @Override
+    public SnapshotManager snapshotManager() {
+        return wrapped.snapshotManager();
+    }
+
+    @Override
+    public ChangelogManager changelogManager() {
+        return wrapped.changelogManager();
+    }
+
+    @Override
+    public ConsumerManager consumerManager() {
+        return wrapped.consumerManager();
+    }
+
+    @Override
+    public SchemaManager schemaManager() {
+        return wrapped.schemaManager();
+    }
+
+    @Override
+    public TagManager tagManager() {
+        return wrapped.tagManager();
+    }
+
+    @Override
+    public BranchManager branchManager() {
+        return wrapped.branchManager();
+    }
+
+    @Override
+    public DataTable switchToBranch(String branchName) {
+        return new RowLineageTable(wrapped.switchToBranch(branchName));
+    }
+
+    @Override
+    public InnerTableRead newRead() {
+        return wrapped.newRead().withReadType(rowType());
+    }
+
+    @Override
+    public Table copy(Map<String, String> dynamicOptions) {
+        return new RowLineageTable(wrapped.copy(dynamicOptions));
+    }
+
+    @Override
+    public FileIO fileIO() {
+        return wrapped.fileIO();
+    }
+}
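
Querying the new system table just suffixes the table name with $row_lineage; the result contains the data columns plus the row id and sequence number columns appended by rowType() above (the Flink ITCase later in this patch shows the same thing). A usage sketch with Flink's TableEnvironment, assuming a Paimon catalog named 'paimon' and an existing row-tracking table 't':

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class RowLineageQuerySketch {

        public static void main(String[] args) throws Exception {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
            // Catalog registration omitted; assumes a Paimon catalog named 'paimon'
            // containing a row-tracking table 't' (id INT, data STRING) already exists.
            tEnv.useCatalog("paimon");

            tEnv.executeSql("INSERT INTO t VALUES (1, 'AAA'), (2, 'BBB')").await();

            // Returns the data columns plus the lineage columns appended at the end,
            // e.g. (1, 'AAA', <row id>, <sequence number>).
            tEnv.executeSql("SELECT * FROM t$row_lineage").print();
        }
    }
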
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
index 57c3c2caac..69f7a65ddf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
@@ -45,6 +45,7 @@ import static org.apache.paimon.table.system.ManifestsTable.MANIFESTS;
 import static org.apache.paimon.table.system.OptionsTable.OPTIONS;
 import static org.apache.paimon.table.system.PartitionsTable.PARTITIONS;
 import static org.apache.paimon.table.system.ReadOptimizedTable.READ_OPTIMIZED;
+import static org.apache.paimon.table.system.RowLineageTable.ROW_LINEAGE;
 import static org.apache.paimon.table.system.SchemasTable.SCHEMAS;
 import static org.apache.paimon.table.system.SnapshotsTable.SNAPSHOTS;
 import static org.apache.paimon.table.system.StatisticTable.STATISTICS;
@@ -72,6 +73,7 @@ public class SystemTableLoader {
                     .put(STATISTICS, StatisticTable::new)
                     .put(BINLOG, BinlogTable::new)
                     .put(TABLE_INDEXES, TableIndexesTable::new)
+                    .put(ROW_LINEAGE, RowLineageTable::new)
                     .build();
 
     public static final List<String> SYSTEM_TABLES = new ArrayList<>(SYSTEM_TABLE_LOADERS.keySet());
diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java
index c7db185068..73a2e19cc8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java
@@ -80,6 +80,7 @@ public class Tag extends Snapshot {
             @JsonProperty(FIELD_WATERMARK) @Nullable Long watermark,
             @JsonProperty(FIELD_STATISTICS) @Nullable String statistics,
             @JsonProperty(FIELD_PROPERTIES) Map<String, String> properties,
+            @JsonProperty(FIELD_NEXT_ROW_ID) @Nullable Long nextRowId,
             @JsonProperty(FIELD_TAG_CREATE_TIME) @Nullable LocalDateTime tagCreateTime,
             @JsonProperty(FIELD_TAG_TIME_RETAINED) @Nullable Duration tagTimeRetained) {
         super(
@@ -103,7 +104,8 @@ public class Tag extends Snapshot {
                 changelogRecordCount,
                 watermark,
                 statistics,
-                properties);
+                properties,
+                nextRowId);
         this.tagCreateTime = tagCreateTime;
         this.tagTimeRetained = tagTimeRetained;
     }
@@ -147,6 +149,7 @@ public class Tag extends Snapshot {
                 snapshot.watermark(),
                 snapshot.statistics(),
                 snapshot.properties(),
+                snapshot.nextRowId(),
                 tagCreateTime,
                 tagTimeRetained);
     }
@@ -173,7 +176,8 @@ public class Tag extends Snapshot {
                 changelogRecordCount,
                 watermark,
                 statistics,
-                properties);
+                properties,
+                nextRowId);
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
index 341199d1bd..752ea3b377 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
@@ -59,6 +59,7 @@ public class FormatReaderMapping {
     private final FormatReaderFactory readerFactory;
     private final TableSchema dataSchema;
     private final List<Predicate> dataFilters;
+    private final Map<String, Integer> systemFields;
 
     public FormatReaderMapping(
             @Nullable int[] indexMapping,
@@ -67,13 +68,15 @@ public class FormatReaderMapping {
             @Nullable Pair<int[], RowType> partitionPair,
             FormatReaderFactory readerFactory,
             TableSchema dataSchema,
-            List<Predicate> dataFilters) {
+            List<Predicate> dataFilters,
+            Map<String, Integer> systemFields) {
         this.indexMapping = combine(indexMapping, trimmedKeyMapping);
         this.castMapping = castMapping;
         this.readerFactory = readerFactory;
         this.partitionPair = partitionPair;
         this.dataSchema = dataSchema;
         this.dataFilters = dataFilters;
+        this.systemFields = systemFields;
     }
 
     private int[] combine(@Nullable int[] indexMapping, @Nullable int[] trimmedKeyMapping) {
@@ -111,6 +114,10 @@ public class FormatReaderMapping {
         return partitionPair;
     }
 
+    public Map<String, Integer> getSystemFields() {
+        return systemFields;
+    }
+
     public FormatReaderFactory getReaderFactory() {
         return readerFactory;
     }
@@ -130,16 +137,19 @@ public class FormatReaderMapping {
         private final List<DataField> readTableFields;
         private final Function<TableSchema, List<DataField>> fieldsExtractor;
         @Nullable private final List<Predicate> filters;
+        private final boolean rowTrackingEnabled;
 
         public Builder(
                 FileFormatDiscover formatDiscover,
                 List<DataField> readTableFields,
                 Function<TableSchema, List<DataField>> fieldsExtractor,
-                @Nullable List<Predicate> filters) {
+                @Nullable List<Predicate> filters,
+                boolean rowTrackingEnabled) {
             this.formatDiscover = formatDiscover;
             this.readTableFields = readTableFields;
             this.fieldsExtractor = fieldsExtractor;
             this.filters = filters;
+            this.rowTrackingEnabled = rowTrackingEnabled;
         }
 
         /**
@@ -165,8 +175,13 @@ public class FormatReaderMapping {
 
             // extract the whole data fields in logic.
             List<DataField> allDataFields = fieldsExtractor.apply(dataSchema);
+            if (rowTrackingEnabled) {
+                allDataFields.add(SpecialFields.ROW_ID);
+                allDataFields.add(SpecialFields.SEQUENCE_NUMBER.copy(true));
+            }
+            Map<String, Integer> systemFields = findSystemFields(readTableFields);
+
             List<DataField> readDataFields = readDataFields(allDataFields);
-            // build index cast mapping
             IndexCastMapping indexCastMapping =
                     SchemaEvolutionUtil.createIndexCastMapping(readTableFields, readDataFields);
 
@@ -196,7 +211,19 @@ public class FormatReaderMapping {
                             .discover(formatIdentifier)
                             .createReaderFactory(readRowType, readFilters),
                     dataSchema,
-                    readFilters);
+                    readFilters,
+                    systemFields);
+        }
+
+        private Map<String, Integer> findSystemFields(List<DataField> readTableFields) {
+            Map<String, Integer> systemFields = new HashMap<>();
+            for (int i = 0; i < readTableFields.size(); i++) {
+                DataField field = readTableFields.get(i);
+                if (SpecialFields.isSystemField(field.name())) {
+                    systemFields.put(field.name(), i);
+                }
+            }
+            return systemFields;
         }
 
         static Pair<int[], RowType> trimKeyFields(
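
The systemFields map built by this Builder records, for each requested system column, its position in the read schema, so the reader can fill those columns from file-level metadata (firstRowId, maxSequenceNumber) instead of expecting them in the data file. A rough sketch of how such a map could be consumed; the column names and the per-row derivation are assumptions for illustration only, the real logic lives in DataFileRecordReader and ColumnarRowIterator:

    import java.util.Map;

    /**
     * Simplified sketch with assumed column names ("_ROW_ID", "_SEQUENCE_NUMBER"):
     * given the positions of the lineage columns in the read schema, derive their
     * values from file-level metadata instead of reading them from the data file.
     */
    final class LineageColumnFiller {

        private final Integer rowIdPos;
        private final Integer sequencePos;
        private final long firstRowId;          // DataFileMeta#firstRowId
        private final long maxSequenceNumber;   // DataFileMeta#maxSequenceNumber

        LineageColumnFiller(Map<String, Integer> systemFields, long firstRowId, long maxSequenceNumber) {
            this.rowIdPos = systemFields.get("_ROW_ID");
            this.sequencePos = systemFields.get("_SEQUENCE_NUMBER");
            this.firstRowId = firstRowId;
            this.maxSequenceNumber = maxSequenceNumber;
        }

        /** Fills the lineage values for the row at {@code positionInFile} into {@code out}. */
        void fill(Object[] out, long positionInFile) {
            if (rowIdPos != null) {
                out[rowIdPos] = firstRowId + positionInFile; // row ids are consecutive within a file
            }
            if (sequencePos != null) {
                out[sequencePos] = maxSequenceNumber; // one sequence number per file under this scheme
            }
        }
    }
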
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactCoordinatorTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactCoordinatorTest.java
index e166086515..1e99e00a09 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactCoordinatorTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactCoordinatorTest.java
@@ -260,6 +260,7 @@ public class AppendCompactCoordinatorTest {
                 0L,
                 null,
                 FileSource.APPEND,
+                null,
                 null);
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 69d4e5c11f..d8b5abe565 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -684,6 +684,7 @@ public class AppendOnlyWriterTest {
                 null,
                 FileSource.APPEND,
                 null,
+                null,
                 null);
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
index c9a4dacb90..a8592612d2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
@@ -161,6 +161,7 @@ public class IndexBootstrapTest extends TableTestBase {
                 null,
                 FileSource.APPEND,
                 null,
+                null,
                 null);
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
index 7de4214bea..2029a9f616 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
@@ -165,6 +165,7 @@ public class DataFileTestDataGenerator {
                         kvs.stream().filter(kv -> kv.valueKind().isRetract()).count(),
                         null,
                         FileSource.APPEND,
+                        null,
                         null),
                 kvs);
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
index a44ef9a530..e0bbbb0e9b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
@@ -58,6 +58,7 @@ public class DataFileTestUtils {
                 null,
                 FileSource.APPEND,
                 null,
+                null,
                 null);
     }
 
@@ -77,6 +78,7 @@ public class DataFileTestUtils {
                 0L,
                 null,
                 FileSource.APPEND,
+                null,
                 null);
     }
 
@@ -102,6 +104,7 @@ public class DataFileTestUtils {
                 deleteRowCount,
                 null,
                 FileSource.APPEND,
+                null,
                 null);
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
index 6ed112de13..3fbaca4242 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
@@ -45,6 +45,82 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Compatibility Test for {@link ManifestCommittableSerializer}. */
 public class ManifestCommittableSerializerCompatibilityTest {
 
+    @Test
+    public void testCompatibilityToV4CommitV8() throws IOException {
+        SimpleStats keyStats =
+                new SimpleStats(
+                        singleColumn("min_key"),
+                        singleColumn("max_key"),
+                        fromLongArray(new Long[] {0L}));
+        SimpleStats valueStats =
+                new SimpleStats(
+                        singleColumn("min_value"),
+                        singleColumn("max_value"),
+                        fromLongArray(new Long[] {0L}));
+        DataFileMeta dataFile =
+                new DataFileMeta(
+                        "my_file",
+                        1024 * 1024,
+                        1024,
+                        singleColumn("min_key"),
+                        singleColumn("max_key"),
+                        keyStats,
+                        valueStats,
+                        15,
+                        200,
+                        5,
+                        3,
+                        Arrays.asList("extra1", "extra2"),
+                        
Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")),
+                        11L,
+                        new byte[] {1, 2, 4},
+                        FileSource.COMPACT,
+                        Arrays.asList("field1", "field2", "field3"),
+                        "hdfs://localhost:9000/path/to/file",
+                        1L);
+        List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
+
+        LinkedHashMap<String, DeletionVectorMeta> dvMetas = new LinkedHashMap<>();
+        dvMetas.put("dv_key1", new DeletionVectorMeta("dv_key1", 1, 2, 3L));
+        dvMetas.put("dv_key2", new DeletionVectorMeta("dv_key2", 3, 4, 5L));
+        IndexFileMeta indexFile =
+                new IndexFileMeta("my_index_type", "my_index_file", 1024 * 
100, 1002, dvMetas);
+        List<IndexFileMeta> indexFiles = Collections.singletonList(indexFile);
+
+        CommitMessageImpl commitMessage =
+                new CommitMessageImpl(
+                        singleColumn("my_partition"),
+                        11,
+                        16,
+                        new DataIncrement(dataFiles, dataFiles, dataFiles),
+                        new CompactIncrement(dataFiles, dataFiles, dataFiles),
+                        new IndexIncrement(indexFiles));
+
+        ManifestCommittable manifestCommittable =
+                new ManifestCommittable(
+                        5,
+                        202020L,
+                        Collections.singletonMap(5, 555L),
+                        Collections.singletonList(commitMessage));
+        manifestCommittable.addProperty("k1", "v1");
+        manifestCommittable.addProperty("k2", "v2");
+
+        ManifestCommittableSerializer serializer = new ManifestCommittableSerializer();
+        byte[] bytes = serializer.serialize(manifestCommittable);
+
+        ManifestCommittable deserialized = serializer.deserialize(serializer.getVersion(), bytes);
+        assertThat(deserialized).isEqualTo(manifestCommittable);
+
+        byte[] oldBytes =
+                IOUtils.readFully(
+                        ManifestCommittableSerializerCompatibilityTest.class
+                                .getClassLoader()
+                                .getResourceAsStream("compatibility/manifest-committable-v8"),
+                        true);
+        deserialized = serializer.deserialize(4, oldBytes);
+        assertThat(deserialized).isEqualTo(manifestCommittable);
+    }
+
     @Test
     public void testCompatibilityToV4CommitV7() throws IOException {
         SimpleStats keyStats =
@@ -76,7 +152,8 @@ public class ManifestCommittableSerializerCompatibilityTest {
                         new byte[] {1, 2, 4},
                         FileSource.COMPACT,
                         Arrays.asList("field1", "field2", "field3"),
-                        "hdfs://localhost:9000/path/to/file");
+                        "hdfs://localhost:9000/path/to/file",
+                        null);
         List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
 
         LinkedHashMap<String, DeletionVectorMeta> dvMetas = new LinkedHashMap<>();
@@ -150,7 +227,8 @@ public class ManifestCommittableSerializerCompatibilityTest {
                         new byte[] {1, 2, 4},
                         FileSource.COMPACT,
                         Arrays.asList("field1", "field2", "field3"),
-                        "hdfs://localhost:9000/path/to/file");
+                        "hdfs://localhost:9000/path/to/file",
+                        null);
         List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
 
         LinkedHashMap<String, DeletionVectorMeta> dvMetas = new LinkedHashMap<>();
@@ -222,7 +300,8 @@ public class ManifestCommittableSerializerCompatibilityTest {
                         new byte[] {1, 2, 4},
                         FileSource.COMPACT,
                         Arrays.asList("field1", "field2", "field3"),
-                        "hdfs://localhost:9000/path/to/file");
+                        "hdfs://localhost:9000/path/to/file",
+                        null);
         List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
 
         LinkedHashMap<String, DeletionVectorMeta> dvMetas = new LinkedHashMap<>();
@@ -294,6 +373,7 @@ public class ManifestCommittableSerializerCompatibilityTest {
                         new byte[] {1, 2, 4},
                         FileSource.COMPACT,
                         Arrays.asList("field1", "field2", "field3"),
+                        null,
                         null);
         List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
 
@@ -365,6 +445,7 @@ public class ManifestCommittableSerializerCompatibilityTest {
                         new byte[] {1, 2, 4},
                         FileSource.COMPACT,
                         Arrays.asList("field1", "field2", "field3"),
+                        null,
                         null);
         List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
 
@@ -437,6 +518,7 @@ public class ManifestCommittableSerializerCompatibilityTest {
                         new byte[] {1, 2, 4},
                         FileSource.COMPACT,
                         null,
+                        null,
                         null);
         List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
 
@@ -509,6 +591,7 @@ public class ManifestCommittableSerializerCompatibilityTest {
                         new byte[] {1, 2, 4},
                         null,
                         null,
+                        null,
                         null);
         List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
 
@@ -581,6 +664,7 @@ public class ManifestCommittableSerializerCompatibilityTest {
                         null,
                         null,
                         null,
+                        null,
                         null);
         List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
index 2372c7bb5f..950d40abd1 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
@@ -116,6 +116,7 @@ public class ManifestCommittableSerializerTest {
                 0L,
                 null,
                 FileSource.APPEND,
+                null,
                 null);
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
index d27ab82354..a8b5ccdba4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
@@ -96,6 +96,7 @@ public abstract class ManifestFileMetaTestBase {
                         embeddedIndex, // not used
                         FileSource.APPEND,
                         null,
+                        null,
                         null));
     }
 
@@ -280,6 +281,7 @@ public abstract class ManifestFileMetaTestBase {
                         0L,
                         null,
                         FileSource.APPEND,
+                        null,
                         null));
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
index d804c97902..5cca50d1e6 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
@@ -84,6 +84,7 @@ public class LevelsTest {
                 0L,
                 null,
                 FileSource.APPEND,
+                null,
                 null);
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
index 94c11498c5..ad33ea1c13 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
@@ -185,6 +185,7 @@ public class IntervalPartitionTest {
                 null,
                 FileSource.APPEND,
                 null,
+                null,
                 null);
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
index c9ca9070c3..dccd22b008 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
@@ -424,6 +424,21 @@ public class UniversalCompactionTest {
 
     static DataFileMeta file(long size) {
         return new DataFileMeta(
-                "", size, 1, null, null, null, null, 0, 0, 0, 0, 0L, null, 
FileSource.APPEND, null);
+                "",
+                size,
+                1,
+                null,
+                null,
+                null,
+                null,
+                0,
+                0,
+                0,
+                0,
+                0L,
+                null,
+                FileSource.APPEND,
+                null,
+                null);
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
index 3164ac96d0..cbf2b63d93 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
@@ -284,6 +284,7 @@ public class ExpireSnapshotsTest {
                         null,
                         FileSource.APPEND,
                         null,
+                        null,
                         null);
         ManifestEntry add = new ManifestEntry(FileKind.ADD, partition, 0, 1, dataFile);
         ManifestEntry delete = new ManifestEntry(FileKind.DELETE, partition, 0, 1, dataFile);
@@ -345,7 +346,8 @@ public class ExpireSnapshotsTest {
                         null,
                         FileSource.APPEND,
                         null,
-                        myDataFile.toString());
+                        myDataFile.toString(),
+                        null);
         ManifestEntry add = new ManifestEntry(FileKind.ADD, partition, 0, 1, dataFile);
         ManifestEntry delete = new ManifestEntry(FileKind.DELETE, partition, 0, 1, dataFile);
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
index a1f7d69e28..3b00e7f088 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
@@ -59,6 +59,7 @@ public class SplitGeneratorTest {
                 0L,
                 null,
                 FileSource.APPEND,
+                null,
                 null);
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
index 7213f11ca0..2d28ebb2fb 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
@@ -216,6 +216,7 @@ public class SplitTest {
                         new byte[] {1, 2, 4},
                         null,
                         null,
+                        null,
                         null);
         List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
 
@@ -279,6 +280,7 @@ public class SplitTest {
                         new byte[] {1, 2, 4},
                         FileSource.COMPACT,
                         null,
+                        null,
                         null);
         List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
 
@@ -342,6 +344,7 @@ public class SplitTest {
                         new byte[] {1, 2, 4},
                         FileSource.COMPACT,
                         Arrays.asList("field1", "field2", "field3"),
+                        null,
                         null);
         List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
 
@@ -409,6 +412,7 @@ public class SplitTest {
                         new byte[] {1, 2, 4},
                         FileSource.COMPACT,
                         Arrays.asList("field1", "field2", "field3"),
+                        null,
                         null);
         List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
 
@@ -476,7 +480,8 @@ public class SplitTest {
                         new byte[] {1, 2, 4},
                         FileSource.COMPACT,
                         Arrays.asList("field1", "field2", "field3"),
-                        "hdfs:///path/to/warehouse");
+                        "hdfs:///path/to/warehouse",
+                        null);
         List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
 
         DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22, 33L);
@@ -543,7 +548,8 @@ public class SplitTest {
                         new byte[] {1, 2, 4},
                         FileSource.COMPACT,
                         Arrays.asList("field1", "field2", "field3"),
-                        "hdfs:///path/to/warehouse");
+                        "hdfs:///path/to/warehouse",
+                        null);
         List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
 
         DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22, 33L);
@@ -579,6 +585,75 @@ public class SplitTest {
         assertThat(actual).isEqualTo(split);
     }
 
+    @Test
+    public void testSerializerCompatibleV7() throws Exception {
+        SimpleStats keyStats =
+                new SimpleStats(
+                        singleColumn("min_key"),
+                        singleColumn("max_key"),
+                        fromLongArray(new Long[] {0L}));
+        SimpleStats valueStats =
+                new SimpleStats(
+                        singleColumn("min_value"),
+                        singleColumn("max_value"),
+                        fromLongArray(new Long[] {0L}));
+
+        DataFileMeta dataFile =
+                new DataFileMeta(
+                        "my_file",
+                        1024 * 1024,
+                        1024,
+                        singleColumn("min_key"),
+                        singleColumn("max_key"),
+                        keyStats,
+                        valueStats,
+                        15,
+                        200,
+                        5,
+                        3,
+                        Arrays.asList("extra1", "extra2"),
+                        
Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")),
+                        11L,
+                        new byte[] {1, 2, 4},
+                        FileSource.COMPACT,
+                        Arrays.asList("field1", "field2", "field3"),
+                        "hdfs:///path/to/warehouse",
+                        12L);
+        List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
+
+        DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22, 33L);
+        List<DeletionFile> deletionFiles = Collections.singletonList(deletionFile);
+
+        BinaryRow partition = new BinaryRow(1);
+        BinaryRowWriter binaryRowWriter = new BinaryRowWriter(partition);
+        binaryRowWriter.writeString(0, BinaryString.fromString("aaaaa"));
+        binaryRowWriter.complete();
+
+        DataSplit split =
+                DataSplit.builder()
+                        .withSnapshot(18)
+                        .withPartition(partition)
+                        .withBucket(20)
+                        .withTotalBuckets(32)
+                        .withDataFiles(dataFiles)
+                        .withDataDeletionFiles(deletionFiles)
+                        .withBucketPath("my path")
+                        .build();
+
+        assertThat(InstantiationUtil.clone(split)).isEqualTo(split);
+
+        byte[] v7Bytes =
+                IOUtils.readFully(
+                        SplitTest.class
+                                .getClassLoader()
+                                .getResourceAsStream("compatibility/datasplit-v7"),
+                        true);
+
+        DataSplit actual =
+                InstantiationUtil.deserializeObject(v7Bytes, DataSplit.class.getClassLoader());
+        assertThat(actual).isEqualTo(split);
+    }
+
     private DataFileMeta newDataFile(long rowCount) {
         return newDataFile(rowCount, null, null);
     }
@@ -597,6 +672,7 @@ public class SplitTest {
                 null,
                 null,
                 valueStatsCols,
+                null,
                 null);
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
index 0d888225dc..306820fa7d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
@@ -409,6 +409,7 @@ public class TagAutoManagerTest extends PrimaryKeyTableTestBase {
                         null,
                         null,
                         null,
+                        null,
                         null);
         tagManager.createTag(
                 snapshot1,
@@ -438,6 +439,7 @@ public class TagAutoManagerTest extends PrimaryKeyTableTestBase {
                         null,
                         null,
                         null,
+                        null,
                         null);
         tagManager.createTag(
                 snapshot2,
diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java b/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java
index 0096238f72..27b3a575b6 100644
--- a/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java
@@ -51,6 +51,7 @@ public class TagTest {
                     null,
                     null,
                     null,
+                    null,
                     null);
 
     @Test
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/FormatReaderMappingTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/FormatReaderMappingTest.java
index dd00d142c8..0444d5644d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/FormatReaderMappingTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/FormatReaderMappingTest.java
@@ -29,6 +29,7 @@ import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 /** Test for {@link FormatReaderMapping.Builder}. */
@@ -133,7 +134,8 @@ public class FormatReaderMappingTest {
                         null,
                         null,
                         null,
-                        null);
+                        null,
+                        Collections.emptyMap());
 
         Assertions.assertThat(formatReaderMapping.getIndexMapping())
                 .containsExactly(0, 1, 0, -1, 2);
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
index b33227cad6..8a7484faac 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
@@ -263,6 +263,7 @@ public class SnapshotManagerTest {
                 null,
                 null,
                 null,
+                null,
                 null);
     }
 
@@ -287,6 +288,7 @@ public class SnapshotManagerTest {
                 null,
                 watermark,
                 null,
+                null,
                 null);
     }
 
@@ -312,6 +314,7 @@ public class SnapshotManagerTest {
                         null,
                         null,
                         null,
+                        null,
                         null));
     }
 
@@ -343,6 +346,7 @@ public class SnapshotManagerTest {
                             null,
                             null,
                             null,
+                            null,
                             null);
             localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson());
         }
@@ -396,6 +400,7 @@ public class SnapshotManagerTest {
                             null,
                             null,
                             null,
+                            null,
                             null);
             localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson());
         }
diff --git a/paimon-core/src/test/resources/compatibility/datasplit-v7 b/paimon-core/src/test/resources/compatibility/datasplit-v7
new file mode 100644
index 0000000000..3c77a0f71c
Binary files /dev/null and b/paimon-core/src/test/resources/compatibility/datasplit-v7 differ
diff --git a/paimon-core/src/test/resources/compatibility/manifest-committable-v8 b/paimon-core/src/test/resources/compatibility/manifest-committable-v8
new file mode 100644
index 0000000000..8a436147dd
Binary files /dev/null and b/paimon-core/src/test/resources/compatibility/manifest-committable-v8 differ
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendTableITCase.java
index 025a7f4b5a..4afe48a4a9 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendTableITCase.java
@@ -94,6 +94,19 @@ public class AppendTableITCase extends CatalogITCaseBase {
         assertThat(rows).containsExactlyInAnyOrder(Row.of("AAA"), 
Row.of("BBB"));
     }
 
+    @Test
+    public void testReadWriteWithLineage() {
+        batchSql("INSERT INTO append_table_lineage VALUES (1, 'AAA'), (2, 
'BBB')");
+        List<Row> rows = batchSql("SELECT * FROM 
append_table_lineage$row_lineage");
+        assertThat(rows.size()).isEqualTo(2);
+        assertThat(rows)
+                .containsExactlyInAnyOrder(Row.of(1, "AAA", 0L, 1L), Row.of(2, "BBB", 1L, 1L));
+
+        rows = batchSql("SELECT * FROM append_table_lineage");
+        assertThat(rows.size()).isEqualTo(2);
+        assertThat(rows).containsExactlyInAnyOrder(Row.of(1, "AAA"), Row.of(2, "BBB"));
+    }
+
     @Test
     public void testSkipDedup() {
         batchSql("INSERT INTO append_table VALUES (1, 'AAA'), (1, 'AAA'), (2, 
'BBB'), (3, 'BBB')");
@@ -551,6 +564,7 @@ public class AppendTableITCase extends CatalogITCaseBase {
     protected List<String> ddl() {
         return Arrays.asList(
                 "CREATE TABLE IF NOT EXISTS append_table (id INT, data STRING) 
WITH ('bucket' = '-1')",
+                "CREATE TABLE IF NOT EXISTS append_table_lineage (id INT, data 
STRING) WITH ('bucket' = '-1', 'row-tracking.enabled' = 'true')",
                 "CREATE TABLE IF NOT EXISTS part_table (id INT, data STRING, 
dt STRING) PARTITIONED BY (dt) WITH ('bucket' = '-1')",
                 "CREATE TABLE IF NOT EXISTS complex_table (id INT, data 
MAP<INT, INT>) WITH ('bucket' = '-1')",
                 "CREATE TABLE IF NOT EXISTS index_table (id INT, indexc 
STRING, data STRING) WITH ('bucket' = '-1', 
'file-index.bloom-filter.columns'='indexc', 
'file-index.bloom-filter.indexc.items' = '500')");
@@ -584,19 +598,40 @@ public class AppendTableITCase extends CatalogITCaseBase {
 
     private void assertExecuteExpected(
             String sql, long expectedSnapshotId, Snapshot.CommitKind expectedCommitKind) {
+        assertExecuteExpected(sql, expectedSnapshotId, expectedCommitKind, "append_table");
+    }
+
+    private void assertExecuteExpected(
+            String sql,
+            long expectedSnapshotId,
+            Snapshot.CommitKind expectedCommitKind,
+            String tableName) {
         batchSql(sql);
-        Snapshot snapshot = findLatestSnapshot("append_table");
+        Snapshot snapshot = findLatestSnapshot(tableName);
         assertThat(snapshot.id()).isEqualTo(expectedSnapshotId);
         assertThat(snapshot.commitKind()).isEqualTo(expectedCommitKind);
     }
 
+    private void assertBatchHasCompact(String sql, long timeout) throws Exception {
+        batchSql(sql);
+        waitCompactSnapshot(timeout);
+    }
+
     private void assertStreamingHasCompact(String sql, long timeout) throws Exception {
+        sEnv.executeSql(sql);
+        waitCompactSnapshot(timeout);
+    }
+
+    private void waitCompactSnapshot(long timeout) throws Exception {
+        waitCompactSnapshot(timeout, "append_table");
+    }
+
+    private void waitCompactSnapshot(long timeout, String tableName) throws Exception {
         long start = System.currentTimeMillis();
         long currentId = 1;
-        sEnv.executeSql(sql);
         Snapshot snapshot;
         while (true) {
-            snapshot = findSnapshot("append_table", currentId);
+            snapshot = findSnapshot(tableName, currentId);
             if (snapshot != null) {
                 if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
                     break;
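
Note for readers skimming this patch: the new testReadWriteWithLineage case above reduces to the following Flink SQL sequence (statements are taken verbatim from the test and its ddl(); the names of the two extra BIGINT columns exposed by the $row_lineage system table are not visible in this diff, only their values are asserted), a minimal sketch of the intended usage:

    -- append table with row tracking enabled (from ddl() above)
    CREATE TABLE IF NOT EXISTS append_table_lineage (id INT, data STRING)
        WITH ('bucket' = '-1', 'row-tracking.enabled' = 'true');

    -- write two rows
    INSERT INTO append_table_lineage VALUES (1, 'AAA'), (2, 'BBB');

    -- the base table still returns only the declared columns: (1, 'AAA'), (2, 'BBB')
    SELECT * FROM append_table_lineage;

    -- the row_lineage system table appends two lineage values per row,
    -- asserted in the test as (1, 'AAA', 0, 1) and (2, 'BBB', 1, 1)
    SELECT * FROM append_table_lineage$row_lineage;
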
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/AppendPreCommitCompactCoordinatorOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/AppendPreCommitCompactCoordinatorOperatorTest.java
index 6675a1358a..c0c3041bf9 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/AppendPreCommitCompactCoordinatorOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/AppendPreCommitCompactCoordinatorOperatorTest.java
@@ -243,6 +243,7 @@ public class AppendPreCommitCompactCoordinatorOperatorTest {
                 null,
                 null,
                 null,
+                null,
                 null);
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperatorTest.java
index 4d5e9ac1b9..4f5df2a2d2 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperatorTest.java
@@ -350,6 +350,7 @@ public class ChangelogCompactCoordinateOperatorTest {
                 null,
                 null,
                 null,
+                null,
                 null);
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperatorTest.java
index 6b47256a5c..ddd798519a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperatorTest.java
@@ -198,6 +198,7 @@ public class ChangelogCompactSortOperatorTest {
                 null,
                 null,
                 null,
+                null,
                 null);
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java
index 580220e77d..afd57ff29e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java
@@ -101,6 +101,7 @@ public class ChangelogCompactTaskSerializerTest {
                 0L,
                 null,
                 FileSource.APPEND,
+                null,
                 null);
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskTest.java
index fa567e5490..364cdcb9b8 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskTest.java
@@ -82,6 +82,7 @@ public class ChangelogCompactTaskTest {
                                 null,
                                 null,
                                 null,
+                                null,
                                 null)));
         ChangelogCompactTask task =
                 new ChangelogCompactTask(1, BinaryRow.EMPTY_ROW, 1, files, new HashMap<>());
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
index 0c90b54dee..50099be181 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
@@ -79,6 +79,7 @@ public class CompactionTaskSimpleSerializerTest {
                 0L,
                 null,
                 FileSource.APPEND,
+                null,
                 null);
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
index 92c8dd94a1..1bf93ab818 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
@@ -116,6 +116,7 @@ public class FileStoreSourceSplitGeneratorTest {
                             0L, // not used
                             null, // not used
                             FileSource.APPEND,
+                            null,
                             null));
         }
         return DataSplit.builder()
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
index 9c884cd5c3..f36b57a009 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
@@ -89,6 +89,7 @@ public class FileStoreSourceSplitSerializerTest {
                 0L,
                 null,
                 FileSource.APPEND,
+                null,
                 null);
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index 0d44e0241a..e8931fb086 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -146,7 +146,8 @@ public class TestChangelogDataReadWrite {
                         VALUE_TYPE,
                         FileFormatDiscover.of(options),
                         pathFactory,
-                        options.fileIndexReadEnabled());
+                        options.fileIndexReadEnabled(),
+                        false);
         return new KeyValueTableRead(() -> read, () -> rawFileRead, null);
     }
 
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
index 4997f65eaf..febf15f9e9 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
@@ -54,6 +54,7 @@ class ScanHelperTest extends PaimonSparkTestBase {
             null,
             FileSource.APPEND,
             null,
+            null,
             null)
       }
 
@@ -91,6 +92,7 @@ class ScanHelperTest extends PaimonSparkTestBase {
         null,
         FileSource.APPEND,
         null,
+        null,
         null)
     ).asJava
 
