This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit ade0600b3af71c892438bcce58e3e2833975e1ce
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Sep 16 14:10:16 2025 +0800

    [core] Modify all row lineage to row tracking in codes (#6262)
---
 docs/content/append-table/row-tracking.md          |  8 ++--
 .../org/apache/paimon/table/SpecialFields.java     | 20 ++++----
 .../apache/paimon/casting/FallbackMappingRow.java  |  6 +--
 .../paimon/data/columnar/ColumnarRowIterator.java  |  2 +-
 .../org/apache/paimon/AppendOnlyFileStore.java     |  2 +-
 .../org/apache/paimon/io/DataFileRecordReader.java | 30 ++++++------
 .../org/apache/paimon/io/PojoDataFileMeta.java     |  2 +-
 .../paimon/operation/DataEvolutionSplitRead.java   |  8 ++--
 .../paimon/operation/FileStoreCommitImpl.java      |  4 +-
 .../paimon/operation/MergeFileSplitRead.java       |  2 +-
 .../apache/paimon/operation/RawFileSplitRead.java  |  4 +-
 .../org/apache/paimon/schema/SchemaValidation.java |  4 +-
 ...{RowLineageTable.java => RowTrackingTable.java} | 16 +++----
 .../paimon/table/system/SystemTableLoader.java     |  4 +-
 .../AppendPreCommitCompactWorkerOperator.java      |  2 +-
 .../paimon/flink/compact/AppendTableCompactor.java |  2 +-
 .../org/apache/paimon/flink/AppendTableITCase.java | 56 +++++++++++-----------
 ...{RowLineageTest.scala => RowTrackingTest.scala} |  2 +-
 ...{RowLineageTest.scala => RowTrackingTest.scala} |  2 +-
 ...{RowLineageTest.scala => RowTrackingTest.scala} |  2 +-
 ...{RowLineageTest.scala => RowTrackingTest.scala} |  2 +-
 .../apache/paimon/spark/sql/RowLineageTest.scala   | 21 --------
 .../apache/paimon/spark/sql/RowTrackingTest.scala} |  2 +-
 .../paimon/spark/procedure/CompactProcedure.java   |  2 +-
 .../paimon/spark/ColumnPruningAndPushDown.scala    |  2 +-
 .../org/apache/paimon/spark/SparkTableWrite.scala  |  4 +-
 .../commands/DeleteFromPaimonTableCommand.scala    |  4 +-
 .../spark/commands/MergeIntoPaimonTable.scala      | 32 ++++++-------
 .../paimon/spark/commands/PaimonSparkWriter.scala  | 12 ++---
 .../spark/commands/UpdatePaimonTableCommand.scala  |  2 +-
 .../paimon/spark/schema/PaimonMetadataColumn.scala |  2 +-
 .../apache/paimon/spark/util/ScanPlanHelper.scala  |  6 +--
 ...ageTestBase.scala => RowTrackingTestBase.scala} | 22 ++++-----
 .../paimon/spark/util/ScanPlanHelperTest.scala     | 14 +++---
 34 files changed, 143 insertions(+), 162 deletions(-)

diff --git a/docs/content/append-table/row-tracking.md 
b/docs/content/append-table/row-tracking.md
index 04aec8bc58..fff59f5edf 100644
--- a/docs/content/append-table/row-tracking.md
+++ b/docs/content/append-table/row-tracking.md
@@ -26,9 +26,9 @@ under the License.
 
 # Row tracking
 
-Row tracking allows Paimon to track row-level lineage in a Paimon append 
table. Once enabled on a Paimon table, two more hidden columns will be added to 
the table schema:
-- `_ROW_ID`: BIGINT, this is a unique identifier for each row in the table. It 
is used to track the lineage of the row and can be used to identify the row in 
case of update, merge into or delete.
-- `_SEQUENCE_NUMBER`: BIGINT, this is field indicates which `version` of this 
record is. It actually is the snapshot-id of the snapshot that this row belongs 
to. It is used to track the lineage of the row version.
+Row tracking allows Paimon to track row-level changes in a Paimon append 
table. Once enabled on a Paimon table, two more hidden columns will be added to 
the table schema:
+- `_ROW_ID`: BIGINT, this is a unique identifier for each row in the table. It 
is used to track the update of the row and can be used to identify the row in 
case of update, merge into or delete.
+- `_SEQUENCE_NUMBER`: BIGINT, this field indicates the `version` of this 
record. It is actually the snapshot-id of the snapshot that this row belongs 
to. It is used to track the update of the row version.
 
 Hidden columns follows the following rules:
 - Whenever we read from one table with row tracking enabled, the `_ROW_ID` and 
`_SEQUENCE_NUMBER` will be `NOT NULL`.
@@ -57,7 +57,7 @@ CREATE TABLE t (id INT, data STRING) TBLPROPERTIES 
('row-tracking.enabled' = 'tr
 INSERT INTO t VALUES (11, 'a'), (22, 'b')
 ```
 
-You can select the row lineage meta column with the following sql in spark:
+You can select the row tracking meta column with the following sql in spark:
 ```sql
 SELECT id, data, _ROW_ID, _SEQUENCE_NUMBER FROM t;
 ```
diff --git 
a/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java 
b/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java
index 042f1d1d53..d89776747b 100644
--- a/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java
+++ b/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java
@@ -139,33 +139,33 @@ public class SpecialFields {
                 + depth;
     }
 
-    public static RowType rowTypeWithRowLineage(RowType rowType) {
-        return rowTypeWithRowLineage(rowType, false);
+    public static RowType rowTypeWithRowTracking(RowType rowType) {
+        return rowTypeWithRowTracking(rowType, false);
     }
 
     /**
-     * Add row lineage fields to rowType.
+     * Add row tracking fields to rowType.
      *
      * @param sequenceNumberNullable sequence number is not null for user, but 
is nullable when read
      *     and write
      */
-    public static RowType rowTypeWithRowLineage(RowType rowType, boolean 
sequenceNumberNullable) {
-        List<DataField> fieldsWithRowLineage = new 
ArrayList<>(rowType.getFields());
+    public static RowType rowTypeWithRowTracking(RowType rowType, boolean 
sequenceNumberNullable) {
+        List<DataField> fieldsWithRowTracking = new 
ArrayList<>(rowType.getFields());
 
-        fieldsWithRowLineage.forEach(
+        fieldsWithRowTracking.forEach(
                 f -> {
                     if (ROW_ID.name().equals(f.name()) || 
SEQUENCE_NUMBER.name().equals(f.name())) {
                         throw new IllegalArgumentException(
-                                "Row lineage field name '"
+                                "Row tracking field name '"
                                         + f.name()
                                         + "' conflicts with existing field 
names.");
                     }
                 });
-        fieldsWithRowLineage.add(SpecialFields.ROW_ID);
-        fieldsWithRowLineage.add(
+        fieldsWithRowTracking.add(SpecialFields.ROW_ID);
+        fieldsWithRowTracking.add(
                 sequenceNumberNullable
                         ? SpecialFields.SEQUENCE_NUMBER.copy(true)
                         : SpecialFields.SEQUENCE_NUMBER);
-        return new RowType(fieldsWithRowLineage);
+        return new RowType(fieldsWithRowTracking);
     }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/casting/FallbackMappingRow.java 
b/paimon-common/src/main/java/org/apache/paimon/casting/FallbackMappingRow.java
index 364c09d90f..a83aba874c 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/casting/FallbackMappingRow.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/casting/FallbackMappingRow.java
@@ -27,7 +27,7 @@ import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.data.variant.Variant;
 import org.apache.paimon.types.RowKind;
 
-/** Row with row lineage inject in. */
+/** Row with a fallback row injected in. */
 public class FallbackMappingRow implements InternalRow {
 
     private InternalRow main;
@@ -181,9 +181,9 @@ public class FallbackMappingRow implements InternalRow {
         return main.getRow(pos, numFields);
     }
 
-    public FallbackMappingRow replace(InternalRow main, InternalRow 
rowLineage) {
+    public FallbackMappingRow replace(InternalRow main, InternalRow 
fallbackRow) {
         this.main = main;
-        this.fallbackRow = rowLineage;
+        this.fallbackRow = fallbackRow;
         return this;
     }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
index 37f8d8edf2..8c641660ba 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
@@ -124,7 +124,7 @@ public class ColumnarRowIterator extends 
RecyclableIterator<InternalRow>
         return this;
     }
 
-    public ColumnarRowIterator assignRowLineage(
+    public ColumnarRowIterator assignRowTracking(
             Long firstRowId, Long snapshotId, Map<String, Integer> meta) {
         VectorizedColumnBatch vectorizedColumnBatch = row.batch();
         ColumnVector[] vectors = vectorizedColumnBatch.columns;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index 2f552d799b..da40bd20ca 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -110,7 +110,7 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
         if (bucketMode() == BucketMode.BUCKET_UNAWARE) {
             RawFileSplitRead readForCompact = newRead();
             if (options.rowTrackingEnabled()) {
-                
readForCompact.withReadType(SpecialFields.rowTypeWithRowLineage(rowType));
+                
readForCompact.withReadType(SpecialFields.rowTypeWithRowTracking(rowType));
             }
             return new AppendFileStoreWrite(
                     fileIO,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
index f7b283daf0..8f7c9d6dad 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
@@ -48,7 +48,7 @@ public class DataFileRecordReader implements 
FileRecordReader<InternalRow> {
     @Nullable private final int[] indexMapping;
     @Nullable private final PartitionInfo partitionInfo;
     @Nullable private final CastFieldGetter[] castMapping;
-    private final boolean rowLineageEnabled;
+    private final boolean rowTrackingEnabled;
     @Nullable private final Long firstRowId;
     private final long maxSequenceNumber;
     private final Map<String, Integer> systemFields;
@@ -60,7 +60,7 @@ public class DataFileRecordReader implements 
FileRecordReader<InternalRow> {
             @Nullable int[] indexMapping,
             @Nullable CastFieldGetter[] castMapping,
             @Nullable PartitionInfo partitionInfo,
-            boolean rowLineageEnabled,
+            boolean rowTrackingEnabled,
             @Nullable Long firstRowId,
             long maxSequenceNumber,
             Map<String, Integer> systemFields)
@@ -75,7 +75,7 @@ public class DataFileRecordReader implements 
FileRecordReader<InternalRow> {
         this.indexMapping = indexMapping;
         this.partitionInfo = partitionInfo;
         this.castMapping = castMapping;
-        this.rowLineageEnabled = rowLineageEnabled;
+        this.rowTrackingEnabled = rowTrackingEnabled;
         this.firstRowId = firstRowId;
         this.maxSequenceNumber = maxSequenceNumber;
         this.systemFields = systemFields;
@@ -91,10 +91,10 @@ public class DataFileRecordReader implements 
FileRecordReader<InternalRow> {
 
         if (iterator instanceof ColumnarRowIterator) {
             iterator = ((ColumnarRowIterator) iterator).mapping(partitionInfo, 
indexMapping);
-            if (rowLineageEnabled) {
+            if (rowTrackingEnabled) {
                 iterator =
                         ((ColumnarRowIterator) iterator)
-                                .assignRowLineage(firstRowId, 
maxSequenceNumber, systemFields);
+                                .assignRowTracking(firstRowId, 
maxSequenceNumber, systemFields);
             }
         } else {
             if (partitionInfo != null) {
@@ -108,33 +108,33 @@ public class DataFileRecordReader implements 
FileRecordReader<InternalRow> {
                 iterator = iterator.transform(projectedRow::replaceRow);
             }
 
-            if (rowLineageEnabled && !systemFields.isEmpty()) {
-                GenericRow lineageRow = new GenericRow(2);
+            if (rowTrackingEnabled && !systemFields.isEmpty()) {
+                GenericRow trackingRow = new GenericRow(2);
 
-                int[] fallbackToLineageMappings = new 
int[tableRowType.getFieldCount()];
-                Arrays.fill(fallbackToLineageMappings, -1);
+                int[] fallbackToTrackingMappings = new 
int[tableRowType.getFieldCount()];
+                Arrays.fill(fallbackToTrackingMappings, -1);
 
                 if (systemFields.containsKey(SpecialFields.ROW_ID.name())) {
-                    
fallbackToLineageMappings[systemFields.get(SpecialFields.ROW_ID.name())] = 0;
+                    
fallbackToTrackingMappings[systemFields.get(SpecialFields.ROW_ID.name())] = 0;
                 }
                 if 
(systemFields.containsKey(SpecialFields.SEQUENCE_NUMBER.name())) {
-                    fallbackToLineageMappings[
+                    fallbackToTrackingMappings[
                                     
systemFields.get(SpecialFields.SEQUENCE_NUMBER.name())] =
                             1;
                 }
 
                 FallbackMappingRow fallbackMappingRow =
-                        new FallbackMappingRow(fallbackToLineageMappings);
+                        new FallbackMappingRow(fallbackToTrackingMappings);
                 final FileRecordIterator<InternalRow> iteratorInner = iterator;
                 iterator =
                         iterator.transform(
                                 row -> {
                                     if (firstRowId != null) {
-                                        lineageRow.setField(
+                                        trackingRow.setField(
                                                 0, 
iteratorInner.returnedPosition() + firstRowId);
                                     }
-                                    lineageRow.setField(1, maxSequenceNumber);
-                                    return fallbackMappingRow.replace(row, 
lineageRow);
+                                    trackingRow.setField(1, maxSequenceNumber);
+                                    return fallbackMappingRow.replace(row, 
trackingRow);
                                 });
             }
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java 
b/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java
index 6e858fbd22..88f1a740ed 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java
@@ -50,7 +50,7 @@ public class PojoDataFileMeta implements DataFileMeta {
     private final SimpleStats keyStats;
     private final SimpleStats valueStats;
 
-    // As for row-lineage table, this will be reassigned while committing
+    // As for row-tracking table, this will be reassigned while committing
     private final long minSequenceNumber;
     private final long maxSequenceNumber;
     private final long schemaId;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
index c2a6ebd18f..f4c426e86f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
@@ -37,6 +37,7 @@ import org.apache.paimon.reader.ReaderSupplier;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.SpecialFields;
 import org.apache.paimon.table.source.DataEvolutionSplitGenerator;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.types.DataField;
@@ -56,7 +57,7 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 import static java.lang.String.format;
-import static org.apache.paimon.table.SpecialFields.rowTypeWithRowLineage;
+import static org.apache.paimon.table.SpecialFields.rowTypeWithRowTracking;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /**
@@ -124,7 +125,7 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
                 new Builder(
                         formatDiscover,
                         readRowType.getFields(),
-                        schema -> 
rowTypeWithRowLineage(schema.logicalRowType(), true).getFields(),
+                        schema -> 
rowTypeWithRowTracking(schema.logicalRowType(), true).getFields(),
                         null,
                         null,
                         null);
@@ -189,7 +190,8 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
             long schemaId = file.schemaId();
             TableSchema dataSchema = 
schemaManager.schema(schemaId).project(file.writeCols());
             int[] fieldIds =
-                    
rowTypeWithRowLineage(dataSchema.logicalRowType()).getFields().stream()
+                    
SpecialFields.rowTypeWithRowTracking(dataSchema.logicalRowType()).getFields()
+                            .stream()
                             .mapToInt(DataField::id)
                             .toArray();
             List<DataField> readFields = new ArrayList<>();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 56fde4e30e..1c859fb693 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -1051,7 +1051,7 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                 // assign row id for new files
                 List<ManifestEntry> rowIdAssigned = new ArrayList<>();
                 nextRowIdStart =
-                        assignRowLineageMeta(firstRowIdStart, 
snapshotAssigned, rowIdAssigned);
+                        assignRowTrackingMeta(firstRowIdStart, 
snapshotAssigned, rowIdAssigned);
                 deltaFiles = rowIdAssigned;
             }
 
@@ -1173,7 +1173,7 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         return new SuccessResult();
     }
 
-    private long assignRowLineageMeta(
+    private long assignRowTrackingMeta(
             long firstRowIdStart,
             List<ManifestEntry> deltaFiles,
             List<ManifestEntry> rowIdAssigned) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
index 08344c08b5..3da14ae7dd 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
@@ -64,7 +64,7 @@ import java.util.stream.Collectors;
 import static org.apache.paimon.predicate.PredicateBuilder.containsFields;
 import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
 
-/** A {@link SplitRead} to read row lineage table which need field merge. */
+/** A {@link SplitRead} to read row tracking table which need field merge. */
 public class MergeFileSplitRead implements SplitRead<KeyValue> {
 
     private final TableSchema tableSchema;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
index 610a0c0a2e..5dc8522a8f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -64,7 +64,7 @@ import java.util.List;
 import java.util.Map;
 
 import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
-import static org.apache.paimon.table.SpecialFields.rowTypeWithRowLineage;
+import static org.apache.paimon.table.SpecialFields.rowTypeWithRowTracking;
 
 /** A {@link SplitRead} to read raw file directly from {@link DataSplit}. */
 public class RawFileSplitRead implements SplitRead<InternalRow> {
@@ -173,7 +173,7 @@ public class RawFileSplitRead implements 
SplitRead<InternalRow> {
                         readRowType.getFields(),
                         schema -> {
                             if (rowTrackingEnabled) {
-                                return 
rowTypeWithRowLineage(schema.logicalRowType(), true)
+                                return 
rowTypeWithRowTracking(schema.logicalRowType(), true)
                                         .getFields();
                             }
                             return schema.fields();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 064630023d..cbf3153dac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -639,11 +639,11 @@ public class SchemaValidation {
         if (rowTrackingEnabled) {
             checkArgument(
                     options.bucket() == -1,
-                    "Cannot define %s for row lineage table, it only support 
bucket = -1",
+                    "Cannot define %s for row tracking table, it only support 
bucket = -1",
                     CoreOptions.BUCKET.key());
             checkArgument(
                     schema.primaryKeys().isEmpty(),
-                    "Cannot define %s for row lineage table.",
+                    "Cannot define %s for row tracking table.",
                     PRIMARY_KEY.key());
         }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/RowLineageTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/RowTrackingTable.java
similarity index 90%
rename from 
paimon-core/src/main/java/org/apache/paimon/table/system/RowLineageTable.java
rename to 
paimon-core/src/main/java/org/apache/paimon/table/system/RowTrackingTable.java
index 75e4319bc1..519bf62d18 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/RowLineageTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/RowTrackingTable.java
@@ -30,6 +30,7 @@ import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.ReadonlyTable;
+import org.apache.paimon.table.SpecialFields;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.table.source.InnerTableRead;
@@ -48,16 +49,15 @@ import java.util.Map;
 import java.util.Optional;
 
 import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
-import static org.apache.paimon.table.SpecialFields.rowTypeWithRowLineage;
 
 /** A {@link Table} for reading row id of table. */
-public class RowLineageTable implements DataTable, ReadonlyTable {
+public class RowTrackingTable implements DataTable, ReadonlyTable {
 
-    public static final String ROW_LINEAGE = "row_lineage";
+    public static final String ROW_TRACKING = "row_tracking";
 
     private final FileStoreTable wrapped;
 
-    public RowLineageTable(FileStoreTable wrapped) {
+    public RowTrackingTable(FileStoreTable wrapped) {
         this.wrapped = wrapped;
 
         if (!coreOptions().rowTrackingEnabled()) {
@@ -96,12 +96,12 @@ public class RowLineageTable implements DataTable, 
ReadonlyTable {
 
     @Override
     public String name() {
-        return wrapped.name() + SYSTEM_TABLE_SPLITTER + ROW_LINEAGE;
+        return wrapped.name() + SYSTEM_TABLE_SPLITTER + ROW_TRACKING;
     }
 
     @Override
     public RowType rowType() {
-        return rowTypeWithRowLineage(wrapped.rowType());
+        return SpecialFields.rowTypeWithRowTracking(wrapped.rowType());
     }
 
     @Override
@@ -176,7 +176,7 @@ public class RowLineageTable implements DataTable, 
ReadonlyTable {
 
     @Override
     public DataTable switchToBranch(String branchName) {
-        return new RowLineageTable(wrapped.switchToBranch(branchName));
+        return new RowTrackingTable(wrapped.switchToBranch(branchName));
     }
 
     @Override
@@ -186,7 +186,7 @@ public class RowLineageTable implements DataTable, 
ReadonlyTable {
 
     @Override
     public Table copy(Map<String, String> dynamicOptions) {
-        return new RowLineageTable(wrapped.copy(dynamicOptions));
+        return new RowTrackingTable(wrapped.copy(dynamicOptions));
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
index 69f7a65ddf..d79f542f7d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
@@ -45,7 +45,7 @@ import static 
org.apache.paimon.table.system.ManifestsTable.MANIFESTS;
 import static org.apache.paimon.table.system.OptionsTable.OPTIONS;
 import static org.apache.paimon.table.system.PartitionsTable.PARTITIONS;
 import static org.apache.paimon.table.system.ReadOptimizedTable.READ_OPTIMIZED;
-import static org.apache.paimon.table.system.RowLineageTable.ROW_LINEAGE;
+import static org.apache.paimon.table.system.RowTrackingTable.ROW_TRACKING;
 import static org.apache.paimon.table.system.SchemasTable.SCHEMAS;
 import static org.apache.paimon.table.system.SnapshotsTable.SNAPSHOTS;
 import static org.apache.paimon.table.system.StatisticTable.STATISTICS;
@@ -73,7 +73,7 @@ public class SystemTableLoader {
                     .put(STATISTICS, StatisticTable::new)
                     .put(BINLOG, BinlogTable::new)
                     .put(TABLE_INDEXES, TableIndexesTable::new)
-                    .put(ROW_LINEAGE, RowLineageTable::new)
+                    .put(ROW_TRACKING, RowTrackingTable::new)
                     .build();
 
     public static final List<String> SYSTEM_TABLES = new 
ArrayList<>(SYSTEM_TABLE_LOADERS.keySet());
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendPreCommitCompactWorkerOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendPreCommitCompactWorkerOperator.java
index 8d8a668f79..0e71f91865 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendPreCommitCompactWorkerOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendPreCommitCompactWorkerOperator.java
@@ -72,7 +72,7 @@ public class AppendPreCommitCompactWorkerOperator extends 
AbstractStreamOperator
             checkArgument(
                     !coreOptions.dataEvolutionEnabled(),
                     "Data evolution enabled table should not invoke compact 
yet.");
-            
this.write.withWriteType(SpecialFields.rowTypeWithRowLineage(table.rowType()));
+            
this.write.withWriteType(SpecialFields.rowTypeWithRowTracking(table.rowType()));
         }
         this.pathFactory = table.store().pathFactory();
         this.fileIO = table.fileIO();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactor.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactor.java
index 4b85eb5169..1efbccbc19 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactor.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactor.java
@@ -78,7 +78,7 @@ public class AppendTableCompactor {
         CoreOptions coreOptions = table.coreOptions();
         this.write = (BaseAppendFileStoreWrite) 
table.store().newWrite(commitUser);
         if (coreOptions.rowTrackingEnabled()) {
-            
write.withWriteType(SpecialFields.rowTypeWithRowLineage(table.rowType()));
+            
write.withWriteType(SpecialFields.rowTypeWithRowTracking(table.rowType()));
         }
         this.result = new LinkedList<>();
         this.compactExecutorsupplier = lazyCompactExecutor;
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendTableITCase.java
index 582ced3e10..85ded57f23 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendTableITCase.java
@@ -95,69 +95,69 @@ public class AppendTableITCase extends CatalogITCaseBase {
     }
 
     @Test
-    public void testReadWriteWithLineage() {
-        batchSql("INSERT INTO append_table_lineage VALUES (1, 'AAA'), (2, 
'BBB')");
-        List<Row> rows = batchSql("SELECT * FROM 
append_table_lineage$row_lineage");
+    public void testReadWriteWithRowTracking() {
+        batchSql("INSERT INTO append_table_tracking VALUES (1, 'AAA'), (2, 
'BBB')");
+        List<Row> rows = batchSql("SELECT * FROM 
append_table_tracking$row_tracking");
         assertThat(rows.size()).isEqualTo(2);
         assertThat(rows)
                 .containsExactlyInAnyOrder(Row.of(1, "AAA", 0L, 1L), Row.of(2, 
"BBB", 1L, 1L));
 
-        rows = batchSql("SELECT * FROM append_table_lineage");
+        rows = batchSql("SELECT * FROM append_table_tracking");
         assertThat(rows.size()).isEqualTo(2);
         assertThat(rows).containsExactlyInAnyOrder(Row.of(1, "AAA"), Row.of(2, 
"BBB"));
     }
 
     @Test
-    public void testCompactionWithRowLineage() throws Exception {
-        batchSql("ALTER TABLE append_table_lineage SET 
('compaction.max.file-num' = '4')");
+    public void testCompactionWithRowTracking() throws Exception {
+        batchSql("ALTER TABLE append_table_tracking SET 
('compaction.max.file-num' = '4')");
 
         assertExecuteExpected(
-                "INSERT INTO append_table_lineage VALUES (1, 'AAA'), (2, 
'BBB')",
+                "INSERT INTO append_table_tracking VALUES (1, 'AAA'), (2, 
'BBB')",
                 1L,
                 Snapshot.CommitKind.APPEND,
-                "append_table_lineage");
+                "append_table_tracking");
         assertExecuteExpected(
-                "INSERT INTO append_table_lineage VALUES (3, 'CCC'), (4, 
'DDD')",
+                "INSERT INTO append_table_tracking VALUES (3, 'CCC'), (4, 
'DDD')",
                 2L,
                 Snapshot.CommitKind.APPEND,
-                "append_table_lineage");
+                "append_table_tracking");
         assertExecuteExpected(
-                "INSERT INTO append_table_lineage VALUES (1, 'AAA'), (2, 
'BBB'), (3, 'CCC'), (4, 'DDD')",
+                "INSERT INTO append_table_tracking VALUES (1, 'AAA'), (2, 
'BBB'), (3, 'CCC'), (4, 'DDD')",
                 3L,
                 Snapshot.CommitKind.APPEND,
-                "append_table_lineage");
+                "append_table_tracking");
         assertExecuteExpected(
-                "INSERT INTO append_table_lineage VALUES (5, 'EEE'), (6, 
'FFF')",
+                "INSERT INTO append_table_tracking VALUES (5, 'EEE'), (6, 
'FFF')",
                 4L,
                 Snapshot.CommitKind.APPEND,
-                "append_table_lineage");
+                "append_table_tracking");
         assertExecuteExpected(
-                "INSERT INTO append_table_lineage VALUES (7, 'HHH'), (8, 
'III')",
+                "INSERT INTO append_table_tracking VALUES (7, 'HHH'), (8, 
'III')",
                 5L,
                 Snapshot.CommitKind.APPEND,
-                "append_table_lineage");
+                "append_table_tracking");
         assertExecuteExpected(
-                "INSERT INTO append_table_lineage VALUES (9, 'JJJ'), (10, 
'KKK')",
+                "INSERT INTO append_table_tracking VALUES (9, 'JJJ'), (10, 
'KKK')",
                 6L,
                 Snapshot.CommitKind.APPEND,
-                "append_table_lineage");
+                "append_table_tracking");
         assertExecuteExpected(
-                "INSERT INTO append_table_lineage VALUES (11, 'LLL'), (12, 
'MMM')",
+                "INSERT INTO append_table_tracking VALUES (11, 'LLL'), (12, 
'MMM')",
                 7L,
                 Snapshot.CommitKind.APPEND,
-                "append_table_lineage");
+                "append_table_tracking");
         assertExecuteExpected(
-                "INSERT INTO append_table_lineage VALUES (13, 'NNN'), (14, 
'OOO')",
+                "INSERT INTO append_table_tracking VALUES (13, 'NNN'), (14, 
'OOO')",
                 8L,
                 Snapshot.CommitKind.APPEND,
-                "append_table_lineage");
+                "append_table_tracking");
 
-        List<Row> originRowsWithId2 = batchSql("SELECT * FROM 
append_table_lineage$row_lineage");
-        batchSql("call sys.compact('default.append_table_lineage')");
-        waitCompactSnapshot(60000L, "append_table_lineage");
-        List<Row> files = batchSql("SELECT * FROM append_table_lineage$files");
+        List<Row> originRowsWithId2 = batchSql("SELECT * FROM 
append_table_tracking$row_tracking");
+        batchSql("call sys.compact('default.append_table_tracking')");
+        waitCompactSnapshot(60000L, "append_table_tracking");
+        List<Row> files = batchSql("SELECT * FROM 
append_table_tracking$files");
         assertThat(files.size()).isEqualTo(1);
-        List<Row> rowsAfter2 = batchSql("SELECT * FROM 
append_table_lineage$row_lineage");
+        List<Row> rowsAfter2 = batchSql("SELECT * FROM 
append_table_tracking$row_tracking");
         
assertThat(originRowsWithId2).containsExactlyInAnyOrderElementsOf(rowsAfter2);
 
         assertThat(rowsAfter2)
@@ -655,7 +655,7 @@ public class AppendTableITCase extends CatalogITCaseBase {
     protected List<String> ddl() {
         return Arrays.asList(
                 "CREATE TABLE IF NOT EXISTS append_table (id INT, data STRING) 
WITH ('bucket' = '-1')",
-                "CREATE TABLE IF NOT EXISTS append_table_lineage (id INT, data 
STRING) WITH ('bucket' = '-1', 'row-tracking.enabled' = 'true')",
+                "CREATE TABLE IF NOT EXISTS append_table_tracking (id INT, 
data STRING) WITH ('bucket' = '-1', 'row-tracking.enabled' = 'true')",
                 "CREATE TABLE IF NOT EXISTS part_table (id INT, data STRING, 
dt STRING) PARTITIONED BY (dt) WITH ('bucket' = '-1')",
                 "CREATE TABLE IF NOT EXISTS complex_table (id INT, data 
MAP<INT, INT>) WITH ('bucket' = '-1')",
                 "CREATE TABLE IF NOT EXISTS index_table (id INT, indexc 
STRING, data STRING) WITH ('bucket' = '-1', 
'file-index.bloom-filter.columns'='indexc', 
'file-index.bloom-filter.indexc.items' = '500')");
diff --git 
a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala
 
b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
similarity index 94%
copy from 
paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala
copy to 
paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
index f7d911c31c..9f96840a77 100644
--- 
a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala
+++ 
b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
@@ -18,4 +18,4 @@
 
 package org.apache.paimon.spark.sql
 
-class RowLineageTest extends RowLineageTestBase {}
+class RowTrackingTest extends RowTrackingTestBase {}
diff --git 
a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala
 
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
similarity index 94%
rename from 
paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala
rename to 
paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
index f7d911c31c..9f96840a77 100644
--- 
a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala
+++ 
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
@@ -18,4 +18,4 @@
 
 package org.apache.paimon.spark.sql
 
-class RowLineageTest extends RowLineageTestBase {}
+class RowTrackingTest extends RowTrackingTestBase {}
diff --git 
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala
 
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
similarity index 94%
rename from 
paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala
rename to 
paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
index f7d911c31c..9f96840a77 100644
--- 
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala
+++ 
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
@@ -18,4 +18,4 @@
 
 package org.apache.paimon.spark.sql
 
-class RowLineageTest extends RowLineageTestBase {}
+class RowTrackingTest extends RowTrackingTestBase {}
diff --git 
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala
 
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
similarity index 94%
rename from 
paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala
rename to 
paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
index f7d911c31c..9f96840a77 100644
--- 
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala
+++ 
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
@@ -18,4 +18,4 @@
 
 package org.apache.paimon.spark.sql
 
-class RowLineageTest extends RowLineageTestBase {}
+class RowTrackingTest extends RowTrackingTestBase {}
diff --git 
a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala
 
b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala
deleted file mode 100644
index f7d911c31c..0000000000
--- 
a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark.sql
-
-class RowLineageTest extends RowLineageTestBase {}
diff --git 
a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala
 
b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
similarity index 94%
rename from 
paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala
rename to 
paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
index f7d911c31c..9f96840a77 100644
--- 
a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala
+++ 
b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
@@ -18,4 +18,4 @@
 
 package org.apache.paimon.spark.sql
 
-class RowLineageTest extends RowLineageTestBase {}
+class RowTrackingTest extends RowTrackingTestBase {}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index dd36e2f570..3a33fe008b 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -424,7 +424,7 @@ public class CompactProcedure extends BaseProcedure {
                                             CoreOptions coreOptions = 
table.coreOptions();
                                             if 
(coreOptions.rowTrackingEnabled()) {
                                                 write.withWriteType(
-                                                        
SpecialFields.rowTypeWithRowLineage(
+                                                        
SpecialFields.rowTypeWithRowTracking(
                                                                 
table.rowType()));
                                             }
                                             AppendCompactTaskSerializer ser =
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
index ce1fe2e25f..db28b4f4ef 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
@@ -40,7 +40,7 @@ trait ColumnPruningAndPushDown extends Scan with Logging {
   lazy val tableRowType: RowType = {
     val coreOptions: CoreOptions = CoreOptions.fromMap(table.options())
     if (coreOptions.rowTrackingEnabled()) {
-      SpecialFields.rowTypeWithRowLineage(table.rowType())
+      SpecialFields.rowTypeWithRowTracking(table.rowType())
     } else {
       table.rowType()
     }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala
index 1ac9aab47b..704a6c2d5b 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala
@@ -33,7 +33,7 @@ case class SparkTableWrite(
     writeBuilder: BatchWriteBuilder,
     writeType: RowType,
     rowKindColIdx: Int = -1,
-    writeRowLineage: Boolean = false)
+    writeRowTracking: Boolean = false)
   extends SparkTableWriteTrait {
 
   private val ioManager: IOManager = SparkUtils.createIOManager
@@ -41,7 +41,7 @@ case class SparkTableWrite(
   private val write: BatchTableWrite = {
     val _write = writeBuilder.newWrite()
     _write.withIOManager(ioManager)
-    if (writeRowLineage) {
+    if (writeRowTracking) {
       _write.withWriteType(writeType)
     }
     _write
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index 6080cd0378..c85315d876 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -147,11 +147,11 @@ case class DeleteFromPaimonTableCommand(
       val toRewriteScanRelation = Filter(Not(condition), newRelation)
       var data = createDataset(sparkSession, toRewriteScanRelation)
       if (coreOptions.rowTrackingEnabled()) {
-        data = selectWithRowLineage(data)
+        data = selectWithRowTracking(data)
       }
 
       // only write new files, should have no compaction
-      val addCommitMessage = writer.writeOnly().withRowLineage().write(data)
+      val addCommitMessage = writer.writeOnly().withRowTracking().write(data)
 
       // Step5: convert the deleted files that need to be written to commit 
message.
       val deletedCommitMessage = buildDeletedCommitMessage(touchedFiles)
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
index 3e8d4deabe..5f12d1110d 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
@@ -164,32 +164,32 @@ case class MergeIntoPaimonTable(
       val (_, filesToReadScan) =
         extractFilesAndCreateNewScan(filePathsToRead.toArray, 
dataFilePathToMeta, relation)
 
-      // If no files need to be rewritten, no need to write row lineage
-      val writeRowLineage = coreOptions.rowTrackingEnabled() && 
filesToRewritten.nonEmpty
+      // If no files need to be rewritten, no need to write row tracking
+      val writeRowTracking = coreOptions.rowTrackingEnabled() && 
filesToRewritten.nonEmpty
 
       // Add FILE_TOUCHED_COL to mark the row as coming from the touched file, 
if the row has not been
       // modified and was from touched file, it should be kept too.
       var filesToRewrittenDS =
         createDataset(sparkSession, 
filesToRewrittenScan).withColumn(FILE_TOUCHED_COL, lit(true))
-      if (writeRowLineage) {
-        filesToRewrittenDS = selectWithRowLineage(filesToRewrittenDS)
+      if (writeRowTracking) {
+        filesToRewrittenDS = selectWithRowTracking(filesToRewrittenDS)
       }
 
       var filesToReadDS =
         createDataset(sparkSession, 
filesToReadScan).withColumn(FILE_TOUCHED_COL, lit(false))
-      if (writeRowLineage) {
-        // For filesToReadScan we don't need to read row lineage meta cols, 
just add placeholders
-        ROW_LINEAGE_META_COLUMNS.foreach(
+      if (writeRowTracking) {
+        // For filesToReadScan we don't need to read row tracking meta cols, 
just add placeholders
+        ROW_TRACKING_META_COLUMNS.foreach(
           c => filesToReadDS = filesToReadDS.withColumn(c, lit(null)))
       }
 
       val toWriteDS = constructChangedRows(
         sparkSession,
         filesToRewrittenDS.union(filesToReadDS),
-        writeRowLineage = writeRowLineage).drop(ROW_KIND_COL)
+        writeRowTracking = writeRowTracking).drop(ROW_KIND_COL)
 
-      val finalWriter = if (writeRowLineage) {
-        writer.withRowLineage()
+      val finalWriter = if (writeRowTracking) {
+        writer.withRowTracking()
       } else {
         writer
       }
@@ -207,7 +207,7 @@ case class MergeIntoPaimonTable(
       remainDeletedRow: Boolean = false,
       deletionVectorEnabled: Boolean = false,
       extraMetadataCols: Seq[PaimonMetadataColumn] = Seq.empty,
-      writeRowLineage: Boolean = false): Dataset[Row] = {
+      writeRowTracking: Boolean = false): Dataset[Row] = {
     val targetDS = targetDataset
       .withColumn(TARGET_ROW_COL, lit(true))
 
@@ -233,7 +233,7 @@ case class MergeIntoPaimonTable(
     def attribute(name: String) = joinedPlan.output.find(attr => 
resolver(name, attr.name))
     val extraMetadataAttributes =
       extraMetadataCols.flatMap(metadataCol => attribute(metadataCol.name))
-    val (rowIdAttr, sequenceNumberAttr) = if (writeRowLineage) {
+    val (rowIdAttr, sequenceNumberAttr) = if (writeRowTracking) {
       (
         attribute(SpecialFields.ROW_ID.name()).get,
         attribute(SpecialFields.SEQUENCE_NUMBER.name()).get)
@@ -241,7 +241,7 @@ case class MergeIntoPaimonTable(
       (null, null)
     }
 
-    val targetOutput = if (writeRowLineage) {
+    val targetOutput = if (writeRowTracking) {
       filteredTargetPlan.output ++ Seq(rowIdAttr, sequenceNumberAttr)
     } else {
       filteredTargetPlan.output
@@ -253,7 +253,7 @@ case class MergeIntoPaimonTable(
       val columnExprs = actions.map {
         case UpdateAction(_, assignments) =>
           var exprs = assignments.map(_.value)
-          if (writeRowLineage) {
+          if (writeRowTracking) {
             exprs ++= Seq(rowIdAttr, Literal(null))
           }
           exprs :+ Literal(RowKind.UPDATE_AFTER.toByteValue)
@@ -267,7 +267,7 @@ case class MergeIntoPaimonTable(
           }
         case InsertAction(_, assignments) =>
           var exprs = assignments.map(_.value)
-          if (writeRowLineage) {
+          if (writeRowTracking) {
             exprs ++= Seq(rowIdAttr, sequenceNumberAttr)
           }
           exprs :+ Literal(RowKind.INSERT.toByteValue)
@@ -280,7 +280,7 @@ case class MergeIntoPaimonTable(
     val notMatchedBySourceOutputs = 
processMergeActions(notMatchedBySourceActions)
     val notMatchedOutputs = processMergeActions(notMatchedActions)
     val outputFields = mutable.ArrayBuffer(targetTable.schema.fields: _*)
-    if (writeRowLineage) {
+    if (writeRowTracking) {
       outputFields += PaimonMetadataColumn.ROW_ID.toStructField
       outputFields += PaimonMetadataColumn.SEQUENCE_NUMBER.toStructField
     }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index a440ad353a..344cd0879b 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -51,7 +51,7 @@ import java.util.Collections.singletonMap
 
 import scala.collection.JavaConverters._
 
-case class PaimonSparkWriter(table: FileStoreTable, writeRowLineage: Boolean = 
false)
+case class PaimonSparkWriter(table: FileStoreTable, writeRowTracking: Boolean 
= false)
   extends WriteHelper {
 
   private lazy val tableSchema = table.schema
@@ -61,8 +61,8 @@ case class PaimonSparkWriter(table: FileStoreTable, 
writeRowLineage: Boolean = f
   @transient private lazy val serializer = new CommitMessageSerializer
 
   private val writeType = {
-    if (writeRowLineage) {
-      SpecialFields.rowTypeWithRowLineage(table.rowType(), true)
+    if (writeRowTracking) {
+      SpecialFields.rowTypeWithRowTracking(table.rowType(), true)
     } else {
       table.rowType()
     }
@@ -74,9 +74,9 @@ case class PaimonSparkWriter(table: FileStoreTable, 
writeRowLineage: Boolean = f
     PaimonSparkWriter(table.copy(singletonMap(WRITE_ONLY.key(), "true")))
   }
 
-  def withRowLineage(): PaimonSparkWriter = {
+  def withRowTracking(): PaimonSparkWriter = {
     if (coreOptions.rowTrackingEnabled()) {
-      PaimonSparkWriter(table, writeRowLineage = true)
+      PaimonSparkWriter(table, writeRowTracking = true)
     } else {
       this
     }
@@ -98,7 +98,7 @@ case class PaimonSparkWriter(table: FileStoreTable, 
writeRowLineage: Boolean = f
     val bucketColIdx = SparkRowUtils.getFieldIndex(withInitBucketCol.schema, 
BUCKET_COL)
     val encoderGroupWithBucketCol = EncoderSerDeGroup(withInitBucketCol.schema)
 
-    def newWrite() = SparkTableWrite(writeBuilder, writeType, rowKindColIdx, 
writeRowLineage)
+    def newWrite() = SparkTableWrite(writeBuilder, writeType, rowKindColIdx, 
writeRowTracking)
 
     def sparkParallelism = {
       val defaultParallelism = sparkSession.sparkContext.defaultParallelism
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
index e304480bae..1babd7b5c3 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
@@ -161,7 +161,7 @@ case class UpdatePaimonTableCommand(
     }
 
     val data = createDataset(sparkSession, 
toUpdateScanRelation).select(updateColumns: _*)
-    writer.withRowLineage().write(data)
+    writer.withRowTracking().write(data)
   }
 
   private def optimizedIf(
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala
index 0acb25e58b..fa71c7530f 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala
@@ -53,7 +53,7 @@ object PaimonMetadataColumn {
   val SEQUENCE_NUMBER_COLUMN: String = SpecialFields.SEQUENCE_NUMBER.name()
 
   val DV_META_COLUMNS: Seq[String] = Seq(FILE_PATH_COLUMN, ROW_INDEX_COLUMN)
-  val ROW_LINEAGE_META_COLUMNS: Seq[String] = Seq(ROW_ID_COLUMN, 
SEQUENCE_NUMBER_COLUMN)
+  val ROW_TRACKING_META_COLUMNS: Seq[String] = Seq(ROW_ID_COLUMN, 
SEQUENCE_NUMBER_COLUMN)
 
   val SUPPORTED_METADATA_COLUMNS: Seq[String] = Seq(
     ROW_INDEX_COLUMN,
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala
index 8b2964cd55..1799df48fe 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala
@@ -19,7 +19,7 @@
 package org.apache.paimon.spark.util
 
 import org.apache.paimon.spark.SparkTable
-import org.apache.paimon.spark.schema.PaimonMetadataColumn.{DV_META_COLUMNS, 
ROW_LINEAGE_META_COLUMNS}
+import org.apache.paimon.spark.schema.PaimonMetadataColumn.{DV_META_COLUMNS, 
ROW_TRACKING_META_COLUMNS}
 import org.apache.paimon.table.{InnerTable, KnownSplitsTable}
 import org.apache.paimon.table.source.DataSplit
 
@@ -60,8 +60,8 @@ trait ScanPlanHelper extends SQLConfHelper {
     selectWithAdditionalCols(data, DV_META_COLUMNS)
   }
 
-  def selectWithRowLineage(data: DataFrame): DataFrame = {
-    selectWithAdditionalCols(data, ROW_LINEAGE_META_COLUMNS)
+  def selectWithRowTracking(data: DataFrame): DataFrame = {
+    selectWithAdditionalCols(data, ROW_TRACKING_META_COLUMNS)
   }
 
   private def selectWithAdditionalCols(data: DataFrame, additionalCols: 
Seq[String]): DataFrame = {
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
similarity index 95%
rename from 
paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala
rename to 
paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
index d8a3516a80..c8a54d3342 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
@@ -23,9 +23,9 @@ import org.apache.paimon.spark.PaimonSparkTestBase
 
 import org.apache.spark.sql.Row
 
-abstract class RowLineageTestBase extends PaimonSparkTestBase {
+abstract class RowTrackingTestBase extends PaimonSparkTestBase {
 
-  test("Row Lineage: read row lineage") {
+  test("Row Tracking: read row tracking") {
     withTable("t") {
       sql("CREATE TABLE t (id INT, data STRING) TBLPROPERTIES 
('row-tracking.enabled' = 'true')")
       sql("INSERT INTO t VALUES (11, 'a'), (22, 'b')")
@@ -41,7 +41,7 @@ abstract class RowLineageTestBase extends PaimonSparkTestBase 
{
     }
   }
 
-  test("Row Lineage: compact table") {
+  test("Row Tracking: compact table") {
     withTable("t") {
       sql(
         "CREATE TABLE t (id INT, data INT) TBLPROPERTIES 
('row-tracking.enabled' = 'true', 'compaction.min.file-num'='2')")
@@ -64,7 +64,7 @@ abstract class RowLineageTestBase extends PaimonSparkTestBase 
{
     }
   }
 
-  test("Row Lineage: delete table") {
+  test("Row Tracking: delete table") {
     withTable("t") {
       sql("CREATE TABLE t (id INT, data INT) TBLPROPERTIES 
('row-tracking.enabled' = 'true')")
 
@@ -77,7 +77,7 @@ abstract class RowLineageTestBase extends PaimonSparkTestBase 
{
     }
   }
 
-  test("Row Lineage: update table") {
+  test("Row Tracking: update table") {
     withTable("t") {
       sql("CREATE TABLE t (id INT, data INT) TBLPROPERTIES 
('row-tracking.enabled' = 'true')")
 
@@ -95,7 +95,7 @@ abstract class RowLineageTestBase extends PaimonSparkTestBase 
{
     }
   }
 
-  test("Row Lineage: update table without condition") {
+  test("Row Tracking: update table without condition") {
     withTable("t") {
       sql("CREATE TABLE t (id INT, data INT) TBLPROPERTIES 
('row-tracking.enabled' = 'true')")
 
@@ -109,7 +109,7 @@ abstract class RowLineageTestBase extends 
PaimonSparkTestBase {
     }
   }
 
-  test("Row Lineage: merge into table") {
+  test("Row Tracking: merge into table") {
     withTable("s", "t") {
       sql("CREATE TABLE s (id INT, b INT) TBLPROPERTIES 
('row-tracking.enabled' = 'true')")
       sql("INSERT INTO s VALUES (1, 11), (2, 22)")
@@ -135,7 +135,7 @@ abstract class RowLineageTestBase extends 
PaimonSparkTestBase {
     }
   }
 
-  test("Row Lineage: merge into table with only insert") {
+  test("Row Tracking: merge into table with only insert") {
     withTable("s", "t") {
       sql("CREATE TABLE s (id INT, b INT) TBLPROPERTIES 
('row-tracking.enabled' = 'true')")
       sql("INSERT INTO s VALUES (1, 11), (2, 22)")
@@ -156,7 +156,7 @@ abstract class RowLineageTestBase extends 
PaimonSparkTestBase {
     }
   }
 
-  test("Row Lineage: merge into table with only delete") {
+  test("Row Tracking: merge into table with only delete") {
     withTable("s", "t") {
       sql("CREATE TABLE s (id INT, b INT) TBLPROPERTIES 
('row-tracking.enabled' = 'true')")
       sql("INSERT INTO s VALUES (1, 11), (2, 22)")
@@ -177,7 +177,7 @@ abstract class RowLineageTestBase extends 
PaimonSparkTestBase {
     }
   }
 
-  test("Row Lineage: merge into table with only update") {
+  test("Row Tracking: merge into table with only update") {
     withTable("s", "t") {
       sql("CREATE TABLE s (id INT, b INT) TBLPROPERTIES 
('row-tracking.enabled' = 'true')")
       sql("INSERT INTO s VALUES (1, 11), (2, 22)")
@@ -342,7 +342,7 @@ abstract class RowLineageTestBase extends 
PaimonSparkTestBase {
     }
   }
 
-  test("Row Lineage: merge into table not matched by source") {
+  test("Row Tracking: merge into table not matched by source") {
     if (gteqSpark3_4) {
       withTable("source", "target") {
         sql(
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/util/ScanPlanHelperTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/util/ScanPlanHelperTest.scala
index dcbf12e273..df65a50095 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/util/ScanPlanHelperTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/util/ScanPlanHelperTest.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.Row
 
 class ScanPlanHelperTest extends PaimonSparkTestBase with ScanPlanHelper {
 
-  test("ScanPlanHelper: create new scan plan and select with row lineage meta 
cols") {
+  test("ScanPlanHelper: create new scan plan and select with row tracking meta 
cols") {
     withTable("t") {
       sql("CREATE TABLE t (id INT, data STRING) TBLPROPERTIES 
('row-tracking.enabled' = 'true')")
       sql("INSERT INTO t VALUES (11, 'a'), (22, 'b')")
@@ -38,17 +38,17 @@ class ScanPlanHelperTest extends PaimonSparkTestBase with 
ScanPlanHelper {
       // select original df should not contain meta cols
       checkAnswer(newDf, Seq(Row(11, "a"), Row(22, "b")))
 
-      // select df with row lineage meta cols
-      checkAnswer(selectWithRowLineage(newDf), Seq(Row(11, "a", 0, 1), Row(22, 
"b", 1, 1)))
+      // select df with row tracking meta cols
+      checkAnswer(selectWithRowTracking(newDf), Seq(Row(11, "a", 0, 1), 
Row(22, "b", 1, 1)))
 
-      // select with row lineage meta cols twice should not add new more meta 
cols
+      // select with row tracking meta cols twice should not add any more meta 
cols
       checkAnswer(
-        selectWithRowLineage(selectWithRowLineage(newDf)),
+        selectWithRowTracking(selectWithRowTracking(newDf)),
         Seq(Row(11, "a", 0, 1), Row(22, "b", 1, 1)))
 
-      // select df already contains meta cols with row lineage
+      // select df already contains meta cols with row tracking
       checkAnswer(
-        selectWithRowLineage(newDf.select("_ROW_ID", "id")),
+        selectWithRowTracking(newDf.select("_ROW_ID", "id")),
         Seq(Row(0, 11, 1), Row(1, 22, 1)))
     }
   }

Reply via email to