This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9de4a149a2 [core] Row-tracking rows should keep their row_id and sequence_number in compaction (#5991)
9de4a149a2 is described below
commit 9de4a149a29b17fcb5eeabffe98eeffb0b5415c1
Author: YeJunHao <[email protected]>
AuthorDate: Thu Jul 31 13:31:37 2025 +0800
[core] Row-tracking rows should keep their row_id and sequence_number in compaction (#5991)
---
.../org/apache/paimon/table/SpecialFields.java | 20 ++++++
.../org/apache/paimon/AppendOnlyFileStore.java | 7 +-
.../paimon/operation/BaseAppendFileStoreWrite.java | 33 ++++++----
.../paimon/operation/FileStoreCommitImpl.java | 3 +
.../paimon/table/system/RowLineageTable.java | 7 +-
.../org/apache/paimon/flink/AppendTableITCase.java | 75 ++++++++++++++++++++++
6 files changed, 127 insertions(+), 18 deletions(-)
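In behavioral terms, the invariant this commit enforces is that a full compaction must not change what the row_lineage system table returns: every row keeps the row_id and sequence_number it was first committed with. That is exactly what the new IT case at the bottom of this patch asserts; a condensed Java sketch of the check (batchSql and waitCompactSnapshot are the test-harness helpers used in AppendTableITCase):

    // Condensed from testCompactionWithRowLineage below.
    List<Row> before = batchSql("SELECT * FROM append_table_lineage$row_lineage");
    batchSql("call sys.compact('default.append_table_lineage')");
    waitCompactSnapshot(60000L, "append_table_lineage");
    List<Row> after = batchSql("SELECT * FROM append_table_lineage$row_lineage");
    // Lineage columns survive the rewrite: same rows before and after.
    assertThat(before).containsExactlyInAnyOrderElementsOf(after);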
diff --git a/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java b/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java
index 1a594e5d15..68e1fc86b6 100644
--- a/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java
+++ b/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java
@@ -20,8 +20,11 @@ package org.apache.paimon.table;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -135,4 +138,21 @@ public class SpecialFields {
+ mapFieldId * STRUCTURED_TYPE_FIELD_DEPTH_LIMIT
+ depth;
}
+
+    public static RowType rowTypeWithRowLineage(RowType rowType) {
+        List<DataField> fieldsWithRowLineage = new ArrayList<>(rowType.getFields());
+
+        fieldsWithRowLineage.forEach(
+                f -> {
+                    if (ROW_ID.name().equals(f.name()) || SEQUENCE_NUMBER.name().equals(f.name())) {
+                        throw new IllegalArgumentException(
+                                "Row lineage field name '"
+                                        + f.name()
+                                        + "' conflicts with existing field names.");
+                    }
+                });
+        fieldsWithRowLineage.add(SpecialFields.ROW_ID);
+        fieldsWithRowLineage.add(SpecialFields.SEQUENCE_NUMBER);
+        return new RowType(fieldsWithRowLineage);
+    }
}
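For readers skimming the patch, the helper added above is the single place that widens a schema with the lineage system fields, and it refuses schemas that already use those field names. A minimal usage sketch (assuming Paimon's RowType.of and DataTypes factory helpers, which are not part of this patch):

    // Widen a two-column schema with the lineage fields.
    RowType dataType = RowType.of(DataTypes.INT(), DataTypes.STRING());
    RowType withLineage = SpecialFields.rowTypeWithRowLineage(dataType);
    // withLineage = data fields + SpecialFields.ROW_ID + SpecialFields.SEQUENCE_NUMBER.

    // A schema that already contains a field named like ROW_ID or
    // SEQUENCE_NUMBER is rejected with IllegalArgumentException.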
diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index c81a44d426..e25ec5319d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -33,6 +33,7 @@ import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.CatalogEnvironment;
+import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.RowType;
import javax.annotation.Nullable;
@@ -96,9 +97,13 @@ public class AppendOnlyFileStore extends AbstractFileStore<InternalRow> {
                         ? DeletionVectorsMaintainer.factory(newIndexFileHandler())
                         : null;
         if (bucketMode() == BucketMode.BUCKET_UNAWARE) {
+            RawFileSplitRead readForCompact = newRead();
+            if (options.rowTrackingEnabled()) {
+                readForCompact.withReadType(SpecialFields.rowTypeWithRowLineage(rowType));
+            }
             return new AppendFileStoreWrite(
                     fileIO,
-                    newRead(),
+                    readForCompact,
                     schema.id(),
                     rowType,
                     partitionType,
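The block above is the read-side half of the fix: when row tracking is enabled, the reader handed to AppendFileStoreWrite for compaction reads the lineage columns together with the data columns, so rewritten files can carry them over. An annotated restatement of the wiring (all names come from the hunk above):

    RawFileSplitRead readForCompact = newRead();
    if (options.rowTrackingEnabled()) {
        // Compaction reads (data fields..., ROW_ID, SEQUENCE_NUMBER) rather than
        // just the data fields, so the rewriter sees the values it must preserve.
        readForCompact.withReadType(SpecialFields.rowTypeWithRowLineage(rowType));
    }

Note that the regular write path still uses the plain rowType; only the compaction reader is widened.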
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
index 27e1e21b90..d7b0943188 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
@@ -35,6 +35,7 @@ import org.apache.paimon.io.RowDataRollingFileWriter;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.statistics.SimpleColStatsCollector;
+import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.ExceptionUtils;
@@ -43,7 +44,6 @@ import org.apache.paimon.utils.IOExceptionSupplier;
import org.apache.paimon.utils.LongCounter;
import org.apache.paimon.utils.RecordWriter;
import org.apache.paimon.utils.SnapshotManager;
-import org.apache.paimon.utils.StatsCollectorFactories;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,6 +59,7 @@ import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import static org.apache.paimon.format.FileFormat.fileFormat;
+import static org.apache.paimon.utils.StatsCollectorFactories.createStatsFactories;
/** {@link FileStoreWrite} for {@link AppendOnlyFileStore}. */
public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<InternalRow>
@@ -67,7 +68,7 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte
     private static final Logger LOG = LoggerFactory.getLogger(BaseAppendFileStoreWrite.class);
private final FileIO fileIO;
- private final RawFileSplitRead read;
+ private final RawFileSplitRead readForCompact;
private final long schemaId;
private final RowType rowType;
private final FileFormat fileFormat;
@@ -79,7 +80,7 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte
public BaseAppendFileStoreWrite(
FileIO fileIO,
- RawFileSplitRead read,
+ RawFileSplitRead readForCompact,
long schemaId,
RowType rowType,
RowType partitionType,
@@ -91,15 +92,14 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte
String tableName) {
         super(snapshotManager, scan, options, partitionType, null, dvMaintainerFactory, tableName);
this.fileIO = fileIO;
- this.read = read;
+ this.readForCompact = readForCompact;
this.schemaId = schemaId;
this.rowType = rowType;
this.fileFormat = fileFormat(options);
this.pathFactory = pathFactory;
this.statsCollectors =
-                StatsCollectorFactories.createStatsFactories(
-                        options.statsMode(), options, rowType.getFieldNames());
+                createStatsFactories(options.statsMode(), options, rowType.getFieldNames());
this.fileIndexOptions = options.indexColumnsOptions();
}
@@ -153,10 +153,17 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte
if (toCompact.isEmpty()) {
return Collections.emptyList();
}
+ RowType writeSchema =
+ options.rowTrackingEnabled()
+ ? SpecialFields.rowTypeWithRowLineage(rowType)
+ : rowType;
Exception collectedExceptions = null;
RowDataRollingFileWriter rewriter =
createRollingFileWriter(
-                        partition, bucket, new LongCounter(toCompact.get(0).minSequenceNumber()));
+ partition,
+ bucket,
+ new LongCounter(toCompact.get(0).minSequenceNumber()),
+ writeSchema);
List<IOExceptionSupplier<DeletionVector>> dvFactories = null;
if (dvFactory != null) {
dvFactories = new ArrayList<>(toCompact.size());
@@ -182,17 +189,20 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte
}
private RowDataRollingFileWriter createRollingFileWriter(
- BinaryRow partition, int bucket, LongCounter seqNumCounter) {
+            BinaryRow partition, int bucket, LongCounter seqNumCounter, RowType writeSchema) {
return new RowDataRollingFileWriter(
fileIO,
schemaId,
fileFormat,
options.targetFileSize(false),
- rowType,
+ writeSchema,
pathFactory.createDataFilePathFactory(partition, bucket),
seqNumCounter,
options.fileCompression(),
- statsCollectors,
+ writeSchema.equals(rowType)
+ ? statsCollectors
+ : createStatsFactories(
+                                options.statsMode(), options, writeSchema.getFieldNames()),
fileIndexOptions,
FileSource.COMPACT,
options.asyncFileWrite(),
@@ -205,7 +215,8 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte
List<DataFileMeta> files,
@Nullable List<IOExceptionSupplier<DeletionVector>> dvFactories)
throws IOException {
-        return new RecordReaderIterator<>(read.createReader(partition, bucket, files, dvFactories));
+        return new RecordReaderIterator<>(
+                readForCompact.createReader(partition, bucket, files, dvFactories));
}
@Override
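A subtlety in the rewrite path above: the column stats collectors are built per field name, so when compaction writes the widened schema, the cached statsCollectors array (built from rowType) no longer lines up and must be rebuilt. A sketch of that guard, with names taken from the diff (the factory array type is assumed to match what createStatsFactories returns):

    // Reuse cached collectors for plain writes; rebuild for the widened schema.
    SimpleColStatsCollector.Factory[] collectors =
            writeSchema.equals(rowType)
                    ? statsCollectors
                    : createStatsFactories(options.statsMode(), options, writeSchema.getFieldNames());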
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 132c88d694..54888abe72 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -1183,6 +1183,9 @@ public class FileStoreCommitImpl implements FileStoreCommit {
long rowCount = entry.file().rowCount();
rowIdAssigned.add(entry.assignFirstRowId(start));
start += rowCount;
+ } else {
+ // for compact file, do not assign first row id.
+ rowIdAssigned.add(entry);
}
}
return start;
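This is the commit-side half of the fix: fresh first-row-ids are handed out only for newly appended files, while entries produced by compaction keep the row ids already rewritten into them. The enclosing loop is not visible in the hunk; a hypothetical reconstruction for orientation (the FileSource.APPEND guard and the entries variable are assumptions inferred from the in-code comment):

    // Hypothetical shape of the surrounding loop; only the branch bodies,
    // `start`, `rowIdAssigned`, and the return come from the hunk itself.
    for (ManifestEntry entry : entries) {
        if (entry.file().fileSource().orElse(FileSource.APPEND) == FileSource.APPEND) {
            long rowCount = entry.file().rowCount();
            rowIdAssigned.add(entry.assignFirstRowId(start)); // fresh ids for new rows
            start += rowCount;
        } else {
            // Compacted files already carry row ids; keep the entry untouched.
            rowIdAssigned.add(entry);
        }
    }
    return start;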
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/RowLineageTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/RowLineageTable.java
index c428df607f..6be13e8e7e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/RowLineageTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/RowLineageTable.java
@@ -36,7 +36,6 @@ import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.StreamDataTableScan;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
-import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.ChangelogManager;
@@ -44,7 +43,6 @@ import org.apache.paimon.utils.SimpleFileReader;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -103,10 +101,7 @@ public class RowLineageTable implements DataTable, ReadonlyTable {
@Override
public RowType rowType() {
-        List<DataField> fields = new ArrayList<>(wrapped.rowType().getFields());
- fields.add(SpecialFields.ROW_ID);
- fields.add(SpecialFields.SEQUENCE_NUMBER);
- return new RowType(fields);
+ return SpecialFields.rowTypeWithRowLineage(wrapped.rowType());
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendTableITCase.java
index 4afe48a4a9..75c55c8f88 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendTableITCase.java
@@ -107,6 +107,81 @@ public class AppendTableITCase extends CatalogITCaseBase {
         assertThat(rows).containsExactlyInAnyOrder(Row.of(1, "AAA"), Row.of(2, "BBB"));
}
+ @Test
+ public void testCompactionWithRowLineage() throws Exception {
+ batchSql("ALTER TABLE append_table_lineage SET
('compaction.max.file-num' = '4')");
+
+        assertExecuteExpected(
+                "INSERT INTO append_table_lineage VALUES (1, 'AAA'), (2, 'BBB')",
+                1L,
+                Snapshot.CommitKind.APPEND,
+                "append_table_lineage");
+        assertExecuteExpected(
+                "INSERT INTO append_table_lineage VALUES (3, 'CCC'), (4, 'DDD')",
+                2L,
+                Snapshot.CommitKind.APPEND,
+                "append_table_lineage");
+        assertExecuteExpected(
+                "INSERT INTO append_table_lineage VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD')",
+                3L,
+                Snapshot.CommitKind.APPEND,
+                "append_table_lineage");
+        assertExecuteExpected(
+                "INSERT INTO append_table_lineage VALUES (5, 'EEE'), (6, 'FFF')",
+                4L,
+                Snapshot.CommitKind.APPEND,
+                "append_table_lineage");
+        assertExecuteExpected(
+                "INSERT INTO append_table_lineage VALUES (7, 'HHH'), (8, 'III')",
+                5L,
+                Snapshot.CommitKind.APPEND,
+                "append_table_lineage");
+        assertExecuteExpected(
+                "INSERT INTO append_table_lineage VALUES (9, 'JJJ'), (10, 'KKK')",
+                6L,
+                Snapshot.CommitKind.APPEND,
+                "append_table_lineage");
+        assertExecuteExpected(
+                "INSERT INTO append_table_lineage VALUES (11, 'LLL'), (12, 'MMM')",
+                7L,
+                Snapshot.CommitKind.APPEND,
+                "append_table_lineage");
+        assertExecuteExpected(
+                "INSERT INTO append_table_lineage VALUES (13, 'NNN'), (14, 'OOO')",
+                8L,
+                Snapshot.CommitKind.APPEND,
+                "append_table_lineage");
+
+        List<Row> originRowsWithId2 = batchSql("SELECT * FROM append_table_lineage$row_lineage");
+ batchSql("call sys.compact('default.append_table_lineage')");
+ waitCompactSnapshot(60000L, "append_table_lineage");
+ List<Row> files = batchSql("SELECT * FROM append_table_lineage$files");
+ assertThat(files.size()).isEqualTo(1);
+        List<Row> rowsAfter2 = batchSql("SELECT * FROM append_table_lineage$row_lineage");
+        assertThat(originRowsWithId2).containsExactlyInAnyOrderElementsOf(rowsAfter2);
+
+ assertThat(rowsAfter2)
+ .containsExactlyInAnyOrder(
+ Row.of(1, "AAA", 0L, 1L),
+ Row.of(2, "BBB", 1L, 1L),
+ Row.of(3, "CCC", 2L, 2L),
+ Row.of(4, "DDD", 3L, 2L),
+ Row.of(1, "AAA", 4L, 3L),
+ Row.of(2, "BBB", 5L, 3L),
+ Row.of(3, "CCC", 6L, 3L),
+ Row.of(4, "DDD", 7L, 3L),
+ Row.of(5, "EEE", 8L, 4L),
+ Row.of(6, "FFF", 9L, 4L),
+ Row.of(7, "HHH", 10L, 5L),
+ Row.of(8, "III", 11L, 5L),
+ Row.of(9, "JJJ", 12L, 6L),
+ Row.of(10, "KKK", 13L, 6L),
+ Row.of(11, "LLL", 14L, 7L),
+ Row.of(12, "MMM", 15L, 7L),
+ Row.of(13, "NNN", 16L, 8L),
+ Row.of(14, "OOO", 17L, 8L));
+ }
+
@Test
public void testSkipDedup() {
batchSql("INSERT INTO append_table VALUES (1, 'AAA'), (1, 'AAA'), (2,
'BBB'), (3, 'BBB')");