This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 96aee597e [lake/paimon] Fix partition expiration never triggered in Paimon lake table managed by Tiering Service (#2862)
96aee597e is described below

commit 96aee597e685879968a271578b80daa12ab4d6f6
Author: Liebing <[email protected]>
AuthorDate: Fri Mar 13 20:37:57 2026 +0800

    [lake/paimon] Fix partition expiration never triggered in Paimon lake table managed by Tiering Service (#2862)
---
 .../lake/paimon/tiering/PaimonLakeCommitter.java   | 18 +++++-
 .../lake/paimon/tiering/PaimonTieringTest.java     | 69 ++++++++++++++++++++++
 2 files changed, 84 insertions(+), 3 deletions(-)

diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
index 1018fe78d..bae9da3b3 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
@@ -265,9 +265,21 @@ public class PaimonLakeCommitter implements LakeCommitter<PaimonWriteResult, Pai
             dynamicOptions.put(
                     CoreOptions.COMMIT_CALLBACKS.key(),
                     PaimonLakeCommitter.PaimonCommitCallback.class.getName());
-            dynamicOptions.put(
-                    CoreOptions.WRITE_ONLY.key(),
-                    isAutoSnapshotExpiration ? Boolean.FALSE.toString() : 
Boolean.TRUE.toString());
+
+            boolean writeOnly = !isAutoSnapshotExpiration;
+            dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), 
Boolean.toString(writeOnly));
+
+            // For non-write-only modes, we enable 'end-input.check-partition-expire' to ensure
+            // Paimon triggers partition expiration on every commit.
+            // Note: This is necessary even if 'paimon.partition.expiration-check-interval' is
+            // already configured. Because the Fluss tiering service creates a fresh TableCommit
+            // instance for each commit, the interval-based expiration check will not be triggered
+            // correctly otherwise.
+            if (!writeOnly) {
+                dynamicOptions.put(
+                        CoreOptions.END_INPUT_CHECK_PARTITION_EXPIRE.key(),
+                        Boolean.TRUE.toString());
+            }
 
             return table.copy(dynamicOptions);
         } catch (Exception e) {
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
index 600c3ba66..1b2303426 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
@@ -43,6 +43,7 @@ import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.Partition;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.DataSplit;
@@ -119,6 +120,14 @@ class PaimonTieringTest {
                 Arguments.of(false, false, false));
     }
 
+    private static Stream<Arguments> partitionExpireArgs() {
+        return Stream.of(
+                Arguments.of(true, true),
+                Arguments.of(true, false),
+                Arguments.of(false, true),
+                Arguments.of(false, false));
+    }
+
     @ParameterizedTest
     @MethodSource("tieringWriteArgs")
     void testTieringWriteTable(boolean isPrimaryKeyTable, boolean 
isPartitioned) throws Exception {
@@ -421,6 +430,66 @@ class PaimonTieringTest {
         }
     }
 
+    @ParameterizedTest
+    @MethodSource("partitionExpireArgs")
+    void testPartitionExpiration(
+            boolean isTableAutoExpireSnapshot, boolean 
isLakeTieringExpireSnapshot)
+            throws Exception {
+        TablePath tablePath = TablePath.of("paimon", "test_partition_expire");
+        // Use a fixed ancient date so the partition is always considered expired.
+        // "20200101" with expiration-time=1d is always past-due – no wall-clock dependency.
+        String expiredPartition = "20200101";
+
+        Map<String, String> tableOptions = new HashMap<>();
+        tableOptions.put(CoreOptions.PARTITION_EXPIRATION_TIME.key(), "1 d");
+        
tableOptions.put(CoreOptions.PARTITION_EXPIRATION_CHECK_INTERVAL.key(), "10 
min");
+        tableOptions.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), 
"yyyyMMdd");
+        createTable(tablePath, false, true, null, tableOptions);
+
+        TableDescriptor descriptor =
+                TableDescriptor.builder()
+                        .schema(
+                                org.apache.fluss.metadata.Schema.newBuilder()
+                                        .column("c1", 
org.apache.fluss.types.DataTypes.INT())
+                                        .column("c2", 
org.apache.fluss.types.DataTypes.STRING())
+                                        .column("c3", 
org.apache.fluss.types.DataTypes.STRING())
+                                        .build())
+                        .partitionedBy("c3")
+                        .distributedBy(1)
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+                        .property(
+                                
ConfigOptions.TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT,
+                                isTableAutoExpireSnapshot)
+                        .build();
+        TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 
1L);
+
+        Configuration lakeTieringConfig = new Configuration();
+        lakeTieringConfig.set(
+                ConfigOptions.LAKE_TIERING_AUTO_EXPIRE_SNAPSHOT, 
isLakeTieringExpireSnapshot);
+
+        Map<Long, String> partitionIdAndName =
+                new HashMap<Long, String>() {
+                    {
+                        put(1L, expiredPartition);
+                    }
+                };
+
+        // Simulate multiple tiering rounds – each round creates a new LakeCommitter (try-with-
+        // resources), exactly as TieringCommitOperator does in production.
+        for (int round = 0; round < 5; round++) {
+            writeData(tableInfo, lakeTieringConfig, new HashMap<>(), 
partitionIdAndName);
+        }
+
+        List<Partition> partitions = 
paimonCatalog.listPartitions(toPaimon(tablePath));
+        if (isTableAutoExpireSnapshot || isLakeTieringExpireSnapshot) {
+            // if auto snapshot expiration is enabled, partition should also be expired
+            assertThat(partitions).isEmpty();
+        } else {
+            // if auto snapshot expiration is disabled, partition should never be expired
+            assertThat(partitions).isNotEmpty();
+        }
+    }
+
     private void verifyLogTableRecordsMultiPartition(
             CloseableIterator<InternalRow> actualRecords,
             List<LogRecord> expectRecords,

Reply via email to