This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new 4e947969d0 [core] Modify all row lineage to row tracking in codes (#6262)
4e947969d0 is described below
commit 4e947969d0b105fd8720e0ce958e5acc6e0e453c
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Sep 16 14:10:16 2025 +0800
[core] Modify all row lineage to row tracking in codes (#6262)
---
docs/content/append-table/row-tracking.md | 8 ++--
.../org/apache/paimon/table/SpecialFields.java | 20 ++++----
.../apache/paimon/casting/FallbackMappingRow.java | 6 +--
.../paimon/data/columnar/ColumnarRowIterator.java | 2 +-
.../org/apache/paimon/AppendOnlyFileStore.java | 2 +-
.../org/apache/paimon/io/DataFileRecordReader.java | 30 ++++++------
.../org/apache/paimon/io/PojoDataFileMeta.java | 2 +-
.../paimon/operation/DataEvolutionSplitRead.java | 8 ++--
.../paimon/operation/FileStoreCommitImpl.java | 4 +-
.../paimon/operation/MergeFileSplitRead.java | 2 +-
.../apache/paimon/operation/RawFileSplitRead.java | 4 +-
.../org/apache/paimon/schema/SchemaValidation.java | 4 +-
...{RowLineageTable.java => RowTrackingTable.java} | 16 +++----
.../paimon/table/system/SystemTableLoader.java | 4 +-
.../AppendPreCommitCompactWorkerOperator.java | 2 +-
.../paimon/flink/compact/AppendTableCompactor.java | 2 +-
.../org/apache/paimon/flink/AppendTableITCase.java | 56 +++++++++++-----------
...{RowLineageTest.scala => RowTrackingTest.scala} | 2 +-
...{RowLineageTest.scala => RowTrackingTest.scala} | 2 +-
...{RowLineageTest.scala => RowTrackingTest.scala} | 2 +-
...{RowLineageTest.scala => RowTrackingTest.scala} | 2 +-
.../apache/paimon/spark/sql/RowLineageTest.scala | 21 --------
.../apache/paimon/spark/sql/RowTrackingTest.scala} | 2 +-
.../paimon/spark/procedure/CompactProcedure.java | 2 +-
.../paimon/spark/ColumnPruningAndPushDown.scala | 2 +-
.../org/apache/paimon/spark/SparkTableWrite.scala | 4 +-
.../commands/DeleteFromPaimonTableCommand.scala | 4 +-
.../spark/commands/MergeIntoPaimonTable.scala | 32 ++++++-------
.../paimon/spark/commands/PaimonSparkWriter.scala | 12 ++---
.../spark/commands/UpdatePaimonTableCommand.scala | 2 +-
.../paimon/spark/schema/PaimonMetadataColumn.scala | 2 +-
.../apache/paimon/spark/util/ScanPlanHelper.scala | 6 +--
...ageTestBase.scala => RowTrackingTestBase.scala} | 22 ++++-----
.../paimon/spark/util/ScanPlanHelperTest.scala | 14 +++---
34 files changed, 143 insertions(+), 162 deletions(-)
diff --git a/docs/content/append-table/row-tracking.md b/docs/content/append-table/row-tracking.md
index 04aec8bc58..fff59f5edf 100644
--- a/docs/content/append-table/row-tracking.md
+++ b/docs/content/append-table/row-tracking.md
@@ -26,9 +26,9 @@ under the License.
# Row tracking
-Row tracking allows Paimon to track row-level lineage in a Paimon append table. Once enabled on a Paimon table, two more hidden columns will be added to the table schema:
-- `_ROW_ID`: BIGINT, this is a unique identifier for each row in the table. It is used to track the lineage of the row and can be used to identify the row in case of update, merge into or delete.
-- `_SEQUENCE_NUMBER`: BIGINT, this is field indicates which `version` of this record is. It actually is the snapshot-id of the snapshot that this row belongs to. It is used to track the lineage of the row version.
+Row tracking allows Paimon to track row-level changes in a Paimon append table. Once enabled on a Paimon table, two more hidden columns will be added to the table schema:
+- `_ROW_ID`: BIGINT, a unique identifier for each row in the table. It is used to track updates to the row and to identify the row in case of update, merge into or delete.
+- `_SEQUENCE_NUMBER`: BIGINT, this field indicates which `version` of the record this is. It is the snapshot id of the snapshot that this row belongs to. It is used to track updates to the row version.
Hidden columns follow the following rules:
- Whenever we read from one table with row tracking enabled, the `_ROW_ID` and `_SEQUENCE_NUMBER` will be `NOT NULL`.
@@ -57,7 +57,7 @@ CREATE TABLE t (id INT, data STRING) TBLPROPERTIES ('row-tracking.enabled' = 'true')
INSERT INTO t VALUES (11, 'a'), (22, 'b')
```
-You can select the row lineage meta column with the following sql in spark:
+You can select the row tracking meta columns with the following SQL in Spark:
```sql
SELECT id, data, _ROW_ID, _SEQUENCE_NUMBER FROM t;
```
diff --git a/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java b/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java
index 042f1d1d53..d89776747b 100644
--- a/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java
+++ b/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java
@@ -139,33 +139,33 @@ public class SpecialFields {
+ depth;
}
- public static RowType rowTypeWithRowLineage(RowType rowType) {
- return rowTypeWithRowLineage(rowType, false);
+ public static RowType rowTypeWithRowTracking(RowType rowType) {
+ return rowTypeWithRowTracking(rowType, false);
}
/**
- * Add row lineage fields to rowType.
+ * Add row tracking fields to rowType.
*
     * @param sequenceNumberNullable sequence number is not null for user, but is nullable when read and write
*/
-    public static RowType rowTypeWithRowLineage(RowType rowType, boolean sequenceNumberNullable) {
-        List<DataField> fieldsWithRowLineage = new ArrayList<>(rowType.getFields());
+    public static RowType rowTypeWithRowTracking(RowType rowType, boolean sequenceNumberNullable) {
+        List<DataField> fieldsWithRowTracking = new ArrayList<>(rowType.getFields());
- fieldsWithRowLineage.forEach(
+ fieldsWithRowTracking.forEach(
f -> {
if (ROW_ID.name().equals(f.name()) ||
SEQUENCE_NUMBER.name().equals(f.name())) {
throw new IllegalArgumentException(
- "Row lineage field name '"
+ "Row tracking field name '"
+ f.name()
+ "' conflicts with existing field
names.");
}
});
- fieldsWithRowLineage.add(SpecialFields.ROW_ID);
- fieldsWithRowLineage.add(
+ fieldsWithRowTracking.add(SpecialFields.ROW_ID);
+ fieldsWithRowTracking.add(
sequenceNumberNullable
? SpecialFields.SEQUENCE_NUMBER.copy(true)
: SpecialFields.SEQUENCE_NUMBER);
- return new RowType(fieldsWithRowLineage);
+ return new RowType(fieldsWithRowTracking);
}
}
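For readers skimming the patch, a minimal standalone sketch (not part of the commit) of what the renamed helper does; `RowType.of` and `DataTypes` are Paimon's public type utilities, and the hidden-column names come from `SpecialFields`:

```java
import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

public class RowTypeWithTrackingSketch {
    public static void main(String[] args) {
        // A plain append-table schema: (id INT, data STRING).
        RowType rowType =
                RowType.of(
                        new DataType[] {DataTypes.INT(), DataTypes.STRING()},
                        new String[] {"id", "data"});

        // Appends the hidden columns: _ROW_ID BIGINT and _SEQUENCE_NUMBER BIGINT.
        RowType withTracking = SpecialFields.rowTypeWithRowTracking(rowType);
        System.out.println(withTracking.getFieldNames());
        // Expected: [id, data, _ROW_ID, _SEQUENCE_NUMBER]; a user column already
        // named _ROW_ID or _SEQUENCE_NUMBER raises IllegalArgumentException.
    }
}
```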
diff --git a/paimon-common/src/main/java/org/apache/paimon/casting/FallbackMappingRow.java b/paimon-common/src/main/java/org/apache/paimon/casting/FallbackMappingRow.java
index 364c09d90f..a83aba874c 100644
--- a/paimon-common/src/main/java/org/apache/paimon/casting/FallbackMappingRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/casting/FallbackMappingRow.java
@@ -27,7 +27,7 @@ import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.types.RowKind;
-/** Row with row lineage inject in. */
+/** Row with a fallback mapping row injected in. */
public class FallbackMappingRow implements InternalRow {
private InternalRow main;
@@ -181,9 +181,9 @@ public class FallbackMappingRow implements InternalRow {
return main.getRow(pos, numFields);
}
-    public FallbackMappingRow replace(InternalRow main, InternalRow rowLineage) {
+    public FallbackMappingRow replace(InternalRow main, InternalRow fallbackRow) {
this.main = main;
- this.fallbackRow = rowLineage;
+ this.fallbackRow = fallbackRow;
return this;
}
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
index 37f8d8edf2..8c641660ba 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
@@ -124,7 +124,7 @@ public class ColumnarRowIterator extends RecyclableIterator<InternalRow>
return this;
}
- public ColumnarRowIterator assignRowLineage(
+ public ColumnarRowIterator assignRowTracking(
Long firstRowId, Long snapshotId, Map<String, Integer> meta) {
VectorizedColumnBatch vectorizedColumnBatch = row.batch();
ColumnVector[] vectors = vectorizedColumnBatch.columns;
diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index 2f552d799b..da40bd20ca 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -110,7 +110,7 @@ public class AppendOnlyFileStore extends AbstractFileStore<InternalRow> {
if (bucketMode() == BucketMode.BUCKET_UNAWARE) {
RawFileSplitRead readForCompact = newRead();
if (options.rowTrackingEnabled()) {
-                readForCompact.withReadType(SpecialFields.rowTypeWithRowLineage(rowType));
+                readForCompact.withReadType(SpecialFields.rowTypeWithRowTracking(rowType));
}
return new AppendFileStoreWrite(
fileIO,
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
index f7b283daf0..8f7c9d6dad 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
@@ -48,7 +48,7 @@ public class DataFileRecordReader implements FileRecordReader<InternalRow> {
@Nullable private final int[] indexMapping;
@Nullable private final PartitionInfo partitionInfo;
@Nullable private final CastFieldGetter[] castMapping;
- private final boolean rowLineageEnabled;
+ private final boolean rowTrackingEnabled;
@Nullable private final Long firstRowId;
private final long maxSequenceNumber;
private final Map<String, Integer> systemFields;
@@ -60,7 +60,7 @@ public class DataFileRecordReader implements FileRecordReader<InternalRow> {
@Nullable int[] indexMapping,
@Nullable CastFieldGetter[] castMapping,
@Nullable PartitionInfo partitionInfo,
- boolean rowLineageEnabled,
+ boolean rowTrackingEnabled,
@Nullable Long firstRowId,
long maxSequenceNumber,
Map<String, Integer> systemFields)
@@ -75,7 +75,7 @@ public class DataFileRecordReader implements FileRecordReader<InternalRow> {
this.indexMapping = indexMapping;
this.partitionInfo = partitionInfo;
this.castMapping = castMapping;
- this.rowLineageEnabled = rowLineageEnabled;
+ this.rowTrackingEnabled = rowTrackingEnabled;
this.firstRowId = firstRowId;
this.maxSequenceNumber = maxSequenceNumber;
this.systemFields = systemFields;
@@ -91,10 +91,10 @@ public class DataFileRecordReader implements FileRecordReader<InternalRow> {
if (iterator instanceof ColumnarRowIterator) {
            iterator = ((ColumnarRowIterator) iterator).mapping(partitionInfo, indexMapping);
- if (rowLineageEnabled) {
+ if (rowTrackingEnabled) {
iterator =
((ColumnarRowIterator) iterator)
-                                .assignRowLineage(firstRowId, maxSequenceNumber, systemFields);
+                                .assignRowTracking(firstRowId, maxSequenceNumber, systemFields);
}
} else {
if (partitionInfo != null) {
@@ -108,33 +108,33 @@ public class DataFileRecordReader implements FileRecordReader<InternalRow> {
                iterator = iterator.transform(projectedRow::replaceRow);
            }
-            if (rowLineageEnabled && !systemFields.isEmpty()) {
-                GenericRow lineageRow = new GenericRow(2);
+            if (rowTrackingEnabled && !systemFields.isEmpty()) {
+                GenericRow trackingRow = new GenericRow(2);
-                int[] fallbackToLineageMappings = new int[tableRowType.getFieldCount()];
-                Arrays.fill(fallbackToLineageMappings, -1);
+                int[] fallbackToTrackingMappings = new int[tableRowType.getFieldCount()];
+                Arrays.fill(fallbackToTrackingMappings, -1);
                if (systemFields.containsKey(SpecialFields.ROW_ID.name())) {
-                    fallbackToLineageMappings[systemFields.get(SpecialFields.ROW_ID.name())] = 0;
+                    fallbackToTrackingMappings[systemFields.get(SpecialFields.ROW_ID.name())] = 0;
                }
                if (systemFields.containsKey(SpecialFields.SEQUENCE_NUMBER.name())) {
-                    fallbackToLineageMappings[
+                    fallbackToTrackingMappings[
                            systemFields.get(SpecialFields.SEQUENCE_NUMBER.name())] = 1;
                }
                FallbackMappingRow fallbackMappingRow =
-                        new FallbackMappingRow(fallbackToLineageMappings);
+                        new FallbackMappingRow(fallbackToTrackingMappings);
                final FileRecordIterator<InternalRow> iteratorInner = iterator;
                iterator =
                        iterator.transform(
                                row -> {
                                    if (firstRowId != null) {
-                                        lineageRow.setField(
+                                        trackingRow.setField(
                                                0, iteratorInner.returnedPosition() + firstRowId);
                                    }
-                                    lineageRow.setField(1, maxSequenceNumber);
-                                    return fallbackMappingRow.replace(row, lineageRow);
+                                    trackingRow.setField(1, maxSequenceNumber);
+                                    return fallbackMappingRow.replace(row, trackingRow);
                                });
            }
        }
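To make the fallback mechanism above easier to follow, here is a self-contained sketch (not from the commit; the four-column read type and the hidden-column positions 2 and 3 are assumptions for illustration):

```java
import org.apache.paimon.casting.FallbackMappingRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;

import java.util.Arrays;

public class FallbackMappingSketch {
    // Combine a data-file row with a two-field tracking row holding
    // (_ROW_ID, _SEQUENCE_NUMBER), mirroring DataFileRecordReader above.
    static InternalRow withTracking(InternalRow mainRow, long rowId, long sequenceNumber) {
        int[] fallbackToTrackingMappings = new int[4];
        Arrays.fill(fallbackToTrackingMappings, -1); // -1: no fallback, use the main row
        fallbackToTrackingMappings[2] = 0; // _ROW_ID position -> tracking row field 0
        fallbackToTrackingMappings[3] = 1; // _SEQUENCE_NUMBER position -> tracking row field 1

        GenericRow trackingRow = new GenericRow(2);
        trackingRow.setField(0, rowId);          // firstRowId + position within the file
        trackingRow.setField(1, sequenceNumber); // the file's maxSequenceNumber

        return new FallbackMappingRow(fallbackToTrackingMappings).replace(mainRow, trackingRow);
    }
}
```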
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java
index 6e858fbd22..88f1a740ed 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java
@@ -50,7 +50,7 @@ public class PojoDataFileMeta implements DataFileMeta {
private final SimpleStats keyStats;
private final SimpleStats valueStats;
- // As for row-lineage table, this will be reassigned while committing
+    // For a row-tracking table, this will be reassigned while committing
private final long minSequenceNumber;
private final long maxSequenceNumber;
private final long schemaId;
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
index c2a6ebd18f..f4c426e86f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
@@ -37,6 +37,7 @@ import org.apache.paimon.reader.ReaderSupplier;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.table.source.DataEvolutionSplitGenerator;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.types.DataField;
@@ -56,7 +57,7 @@ import java.util.Map;
import java.util.stream.Collectors;
import static java.lang.String.format;
-import static org.apache.paimon.table.SpecialFields.rowTypeWithRowLineage;
+import static org.apache.paimon.table.SpecialFields.rowTypeWithRowTracking;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/**
@@ -124,7 +125,7 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
new Builder(
formatDiscover,
readRowType.getFields(),
- schema ->
rowTypeWithRowLineage(schema.logicalRowType(), true).getFields(),
+ schema ->
rowTypeWithRowTracking(schema.logicalRowType(), true).getFields(),
null,
null,
null);
@@ -189,7 +190,8 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
            long schemaId = file.schemaId();
            TableSchema dataSchema = schemaManager.schema(schemaId).project(file.writeCols());
            int[] fieldIds =
-                    rowTypeWithRowLineage(dataSchema.logicalRowType()).getFields().stream()
+                    SpecialFields.rowTypeWithRowTracking(dataSchema.logicalRowType()).getFields()
+ .stream()
.mapToInt(DataField::id)
.toArray();
List<DataField> readFields = new ArrayList<>();
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 56fde4e30e..1c859fb693 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -1051,7 +1051,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
// assign row id for new files
List<ManifestEntry> rowIdAssigned = new ArrayList<>();
nextRowIdStart =
-                        assignRowLineageMeta(firstRowIdStart, snapshotAssigned, rowIdAssigned);
+                        assignRowTrackingMeta(firstRowIdStart, snapshotAssigned, rowIdAssigned);
deltaFiles = rowIdAssigned;
}
@@ -1173,7 +1173,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
return new SuccessResult();
}
- private long assignRowLineageMeta(
+ private long assignRowTrackingMeta(
long firstRowIdStart,
List<ManifestEntry> deltaFiles,
List<ManifestEntry> rowIdAssigned) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
index 08344c08b5..3da14ae7dd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
@@ -64,7 +64,7 @@ import java.util.stream.Collectors;
import static org.apache.paimon.predicate.PredicateBuilder.containsFields;
import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
-/** A {@link SplitRead} to read row lineage table which need field merge. */
+/** A {@link SplitRead} to read a row tracking table which needs field merge. */
public class MergeFileSplitRead implements SplitRead<KeyValue> {
private final TableSchema tableSchema;
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
index 610a0c0a2e..5dc8522a8f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -64,7 +64,7 @@ import java.util.List;
import java.util.Map;
import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
-import static org.apache.paimon.table.SpecialFields.rowTypeWithRowLineage;
+import static org.apache.paimon.table.SpecialFields.rowTypeWithRowTracking;
/** A {@link SplitRead} to read raw file directly from {@link DataSplit}. */
public class RawFileSplitRead implements SplitRead<InternalRow> {
@@ -173,7 +173,7 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
readRowType.getFields(),
schema -> {
if (rowTrackingEnabled) {
-                                return rowTypeWithRowLineage(schema.logicalRowType(), true)
+                                return rowTypeWithRowTracking(schema.logicalRowType(), true)
.getFields();
}
return schema.fields();
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 064630023d..cbf3153dac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -639,11 +639,11 @@ public class SchemaValidation {
if (rowTrackingEnabled) {
checkArgument(
options.bucket() == -1,
- "Cannot define %s for row lineage table, it only support
bucket = -1",
+ "Cannot define %s for row tracking table, it only support
bucket = -1",
CoreOptions.BUCKET.key());
checkArgument(
schema.primaryKeys().isEmpty(),
- "Cannot define %s for row lineage table.",
+ "Cannot define %s for row tracking table.",
PRIMARY_KEY.key());
}
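The two checks above define what a valid row-tracking table looks like. A hedged sketch using Paimon's `Schema` builder (option keys as used elsewhere in this commit):

```java
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;

public class RowTrackingSchemaSketch {
    // Passes SchemaValidation: unaware bucket ('bucket' = '-1') and no primary keys.
    public static Schema valid() {
        return Schema.newBuilder()
                .column("id", DataTypes.INT())
                .column("data", DataTypes.STRING())
                .option("row-tracking.enabled", "true")
                .option("bucket", "-1")
                .build();
    }
}
```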
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/RowLineageTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/RowTrackingTable.java
similarity index 90%
rename from paimon-core/src/main/java/org/apache/paimon/table/system/RowLineageTable.java
rename to paimon-core/src/main/java/org/apache/paimon/table/system/RowTrackingTable.java
index 75e4319bc1..519bf62d18 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/RowLineageTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/RowTrackingTable.java
@@ -30,6 +30,7 @@ import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.ReadonlyTable;
+import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.table.source.InnerTableRead;
@@ -48,16 +49,15 @@ import java.util.Map;
import java.util.Optional;
import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
-import static org.apache.paimon.table.SpecialFields.rowTypeWithRowLineage;
/** A {@link Table} for reading row id of table. */
-public class RowLineageTable implements DataTable, ReadonlyTable {
+public class RowTrackingTable implements DataTable, ReadonlyTable {
- public static final String ROW_LINEAGE = "row_lineage";
+ public static final String ROW_TRACKING = "row_tracking";
private final FileStoreTable wrapped;
- public RowLineageTable(FileStoreTable wrapped) {
+ public RowTrackingTable(FileStoreTable wrapped) {
this.wrapped = wrapped;
if (!coreOptions().rowTrackingEnabled()) {
@@ -96,12 +96,12 @@ public class RowLineageTable implements DataTable, ReadonlyTable {
@Override
public String name() {
- return wrapped.name() + SYSTEM_TABLE_SPLITTER + ROW_LINEAGE;
+ return wrapped.name() + SYSTEM_TABLE_SPLITTER + ROW_TRACKING;
}
@Override
public RowType rowType() {
- return rowTypeWithRowLineage(wrapped.rowType());
+ return SpecialFields.rowTypeWithRowTracking(wrapped.rowType());
}
@Override
@@ -176,7 +176,7 @@ public class RowLineageTable implements DataTable, ReadonlyTable {
@Override
public DataTable switchToBranch(String branchName) {
- return new RowLineageTable(wrapped.switchToBranch(branchName));
+ return new RowTrackingTable(wrapped.switchToBranch(branchName));
}
@Override
@@ -186,7 +186,7 @@ public class RowLineageTable implements DataTable, ReadonlyTable {
@Override
public Table copy(Map<String, String> dynamicOptions) {
- return new RowLineageTable(wrapped.copy(dynamicOptions));
+ return new RowTrackingTable(wrapped.copy(dynamicOptions));
}
@Override
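In caller terms, the rename boils down to the following (a sketch assuming a row-tracking-enabled `FileStoreTable` in hand; the constructor itself verifies that `row-tracking.enabled` is set):

```java
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.system.RowTrackingTable;

public class RowTrackingTableSketch {
    static RowTrackingTable asSystemTable(FileStoreTable table) {
        // Fails for tables without row tracking enabled.
        RowTrackingTable rowTracking = new RowTrackingTable(table);
        // name() is now "<table>$row_tracking" (previously "$row_lineage"), and
        // rowType() appends _ROW_ID and _SEQUENCE_NUMBER via rowTypeWithRowTracking.
        return rowTracking;
    }
}
```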
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
index 69f7a65ddf..d79f542f7d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
@@ -45,7 +45,7 @@ import static org.apache.paimon.table.system.ManifestsTable.MANIFESTS;
import static org.apache.paimon.table.system.OptionsTable.OPTIONS;
import static org.apache.paimon.table.system.PartitionsTable.PARTITIONS;
import static org.apache.paimon.table.system.ReadOptimizedTable.READ_OPTIMIZED;
-import static org.apache.paimon.table.system.RowLineageTable.ROW_LINEAGE;
+import static org.apache.paimon.table.system.RowTrackingTable.ROW_TRACKING;
import static org.apache.paimon.table.system.SchemasTable.SCHEMAS;
import static org.apache.paimon.table.system.SnapshotsTable.SNAPSHOTS;
import static org.apache.paimon.table.system.StatisticTable.STATISTICS;
@@ -73,7 +73,7 @@ public class SystemTableLoader {
.put(STATISTICS, StatisticTable::new)
.put(BINLOG, BinlogTable::new)
.put(TABLE_INDEXES, TableIndexesTable::new)
- .put(ROW_LINEAGE, RowLineageTable::new)
+ .put(ROW_TRACKING, RowTrackingTable::new)
.build();
public static final List<String> SYSTEM_TABLES = new
ArrayList<>(SYSTEM_TABLE_LOADERS.keySet());
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendPreCommitCompactWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendPreCommitCompactWorkerOperator.java
index 8d8a668f79..0e71f91865 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendPreCommitCompactWorkerOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendPreCommitCompactWorkerOperator.java
@@ -72,7 +72,7 @@ public class AppendPreCommitCompactWorkerOperator extends AbstractStreamOperator
checkArgument(
!coreOptions.dataEvolutionEnabled(),
"Data evolution enabled table should not invoke compact
yet.");
-
this.write.withWriteType(SpecialFields.rowTypeWithRowLineage(table.rowType()));
+
this.write.withWriteType(SpecialFields.rowTypeWithRowTracking(table.rowType()));
}
this.pathFactory = table.store().pathFactory();
this.fileIO = table.fileIO();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactor.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactor.java
index 4b85eb5169..1efbccbc19 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactor.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactor.java
@@ -78,7 +78,7 @@ public class AppendTableCompactor {
CoreOptions coreOptions = table.coreOptions();
        this.write = (BaseAppendFileStoreWrite) table.store().newWrite(commitUser);
        if (coreOptions.rowTrackingEnabled()) {
-            write.withWriteType(SpecialFields.rowTypeWithRowLineage(table.rowType()));
+            write.withWriteType(SpecialFields.rowTypeWithRowTracking(table.rowType()));
}
this.result = new LinkedList<>();
this.compactExecutorsupplier = lazyCompactExecutor;
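Both Flink call sites follow the same pattern; a condensed sketch (class and method names come from the hunks above, while the import path of `BaseAppendFileStoreWrite` is an assumption):

```java
import org.apache.paimon.operation.BaseAppendFileStoreWrite;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.SpecialFields;

public class CompactorWriteTypeSketch {
    static BaseAppendFileStoreWrite newCompactWrite(FileStoreTable table, String commitUser) {
        BaseAppendFileStoreWrite write =
                (BaseAppendFileStoreWrite) table.store().newWrite(commitUser);
        if (table.coreOptions().rowTrackingEnabled()) {
            // Widen the write type so _ROW_ID and _SEQUENCE_NUMBER survive compaction.
            write.withWriteType(SpecialFields.rowTypeWithRowTracking(table.rowType()));
        }
        return write;
    }
}
```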
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendTableITCase.java
index 582ced3e10..85ded57f23 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendTableITCase.java
@@ -95,69 +95,69 @@ public class AppendTableITCase extends CatalogITCaseBase {
}
    @Test
-    public void testReadWriteWithLineage() {
-        batchSql("INSERT INTO append_table_lineage VALUES (1, 'AAA'), (2, 'BBB')");
-        List<Row> rows = batchSql("SELECT * FROM append_table_lineage$row_lineage");
+    public void testReadWriteWithRowTracking() {
+        batchSql("INSERT INTO append_table_tracking VALUES (1, 'AAA'), (2, 'BBB')");
+        List<Row> rows = batchSql("SELECT * FROM append_table_tracking$row_tracking");
        assertThat(rows.size()).isEqualTo(2);
        assertThat(rows)
                .containsExactlyInAnyOrder(Row.of(1, "AAA", 0L, 1L), Row.of(2, "BBB", 1L, 1L));
-        rows = batchSql("SELECT * FROM append_table_lineage");
+        rows = batchSql("SELECT * FROM append_table_tracking");
        assertThat(rows.size()).isEqualTo(2);
        assertThat(rows).containsExactlyInAnyOrder(Row.of(1, "AAA"), Row.of(2, "BBB"));
    }
    @Test
-    public void testCompactionWithRowLineage() throws Exception {
-        batchSql("ALTER TABLE append_table_lineage SET ('compaction.max.file-num' = '4')");
+    public void testCompactionWithRowTracking() throws Exception {
+        batchSql("ALTER TABLE append_table_tracking SET ('compaction.max.file-num' = '4')");
        assertExecuteExpected(
-                "INSERT INTO append_table_lineage VALUES (1, 'AAA'), (2, 'BBB')",
+                "INSERT INTO append_table_tracking VALUES (1, 'AAA'), (2, 'BBB')",
                1L,
                Snapshot.CommitKind.APPEND,
-                "append_table_lineage");
+                "append_table_tracking");
        assertExecuteExpected(
-                "INSERT INTO append_table_lineage VALUES (3, 'CCC'), (4, 'DDD')",
+                "INSERT INTO append_table_tracking VALUES (3, 'CCC'), (4, 'DDD')",
                2L,
                Snapshot.CommitKind.APPEND,
-                "append_table_lineage");
+                "append_table_tracking");
        assertExecuteExpected(
-                "INSERT INTO append_table_lineage VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD')",
+                "INSERT INTO append_table_tracking VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD')",
                3L,
                Snapshot.CommitKind.APPEND,
-                "append_table_lineage");
+                "append_table_tracking");
        assertExecuteExpected(
-                "INSERT INTO append_table_lineage VALUES (5, 'EEE'), (6, 'FFF')",
+                "INSERT INTO append_table_tracking VALUES (5, 'EEE'), (6, 'FFF')",
                4L,
                Snapshot.CommitKind.APPEND,
-                "append_table_lineage");
+                "append_table_tracking");
        assertExecuteExpected(
-                "INSERT INTO append_table_lineage VALUES (7, 'HHH'), (8, 'III')",
+                "INSERT INTO append_table_tracking VALUES (7, 'HHH'), (8, 'III')",
                5L,
                Snapshot.CommitKind.APPEND,
-                "append_table_lineage");
+                "append_table_tracking");
        assertExecuteExpected(
-                "INSERT INTO append_table_lineage VALUES (9, 'JJJ'), (10, 'KKK')",
+                "INSERT INTO append_table_tracking VALUES (9, 'JJJ'), (10, 'KKK')",
                6L,
                Snapshot.CommitKind.APPEND,
-                "append_table_lineage");
+                "append_table_tracking");
        assertExecuteExpected(
-                "INSERT INTO append_table_lineage VALUES (11, 'LLL'), (12, 'MMM')",
+                "INSERT INTO append_table_tracking VALUES (11, 'LLL'), (12, 'MMM')",
                7L,
                Snapshot.CommitKind.APPEND,
-                "append_table_lineage");
+                "append_table_tracking");
        assertExecuteExpected(
-                "INSERT INTO append_table_lineage VALUES (13, 'NNN'), (14, 'OOO')",
+                "INSERT INTO append_table_tracking VALUES (13, 'NNN'), (14, 'OOO')",
                8L,
                Snapshot.CommitKind.APPEND,
-                "append_table_lineage");
+                "append_table_tracking");
-        List<Row> originRowsWithId2 = batchSql("SELECT * FROM append_table_lineage$row_lineage");
-        batchSql("call sys.compact('default.append_table_lineage')");
-        waitCompactSnapshot(60000L, "append_table_lineage");
-        List<Row> files = batchSql("SELECT * FROM append_table_lineage$files");
+        List<Row> originRowsWithId2 = batchSql("SELECT * FROM append_table_tracking$row_tracking");
+        batchSql("call sys.compact('default.append_table_tracking')");
+        waitCompactSnapshot(60000L, "append_table_tracking");
+        List<Row> files = batchSql("SELECT * FROM append_table_tracking$files");
        assertThat(files.size()).isEqualTo(1);
-        List<Row> rowsAfter2 = batchSql("SELECT * FROM append_table_lineage$row_lineage");
+        List<Row> rowsAfter2 = batchSql("SELECT * FROM append_table_tracking$row_tracking");
        assertThat(originRowsWithId2).containsExactlyInAnyOrderElementsOf(rowsAfter2);
        assertThat(rowsAfter2)
@@ -655,7 +655,7 @@ public class AppendTableITCase extends CatalogITCaseBase {
protected List<String> ddl() {
return Arrays.asList(
"CREATE TABLE IF NOT EXISTS append_table (id INT, data STRING)
WITH ('bucket' = '-1')",
- "CREATE TABLE IF NOT EXISTS append_table_lineage (id INT, data
STRING) WITH ('bucket' = '-1', 'row-tracking.enabled' = 'true')",
+ "CREATE TABLE IF NOT EXISTS append_table_tracking (id INT,
data STRING) WITH ('bucket' = '-1', 'row-tracking.enabled' = 'true')",
"CREATE TABLE IF NOT EXISTS part_table (id INT, data STRING,
dt STRING) PARTITIONED BY (dt) WITH ('bucket' = '-1')",
"CREATE TABLE IF NOT EXISTS complex_table (id INT, data
MAP<INT, INT>) WITH ('bucket' = '-1')",
"CREATE TABLE IF NOT EXISTS index_table (id INT, indexc
STRING, data STRING) WITH ('bucket' = '-1',
'file-index.bloom-filter.columns'='indexc',
'file-index.bloom-filter.indexc.items' = '500')");
diff --git a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
similarity index 94%
copy from paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala
copy to paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
index f7d911c31c..9f96840a77 100644
--- a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala
+++ b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
@@ -18,4 +18,4 @@
package org.apache.paimon.spark.sql
-class RowLineageTest extends RowLineageTestBase {}
+class RowTrackingTest extends RowTrackingTestBase {}
diff --git a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
similarity index 94%
rename from paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala
rename to paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
index f7d911c31c..9f96840a77 100644
--- a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala
+++ b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
@@ -18,4 +18,4 @@
package org.apache.paimon.spark.sql
-class RowLineageTest extends RowLineageTestBase {}
+class RowTrackingTest extends RowTrackingTestBase {}
diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
similarity index 94%
rename from paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala
rename to paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
index f7d911c31c..9f96840a77 100644
--- a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala
+++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
@@ -18,4 +18,4 @@
package org.apache.paimon.spark.sql
-class RowLineageTest extends RowLineageTestBase {}
+class RowTrackingTest extends RowTrackingTestBase {}
diff --git a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
similarity index 94%
rename from paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala
rename to paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
index f7d911c31c..9f96840a77 100644
--- a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala
+++ b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
@@ -18,4 +18,4 @@
package org.apache.paimon.spark.sql
-class RowLineageTest extends RowLineageTestBase {}
+class RowTrackingTest extends RowTrackingTestBase {}
diff --git a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala
deleted file mode 100644
index f7d911c31c..0000000000
--- a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark.sql
-
-class RowLineageTest extends RowLineageTestBase {}
diff --git a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
similarity index 94%
rename from paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala
rename to paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
index f7d911c31c..9f96840a77 100644
--- a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala
+++ b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
@@ -18,4 +18,4 @@
package org.apache.paimon.spark.sql
-class RowLineageTest extends RowLineageTestBase {}
+class RowTrackingTest extends RowTrackingTestBase {}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index dd36e2f570..3a33fe008b 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -424,7 +424,7 @@ public class CompactProcedure extends BaseProcedure {
                                    CoreOptions coreOptions = table.coreOptions();
                                    if (coreOptions.rowTrackingEnabled()) {
                                        write.withWriteType(
-                                                SpecialFields.rowTypeWithRowLineage(
+                                                SpecialFields.rowTypeWithRowTracking(
                                                        table.rowType()));
}
AppendCompactTaskSerializer ser =
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
index ce1fe2e25f..db28b4f4ef 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
@@ -40,7 +40,7 @@ trait ColumnPruningAndPushDown extends Scan with Logging {
lazy val tableRowType: RowType = {
val coreOptions: CoreOptions = CoreOptions.fromMap(table.options())
if (coreOptions.rowTrackingEnabled()) {
- SpecialFields.rowTypeWithRowLineage(table.rowType())
+ SpecialFields.rowTypeWithRowTracking(table.rowType())
} else {
table.rowType()
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala
index 1ac9aab47b..704a6c2d5b 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala
@@ -33,7 +33,7 @@ case class SparkTableWrite(
writeBuilder: BatchWriteBuilder,
writeType: RowType,
rowKindColIdx: Int = -1,
- writeRowLineage: Boolean = false)
+ writeRowTracking: Boolean = false)
extends SparkTableWriteTrait {
private val ioManager: IOManager = SparkUtils.createIOManager
@@ -41,7 +41,7 @@ case class SparkTableWrite(
private val write: BatchTableWrite = {
val _write = writeBuilder.newWrite()
_write.withIOManager(ioManager)
- if (writeRowLineage) {
+ if (writeRowTracking) {
_write.withWriteType(writeType)
}
_write
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index 6080cd0378..c85315d876 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -147,11 +147,11 @@ case class DeleteFromPaimonTableCommand(
val toRewriteScanRelation = Filter(Not(condition), newRelation)
var data = createDataset(sparkSession, toRewriteScanRelation)
if (coreOptions.rowTrackingEnabled()) {
- data = selectWithRowLineage(data)
+ data = selectWithRowTracking(data)
}
// only write new files, should have no compaction
- val addCommitMessage = writer.writeOnly().withRowLineage().write(data)
+ val addCommitMessage = writer.writeOnly().withRowTracking().write(data)
    // Step5: convert the deleted files that need to be written to commit message.
val deletedCommitMessage = buildDeletedCommitMessage(touchedFiles)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
index 3e8d4deabe..5f12d1110d 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
@@ -164,32 +164,32 @@ case class MergeIntoPaimonTable(
      val (_, filesToReadScan) =
        extractFilesAndCreateNewScan(filePathsToRead.toArray, dataFilePathToMeta, relation)
-      // If no files need to be rewritten, no need to write row lineage
-      val writeRowLineage = coreOptions.rowTrackingEnabled() && filesToRewritten.nonEmpty
+      // If no files need to be rewritten, no need to write row tracking
+      val writeRowTracking = coreOptions.rowTrackingEnabled() && filesToRewritten.nonEmpty
      // Add FILE_TOUCHED_COL to mark the row as coming from the touched file, if the row has not been
      // modified and was from touched file, it should be kept too.
      var filesToRewrittenDS =
        createDataset(sparkSession, filesToRewrittenScan).withColumn(FILE_TOUCHED_COL, lit(true))
-      if (writeRowLineage) {
-        filesToRewrittenDS = selectWithRowLineage(filesToRewrittenDS)
+      if (writeRowTracking) {
+        filesToRewrittenDS = selectWithRowTracking(filesToRewrittenDS)
      }
      var filesToReadDS =
        createDataset(sparkSession, filesToReadScan).withColumn(FILE_TOUCHED_COL, lit(false))
-      if (writeRowLineage) {
-        // For filesToReadScan we don't need to read row lineage meta cols, just add placeholders
-        ROW_LINEAGE_META_COLUMNS.foreach(
+      if (writeRowTracking) {
+        // For filesToReadScan we don't need to read row tracking meta cols, just add placeholders
+        ROW_TRACKING_META_COLUMNS.foreach(
          c => filesToReadDS = filesToReadDS.withColumn(c, lit(null)))
      }
      val toWriteDS = constructChangedRows(
        sparkSession,
        filesToRewrittenDS.union(filesToReadDS),
-        writeRowLineage = writeRowLineage).drop(ROW_KIND_COL)
+        writeRowTracking = writeRowTracking).drop(ROW_KIND_COL)
-      val finalWriter = if (writeRowLineage) {
-        writer.withRowLineage()
+      val finalWriter = if (writeRowTracking) {
+        writer.withRowTracking()
      } else {
        writer
      }
@@ -207,7 +207,7 @@ case class MergeIntoPaimonTable(
remainDeletedRow: Boolean = false,
deletionVectorEnabled: Boolean = false,
extraMetadataCols: Seq[PaimonMetadataColumn] = Seq.empty,
- writeRowLineage: Boolean = false): Dataset[Row] = {
+ writeRowTracking: Boolean = false): Dataset[Row] = {
val targetDS = targetDataset
.withColumn(TARGET_ROW_COL, lit(true))
@@ -233,7 +233,7 @@ case class MergeIntoPaimonTable(
      def attribute(name: String) = joinedPlan.output.find(attr => resolver(name, attr.name))
      val extraMetadataAttributes =
        extraMetadataCols.flatMap(metadataCol => attribute(metadataCol.name))
- val (rowIdAttr, sequenceNumberAttr) = if (writeRowLineage) {
+ val (rowIdAttr, sequenceNumberAttr) = if (writeRowTracking) {
(
attribute(SpecialFields.ROW_ID.name()).get,
attribute(SpecialFields.SEQUENCE_NUMBER.name()).get)
@@ -241,7 +241,7 @@ case class MergeIntoPaimonTable(
(null, null)
}
- val targetOutput = if (writeRowLineage) {
+ val targetOutput = if (writeRowTracking) {
filteredTargetPlan.output ++ Seq(rowIdAttr, sequenceNumberAttr)
} else {
filteredTargetPlan.output
@@ -253,7 +253,7 @@ case class MergeIntoPaimonTable(
val columnExprs = actions.map {
case UpdateAction(_, assignments) =>
var exprs = assignments.map(_.value)
- if (writeRowLineage) {
+ if (writeRowTracking) {
exprs ++= Seq(rowIdAttr, Literal(null))
}
exprs :+ Literal(RowKind.UPDATE_AFTER.toByteValue)
@@ -267,7 +267,7 @@ case class MergeIntoPaimonTable(
}
case InsertAction(_, assignments) =>
var exprs = assignments.map(_.value)
- if (writeRowLineage) {
+ if (writeRowTracking) {
exprs ++= Seq(rowIdAttr, sequenceNumberAttr)
}
exprs :+ Literal(RowKind.INSERT.toByteValue)
@@ -280,7 +280,7 @@ case class MergeIntoPaimonTable(
    val notMatchedBySourceOutputs = processMergeActions(notMatchedBySourceActions)
val notMatchedOutputs = processMergeActions(notMatchedActions)
val outputFields = mutable.ArrayBuffer(targetTable.schema.fields: _*)
- if (writeRowLineage) {
+ if (writeRowTracking) {
outputFields += PaimonMetadataColumn.ROW_ID.toStructField
outputFields += PaimonMetadataColumn.SEQUENCE_NUMBER.toStructField
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index a440ad353a..344cd0879b 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -51,7 +51,7 @@ import java.util.Collections.singletonMap
import scala.collection.JavaConverters._
-case class PaimonSparkWriter(table: FileStoreTable, writeRowLineage: Boolean = false)
+case class PaimonSparkWriter(table: FileStoreTable, writeRowTracking: Boolean = false)
extends WriteHelper {
private lazy val tableSchema = table.schema
@@ -61,8 +61,8 @@ case class PaimonSparkWriter(table: FileStoreTable, writeRowLineage: Boolean = false)
@transient private lazy val serializer = new CommitMessageSerializer
private val writeType = {
- if (writeRowLineage) {
- SpecialFields.rowTypeWithRowLineage(table.rowType(), true)
+ if (writeRowTracking) {
+ SpecialFields.rowTypeWithRowTracking(table.rowType(), true)
} else {
table.rowType()
}
@@ -74,9 +74,9 @@ case class PaimonSparkWriter(table: FileStoreTable, writeRowLineage: Boolean = false)
PaimonSparkWriter(table.copy(singletonMap(WRITE_ONLY.key(), "true")))
}
- def withRowLineage(): PaimonSparkWriter = {
+ def withRowTracking(): PaimonSparkWriter = {
if (coreOptions.rowTrackingEnabled()) {
- PaimonSparkWriter(table, writeRowLineage = true)
+ PaimonSparkWriter(table, writeRowTracking = true)
} else {
this
}
@@ -98,7 +98,7 @@ case class PaimonSparkWriter(table: FileStoreTable, writeRowLineage: Boolean = false)
  val bucketColIdx = SparkRowUtils.getFieldIndex(withInitBucketCol.schema, BUCKET_COL)
val encoderGroupWithBucketCol = EncoderSerDeGroup(withInitBucketCol.schema)
-  def newWrite() = SparkTableWrite(writeBuilder, writeType, rowKindColIdx, writeRowLineage)
+  def newWrite() = SparkTableWrite(writeBuilder, writeType, rowKindColIdx, writeRowTracking)
def sparkParallelism = {
val defaultParallelism = sparkSession.sparkContext.defaultParallelism
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
index e304480bae..1babd7b5c3 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
@@ -161,7 +161,7 @@ case class UpdatePaimonTableCommand(
}
    val data = createDataset(sparkSession, toUpdateScanRelation).select(updateColumns: _*)
- writer.withRowLineage().write(data)
+ writer.withRowTracking().write(data)
}
private def optimizedIf(
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala
index 0acb25e58b..fa71c7530f 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala
@@ -53,7 +53,7 @@ object PaimonMetadataColumn {
val SEQUENCE_NUMBER_COLUMN: String = SpecialFields.SEQUENCE_NUMBER.name()
val DV_META_COLUMNS: Seq[String] = Seq(FILE_PATH_COLUMN, ROW_INDEX_COLUMN)
-  val ROW_LINEAGE_META_COLUMNS: Seq[String] = Seq(ROW_ID_COLUMN, SEQUENCE_NUMBER_COLUMN)
+  val ROW_TRACKING_META_COLUMNS: Seq[String] = Seq(ROW_ID_COLUMN, SEQUENCE_NUMBER_COLUMN)
val SUPPORTED_METADATA_COLUMNS: Seq[String] = Seq(
ROW_INDEX_COLUMN,
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala
index 8b2964cd55..1799df48fe 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala
@@ -19,7 +19,7 @@
package org.apache.paimon.spark.util
import org.apache.paimon.spark.SparkTable
-import org.apache.paimon.spark.schema.PaimonMetadataColumn.{DV_META_COLUMNS, ROW_LINEAGE_META_COLUMNS}
+import org.apache.paimon.spark.schema.PaimonMetadataColumn.{DV_META_COLUMNS, ROW_TRACKING_META_COLUMNS}
import org.apache.paimon.table.{InnerTable, KnownSplitsTable}
import org.apache.paimon.table.source.DataSplit
@@ -60,8 +60,8 @@ trait ScanPlanHelper extends SQLConfHelper {
selectWithAdditionalCols(data, DV_META_COLUMNS)
}
- def selectWithRowLineage(data: DataFrame): DataFrame = {
- selectWithAdditionalCols(data, ROW_LINEAGE_META_COLUMNS)
+ def selectWithRowTracking(data: DataFrame): DataFrame = {
+ selectWithAdditionalCols(data, ROW_TRACKING_META_COLUMNS)
}
  private def selectWithAdditionalCols(data: DataFrame, additionalCols: Seq[String]): DataFrame = {
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
similarity index 95%
rename from paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala
rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
index d8a3516a80..c8a54d3342 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
@@ -23,9 +23,9 @@ import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.spark.sql.Row
-abstract class RowLineageTestBase extends PaimonSparkTestBase {
+abstract class RowTrackingTestBase extends PaimonSparkTestBase {
-  test("Row Lineage: read row lineage") {
+  test("Row Tracking: read row tracking") {
    withTable("t") {
      sql("CREATE TABLE t (id INT, data STRING) TBLPROPERTIES ('row-tracking.enabled' = 'true')")
      sql("INSERT INTO t VALUES (11, 'a'), (22, 'b')")
@@ -41,7 +41,7 @@ abstract class RowLineageTestBase extends PaimonSparkTestBase {
    }
  }
-  test("Row Lineage: compact table") {
+  test("Row Tracking: compact table") {
    withTable("t") {
      sql(
        "CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'compaction.min.file-num'='2')")
@@ -64,7 +64,7 @@ abstract class RowLineageTestBase extends PaimonSparkTestBase {
    }
  }
-  test("Row Lineage: delete table") {
+  test("Row Tracking: delete table") {
    withTable("t") {
      sql("CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')")
@@ -77,7 +77,7 @@ abstract class RowLineageTestBase extends PaimonSparkTestBase {
    }
  }
-  test("Row Lineage: update table") {
+  test("Row Tracking: update table") {
    withTable("t") {
      sql("CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')")
@@ -95,7 +95,7 @@ abstract class RowLineageTestBase extends PaimonSparkTestBase {
    }
  }
-  test("Row Lineage: update table without condition") {
+  test("Row Tracking: update table without condition") {
    withTable("t") {
      sql("CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')")
@@ -109,7 +109,7 @@ abstract class RowLineageTestBase extends PaimonSparkTestBase {
    }
  }
-  test("Row Lineage: merge into table") {
+  test("Row Tracking: merge into table") {
    withTable("s", "t") {
      sql("CREATE TABLE s (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')")
      sql("INSERT INTO s VALUES (1, 11), (2, 22)")
@@ -135,7 +135,7 @@ abstract class RowLineageTestBase extends PaimonSparkTestBase {
    }
  }
-  test("Row Lineage: merge into table with only insert") {
+  test("Row Tracking: merge into table with only insert") {
    withTable("s", "t") {
      sql("CREATE TABLE s (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')")
      sql("INSERT INTO s VALUES (1, 11), (2, 22)")
@@ -156,7 +156,7 @@ abstract class RowLineageTestBase extends PaimonSparkTestBase {
    }
  }
-  test("Row Lineage: merge into table with only delete") {
+  test("Row Tracking: merge into table with only delete") {
    withTable("s", "t") {
      sql("CREATE TABLE s (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')")
      sql("INSERT INTO s VALUES (1, 11), (2, 22)")
@@ -177,7 +177,7 @@ abstract class RowLineageTestBase extends PaimonSparkTestBase {
    }
  }
-  test("Row Lineage: merge into table with only update") {
+  test("Row Tracking: merge into table with only update") {
    withTable("s", "t") {
      sql("CREATE TABLE s (id INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')")
      sql("INSERT INTO s VALUES (1, 11), (2, 22)")
@@ -342,7 +342,7 @@ abstract class RowLineageTestBase extends PaimonSparkTestBase {
    }
  }
-  test("Row Lineage: merge into table not matched by source") {
+  test("Row Tracking: merge into table not matched by source") {
    if (gteqSpark3_4) {
      withTable("source", "target") {
        sql(
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/util/ScanPlanHelperTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/util/ScanPlanHelperTest.scala
index dcbf12e273..df65a50095 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/util/ScanPlanHelperTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/util/ScanPlanHelperTest.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.Row
class ScanPlanHelperTest extends PaimonSparkTestBase with ScanPlanHelper {
-  test("ScanPlanHelper: create new scan plan and select with row lineage meta cols") {
+  test("ScanPlanHelper: create new scan plan and select with row tracking meta cols") {
    withTable("t") {
      sql("CREATE TABLE t (id INT, data STRING) TBLPROPERTIES ('row-tracking.enabled' = 'true')")
      sql("INSERT INTO t VALUES (11, 'a'), (22, 'b')")
@@ -38,17 +38,17 @@ class ScanPlanHelperTest extends PaimonSparkTestBase with ScanPlanHelper {
      // select original df should not contain meta cols
      checkAnswer(newDf, Seq(Row(11, "a"), Row(22, "b")))
-      // select df with row lineage meta cols
-      checkAnswer(selectWithRowLineage(newDf), Seq(Row(11, "a", 0, 1), Row(22, "b", 1, 1)))
+      // select df with row tracking meta cols
+      checkAnswer(selectWithRowTracking(newDf), Seq(Row(11, "a", 0, 1), Row(22, "b", 1, 1)))
-      // select with row lineage meta cols twice should not add new more meta cols
+      // selecting with row tracking meta cols twice should not add more meta cols
      checkAnswer(
-        selectWithRowLineage(selectWithRowLineage(newDf)),
+        selectWithRowTracking(selectWithRowTracking(newDf)),
        Seq(Row(11, "a", 0, 1), Row(22, "b", 1, 1)))
-      // select df already contains meta cols with row lineage
+      // select a df that already contains meta cols with row tracking
      checkAnswer(
-        selectWithRowLineage(newDf.select("_ROW_ID", "id")),
+        selectWithRowTracking(newDf.select("_ROW_ID", "id")),
        Seq(Row(0, 11, 1), Row(1, 22, 1)))
    }
  }