This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 96aee597e [lake/paimon] Fix partition expiration never triggered in
Paimon lake table managed by Tiering Service (#2862)
96aee597e is described below
commit 96aee597e685879968a271578b80daa12ab4d6f6
Author: Liebing <[email protected]>
AuthorDate: Fri Mar 13 20:37:57 2026 +0800
[lake/paimon] Fix partition expiration never triggered in Paimon lake table
managed by Tiering Service (#2862)
---
.../lake/paimon/tiering/PaimonLakeCommitter.java | 18 +++++-
.../lake/paimon/tiering/PaimonTieringTest.java | 69 ++++++++++++++++++++++
2 files changed, 84 insertions(+), 3 deletions(-)
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
index 1018fe78d..bae9da3b3 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
@@ -265,9 +265,21 @@ public class PaimonLakeCommitter implements
LakeCommitter<PaimonWriteResult, Pai
dynamicOptions.put(
CoreOptions.COMMIT_CALLBACKS.key(),
PaimonLakeCommitter.PaimonCommitCallback.class.getName());
- dynamicOptions.put(
- CoreOptions.WRITE_ONLY.key(),
- isAutoSnapshotExpiration ? Boolean.FALSE.toString() :
Boolean.TRUE.toString());
+
+ boolean writeOnly = !isAutoSnapshotExpiration;
+ dynamicOptions.put(CoreOptions.WRITE_ONLY.key(),
Boolean.toString(writeOnly));
+
+ // For non-write-only modes, we enable
'end-input.check-partition-expire' to ensure
+ // Paimon triggers partition expiration on every commit.
+        // Note: This is necessary even if
'paimon.partition.expiration-check-interval' is
+        // already configured, because the Fluss tiering service creates a
fresh TableCommit
+        // instance for each commit; the interval-based expiration check
would otherwise
+        // not be triggered correctly.
+ if (!writeOnly) {
+ dynamicOptions.put(
+ CoreOptions.END_INPUT_CHECK_PARTITION_EXPIRE.key(),
+ Boolean.TRUE.toString());
+ }
return table.copy(dynamicOptions);
} catch (Exception e) {
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
index 600c3ba66..1b2303426 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
@@ -43,6 +43,7 @@ import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.Partition;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;
@@ -119,6 +120,14 @@ class PaimonTieringTest {
Arguments.of(false, false, false));
}
+ private static Stream<Arguments> partitionExpireArgs() {
+ return Stream.of(
+ Arguments.of(true, true),
+ Arguments.of(true, false),
+ Arguments.of(false, true),
+ Arguments.of(false, false));
+ }
+
@ParameterizedTest
@MethodSource("tieringWriteArgs")
void testTieringWriteTable(boolean isPrimaryKeyTable, boolean
isPartitioned) throws Exception {
@@ -421,6 +430,66 @@ class PaimonTieringTest {
}
}
+ @ParameterizedTest
+ @MethodSource("partitionExpireArgs")
+ void testPartitionExpiration(
+ boolean isTableAutoExpireSnapshot, boolean
isLakeTieringExpireSnapshot)
+ throws Exception {
+ TablePath tablePath = TablePath.of("paimon", "test_partition_expire");
+ // Use a fixed ancient date so the partition is always considered
expired.
+ // "20200101" with expiration-time=1d is always past-due – no
wall-clock dependency.
+ String expiredPartition = "20200101";
+
+ Map<String, String> tableOptions = new HashMap<>();
+ tableOptions.put(CoreOptions.PARTITION_EXPIRATION_TIME.key(), "1 d");
+
tableOptions.put(CoreOptions.PARTITION_EXPIRATION_CHECK_INTERVAL.key(), "10
min");
+ tableOptions.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(),
"yyyyMMdd");
+ createTable(tablePath, false, true, null, tableOptions);
+
+ TableDescriptor descriptor =
+ TableDescriptor.builder()
+ .schema(
+ org.apache.fluss.metadata.Schema.newBuilder()
+ .column("c1",
org.apache.fluss.types.DataTypes.INT())
+ .column("c2",
org.apache.fluss.types.DataTypes.STRING())
+ .column("c3",
org.apache.fluss.types.DataTypes.STRING())
+ .build())
+ .partitionedBy("c3")
+ .distributedBy(1)
+ .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+ .property(
+
ConfigOptions.TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT,
+ isTableAutoExpireSnapshot)
+ .build();
+ TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L,
1L);
+
+ Configuration lakeTieringConfig = new Configuration();
+ lakeTieringConfig.set(
+ ConfigOptions.LAKE_TIERING_AUTO_EXPIRE_SNAPSHOT,
isLakeTieringExpireSnapshot);
+
+ Map<Long, String> partitionIdAndName =
+ new HashMap<Long, String>() {
+ {
+ put(1L, expiredPartition);
+ }
+ };
+
+ // Simulate multiple tiering rounds – each round creates a new
LakeCommitter (try-with-
+ // resources), exactly as TieringCommitOperator does in production.
+ for (int round = 0; round < 5; round++) {
+ writeData(tableInfo, lakeTieringConfig, new HashMap<>(),
partitionIdAndName);
+ }
+
+ List<Partition> partitions =
paimonCatalog.listPartitions(toPaimon(tablePath));
+ if (isTableAutoExpireSnapshot || isLakeTieringExpireSnapshot) {
+            // if snapshot expiration is enabled via either the table option
or the tiering config, the partition should be expired
+ assertThat(partitions).isEmpty();
+ } else {
+            // if snapshot expiration is disabled in both configs, the
partition should never be expired
+ assertThat(partitions).isNotEmpty();
+ }
+ }
+
private void verifyLogTableRecordsMultiPartition(
CloseableIterator<InternalRow> actualRecords,
List<LogRecord> expectRecords,