This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 058cf13a6 [lake/paimon] Support tiering paimon deletion vector enabled
table (#1725)
058cf13a6 is described below
commit 058cf13a61bb01adbdfb81051c7b1efa75edfe47
Author: yuxia Luo <[email protected]>
AuthorDate: Mon Sep 22 15:28:11 2025 +0800
[lake/paimon] Support tiering paimon deletion vector enabled table (#1725)
---
.../flink/tiering/source/TieringSplitReader.java | 2 +-
.../fluss/lake/paimon/tiering/RecordWriter.java | 5 +-
.../paimon/tiering/mergetree/MergeTreeWriter.java | 17 +-
.../testutils/FlinkPaimonTieringTestBase.java | 29 ++-
.../lake/paimon/tiering/PaimonTieringITCase.java | 195 ++++++++++++---------
.../tiering/ReCreateSameTableAfterTieringTest.java | 4 +-
6 files changed, 158 insertions(+), 94 deletions(-)
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
index bcee1861d..673b723d8 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
@@ -234,7 +234,7 @@ public class TieringSplitReader<WriteResult>
// instead of fail directly
checkArgument(
currentTableInfo.getTableId() ==
split.getTableBucket().getTableId(),
- "The current table id %s for table path % is different
from the table id %s in TieringSplit split.",
+ "The current table id %s for table path %s is different
from the table id %s in TieringSplit split.",
currentTableInfo.getTableId(),
tablePath,
split.getTableBucket().getTableId());
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java
index 209d356ab..f835b0486 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java
@@ -59,7 +59,10 @@ public abstract class RecordWriter<T> implements
AutoCloseable {
CommitMessage complete() throws Exception {
List<CommitMessage> commitMessages = tableWrite.prepareCommit();
- checkState(commitMessages.size() == 1, "The size of CommitMessage must
be 1.");
+ checkState(
+ commitMessages.size() == 1,
+ "The size of CommitMessage must be 1, but got %s.",
+ commitMessages);
return commitMessages.get(0);
}
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java
index 4a32392bf..c27ebef6d 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java
@@ -22,6 +22,7 @@ import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.record.LogRecord;
import org.apache.paimon.KeyValue;
+import org.apache.paimon.disk.IOManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.RowKeyExtractor;
import org.apache.paimon.table.sink.TableWriteImpl;
@@ -29,6 +30,7 @@ import org.apache.paimon.table.sink.TableWriteImpl;
import javax.annotation.Nullable;
import java.util.List;
+import java.util.Map;
import static
org.apache.fluss.lake.paimon.tiering.PaimonLakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER;
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toRowKind;
@@ -36,6 +38,9 @@ import static
org.apache.fluss.lake.paimon.utils.PaimonConversions.toRowKind;
/** A {@link RecordWriter} to write to Paimon's primary-key table. */
public class MergeTreeWriter extends RecordWriter<KeyValue> {
+ // the option key to configure the temporary directory used by fluss
tiering
+ private static final String FLUSS_TIERING_TMP_DIR_KEY =
"fluss.tiering.io-tmpdir";
+
private final KeyValue keyValue = new KeyValue();
private final RowKeyExtractor rowKeyExtractor;
@@ -55,8 +60,18 @@ public class MergeTreeWriter extends RecordWriter<KeyValue> {
}
private static TableWriteImpl<KeyValue> createTableWrite(FileStoreTable
fileStoreTable) {
+ // allow configuring the temporary directory used by fluss tiering,
+ // since the default java.io.tmpdir may not be suitable.
+ // the option is not exposed publicly yet; reading it from the table
+ // options is a workaround, and it may be exposed later if needed
+ Map<String, String> props = fileStoreTable.options();
+ String tmpDir =
+ props.getOrDefault(FLUSS_TIERING_TMP_DIR_KEY,
System.getProperty("java.io.tmpdir"));
//noinspection unchecked
- return (TableWriteImpl<KeyValue>)
fileStoreTable.newWrite(FLUSS_LAKE_TIERING_COMMIT_USER);
+ return (TableWriteImpl<KeyValue>)
+ fileStoreTable
+ .newWrite(FLUSS_LAKE_TIERING_COMMIT_USER)
+ .withIOManager(IOManager.create(tmpDir));
}
@Override
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
index 7405841b5..995b2db46 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
@@ -59,6 +59,7 @@ import java.nio.file.Files;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -329,8 +330,25 @@ public abstract class FlinkPaimonTieringTestBase {
return createPkTable(tablePath, 1);
}
+ protected long createPkTable(
+ TablePath tablePath,
+ Map<String, String> tableProperties,
+ Map<String, String> tableCustomProperties)
+ throws Exception {
+ return createPkTable(tablePath, 1, tableProperties,
tableCustomProperties);
+ }
+
protected long createPkTable(TablePath tablePath, int bucketNum) throws
Exception {
- TableDescriptor table1Descriptor =
+ return createPkTable(tablePath, bucketNum, Collections.emptyMap(),
Collections.emptyMap());
+ }
+
+ protected long createPkTable(
+ TablePath tablePath,
+ int bucketNum,
+ Map<String, String> tableProperties,
+ Map<String, String> tableCustomProperties)
+ throws Exception {
+ TableDescriptor.Builder tableDescriptor =
TableDescriptor.builder()
.schema(
Schema.newBuilder()
@@ -340,9 +358,10 @@ public abstract class FlinkPaimonTieringTestBase {
.build())
.distributedBy(bucketNum)
.property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
"true")
- .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS,
Duration.ofMillis(500))
- .build();
- return createTable(tablePath, table1Descriptor);
+ .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS,
Duration.ofMillis(500));
+ tableDescriptor.customProperties(tableCustomProperties);
+ tableDescriptor.properties(tableProperties);
+ return createTable(tablePath, tableDescriptor.build());
}
protected void dropTable(TablePath tablePath) throws Exception {
@@ -422,7 +441,7 @@ public abstract class FlinkPaimonTieringTestBase {
"bucket " + tb + "not synced");
}
- protected void checkDataInPaimonPrimayKeyTable(
+ protected void checkDataInPaimonPrimaryKeyTable(
TablePath tablePath, List<InternalRow> expectedRows) throws
Exception {
Iterator<org.apache.paimon.data.InternalRow> paimonRowIterator =
getPaimonRowCloseableIterator(tablePath);
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
index 301427186..442962f31 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
@@ -92,97 +92,124 @@ class PaimonTieringITCase extends
FlinkPaimonTieringTestBase {
// then start tiering job
JobClient jobClient = buildTieringJob(execEnv);
- // check the status of replica after synced
- assertReplicaStatus(t1Bucket, 3);
- // check data in paimon
- checkDataInPaimonPrimayKeyTable(t1, rows);
- // check snapshot property in paimon
- Map<String, String> properties =
- new HashMap<String, String>() {
- {
- put(
- FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
- "[{\"bucket_id\":0,\"log_offset\":3}]");
- }
- };
- checkSnapshotPropertyInPaimon(t1, properties);
-
- // then, create another log table
- TablePath t2 = TablePath.of(DEFAULT_DB, "logTable");
- long t2Id = createLogTable(t2);
- TableBucket t2Bucket = new TableBucket(t2Id, 0);
- List<InternalRow> flussRows = new ArrayList<>();
- // write records
- for (int i = 0; i < 10; i++) {
- rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
- flussRows.addAll(rows);
+ try {
+ // check the status of replica after synced
+ assertReplicaStatus(t1Bucket, 3);
+ // check data in paimon
+ checkDataInPaimonPrimaryKeyTable(t1, rows);
+ // check snapshot property in paimon
+ Map<String, String> properties =
+ new HashMap<String, String>() {
+ {
+ put(
+ FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
+ "[{\"bucket_id\":0,\"log_offset\":3}]");
+ }
+ };
+ checkSnapshotPropertyInPaimon(t1, properties);
+
+ // then, create another log table
+ TablePath t2 = TablePath.of(DEFAULT_DB, "logTable");
+ long t2Id = createLogTable(t2);
+ TableBucket t2Bucket = new TableBucket(t2Id, 0);
+ List<InternalRow> flussRows = new ArrayList<>();
+ // write records
+ for (int i = 0; i < 10; i++) {
+ rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
+ flussRows.addAll(rows);
+ // write records
+ writeRows(t2, rows, true);
+ }
+ // check the status of replica after synced;
+ // note: we can't update log start offset for unaware bucket mode
log table
+ assertReplicaStatus(t2Bucket, 30);
+
+ // check data in paimon
+ checkDataInPaimonAppendOnlyTable(t2, flussRows, 0);
+
+ // then write data to the pk tables
// write records
- writeRows(t2, rows, true);
+ rows = Arrays.asList(row(1, "v111"), row(2, "v222"), row(3,
"v333"));
+ // write records
+ writeRows(t1, rows, false);
+
+ // check the status of replica of t1 after synced;
+ // the start offset is not checked since we won't
+ // update start log offset for primary key table
+ assertReplicaStatus(t1Bucket, 9);
+
+ checkDataInPaimonPrimaryKeyTable(t1, rows);
+
+ // then create partitioned table and wait partitions are ready
+ TablePath partitionedTablePath = TablePath.of(DEFAULT_DB,
"partitionedTable");
+ Tuple2<Long, TableDescriptor> tableIdAndDescriptor =
+ createPartitionedTable(partitionedTablePath);
+ Map<Long, String> partitionNameByIds =
waitUntilPartitions(partitionedTablePath);
+
+ // now, write rows into partitioned table
+ TableDescriptor partitionedTableDescriptor =
tableIdAndDescriptor.f1;
+ Map<String, List<InternalRow>> writtenRowsByPartition =
+ writeRowsIntoPartitionedTable(
+ partitionedTablePath, partitionedTableDescriptor,
partitionNameByIds);
+ long tableId = tableIdAndDescriptor.f0;
+
+ // wait until synced to paimon
+ for (Long partitionId : partitionNameByIds.keySet()) {
+ TableBucket tableBucket = new TableBucket(tableId,
partitionId, 0);
+ assertReplicaStatus(tableBucket, 3);
+ }
+
+ // now, let's check data in paimon per partition
+ // check data in paimon
+ String partitionCol =
partitionedTableDescriptor.getPartitionKeys().get(0);
+ for (String partitionName : partitionNameByIds.values()) {
+ checkDataInPaimonAppendOnlyPartitionedTable(
+ partitionedTablePath,
+ Collections.singletonMap(partitionCol, partitionName),
+ writtenRowsByPartition.get(partitionName),
+ 0);
+ }
+
+ properties =
+ new HashMap<String, String>() {
+ {
+ put(
+ FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
+ "["
+ +
"{\"partition_id\":0,\"bucket_id\":0,\"partition_name\":\"date=2025\",\"log_offset\":3},"
+ +
"{\"partition_id\":1,\"bucket_id\":0,\"partition_name\":\"date=2026\",\"log_offset\":3}"
+ + "]");
+ }
+ };
+ checkSnapshotPropertyInPaimon(partitionedTablePath, properties);
+ } finally {
+ jobClient.cancel().get();
}
- // check the status of replica after synced;
- // note: we can't update log start offset for unaware bucket mode log
table
- assertReplicaStatus(t2Bucket, 30);
-
- // check data in paimon
- checkDataInPaimonAppendOnlyTable(t2, flussRows, 0);
+ }
- // then write data to the pk tables
- // write records
- rows = Arrays.asList(row(1, "v111"), row(2, "v222"), row(3, "v333"));
+ @Test
+ void testTieringToDvEnabledTable() throws Exception {
+ TablePath t1 = TablePath.of(DEFAULT_DB, "pkTableWithDv");
+ long t1Id =
+ createPkTable(
+ t1,
+
Collections.singletonMap("table.datalake.auto-compaction", "true"),
+
Collections.singletonMap("paimon.deletion-vectors.enabled", "true"));
// write records
+ List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"),
row(3, "v3"));
writeRows(t1, rows, false);
+ waitUntilSnapshot(t1Id, 1, 0);
- // check the status of replica of t2 after synced
- // not check start offset since we won't
- // update start log offset for primary key table
- assertReplicaStatus(t1Bucket, 9);
-
- checkDataInPaimonPrimayKeyTable(t1, rows);
-
- // then create partitioned table and wait partitions are ready
- TablePath partitionedTablePath = TablePath.of(DEFAULT_DB,
"partitionedTable");
- Tuple2<Long, TableDescriptor> tableIdAndDescriptor =
- createPartitionedTable(partitionedTablePath);
- Map<Long, String> partitionNameByIds =
waitUntilPartitions(partitionedTablePath);
-
- // now, write rows into partitioned table
- TableDescriptor partitionedTableDescriptor = tableIdAndDescriptor.f1;
- Map<String, List<InternalRow>> writtenRowsByPartition =
- writeRowsIntoPartitionedTable(
- partitionedTablePath, partitionedTableDescriptor,
partitionNameByIds);
- long tableId = tableIdAndDescriptor.f0;
-
- // wait until synced to paimon
- for (Long partitionId : partitionNameByIds.keySet()) {
- TableBucket tableBucket = new TableBucket(tableId, partitionId, 0);
- assertReplicaStatus(tableBucket, 3);
- }
-
- // now, let's check data in paimon per partition
- // check data in paimon
- String partitionCol =
partitionedTableDescriptor.getPartitionKeys().get(0);
- for (String partitionName : partitionNameByIds.values()) {
- checkDataInPaimonAppendOnlyPartitionedTable(
- partitionedTablePath,
- Collections.singletonMap(partitionCol, partitionName),
- writtenRowsByPartition.get(partitionName),
- 0);
+ // then start tiering job
+ JobClient jobClient = buildTieringJob(execEnv);
+ try {
+ // check the status of replica after synced
+ assertReplicaStatus(new TableBucket(t1Id, 0), 3);
+ // check data in paimon
+ checkDataInPaimonPrimaryKeyTable(t1, rows);
+ } finally {
+ jobClient.cancel().get();
}
-
- properties =
- new HashMap<String, String>() {
- {
- put(
- FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
- "["
- +
"{\"partition_id\":0,\"bucket_id\":0,\"partition_name\":\"date=2025\",\"log_offset\":3},"
- +
"{\"partition_id\":1,\"bucket_id\":0,\"partition_name\":\"date=2026\",\"log_offset\":3}"
- + "]");
- }
- };
- checkSnapshotPropertyInPaimon(partitionedTablePath, properties);
-
- jobClient.cancel().get();
}
private Tuple2<Long, TableDescriptor> createPartitionedTable(TablePath
partitionedTablePath)
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/ReCreateSameTableAfterTieringTest.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/ReCreateSameTableAfterTieringTest.java
index 5a0b8ccd9..6520770c3 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/ReCreateSameTableAfterTieringTest.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/ReCreateSameTableAfterTieringTest.java
@@ -73,7 +73,7 @@ class ReCreateSameTableAfterTieringTest extends
FlinkPaimonTieringTestBase {
// check the status of replica after synced
assertReplicaStatus(t1Bucket, 3);
// check data in paimon
- checkDataInPaimonPrimayKeyTable(t1, rows);
+ checkDataInPaimonPrimaryKeyTable(t1, rows);
// then drop the table
dropTable(t1);
@@ -88,7 +88,7 @@ class ReCreateSameTableAfterTieringTest extends
FlinkPaimonTieringTestBase {
// check the status of replica after synced
assertReplicaStatus(t2Bucket, 2);
// check data in paimon
- checkDataInPaimonPrimayKeyTable(t1, newRows);
+ checkDataInPaimonPrimaryKeyTable(t1, newRows);
// stop the tiering job
jobClient.cancel().get();