This is an automated email from the ASF dual-hosted git repository.

hongshun pushed a commit to branch change-auto-retention
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit 78ea02d2e4596531b7f4d06df1512482bdb79dd6
Author: Hongshun Wang <[email protected]>
AuthorDate: Thu Mar 26 16:32:54 2026 +0800

    [server] allow user to alter partition nun retention.
    
    tmp
---
 .../client/table/AutoPartitionedTableITCase.java   | 56 +++++++++++++++++
 .../org/apache/fluss/config/FlussConfigUtils.java  |  3 +-
 .../fluss/flink/catalog/FlinkCatalogITCase.java    | 57 +++++++++++++++++
 .../apache/fluss/server/DynamicServerConfig.java   |  2 +-
 .../server/coordinator/AutoPartitionManager.java   | 73 ++++++++++++++--------
 .../coordinator/CoordinatorEventProcessor.java     | 10 +++
 .../coordinator/AutoPartitionManagerTest.java      | 69 ++++++++++++++++++++
 7 files changed, 243 insertions(+), 27 deletions(-)

diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/table/AutoPartitionedTableITCase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/table/AutoPartitionedTableITCase.java
index b4f36da42..2e005925c 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/table/AutoPartitionedTableITCase.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/table/AutoPartitionedTableITCase.java
@@ -32,6 +32,7 @@ import org.apache.fluss.metadata.PartitionInfo;
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.row.GenericRow;
@@ -45,9 +46,12 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.time.Duration;
+import java.time.LocalDate;
 import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -379,6 +383,58 @@ class AutoPartitionedTableITCase extends 
ClientToServerITCaseBase {
         verifyPartitionLogs(table, schema.getRowType(), 
expectPartitionAppendRows);
     }
 
+    @Test
+    void testAlterAutoPartitionRetention() throws Exception {
+        TablePath tablePath = TablePath.of("test_db_1", 
"test_alter_auto_partition_retention");
+
+        // Create an auto-partitioned table with DAY time unit and retention=2
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.STRING())
+                        .column("c", DataTypes.STRING())
+                        .build();
+        TableDescriptor descriptor =
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .partitionedBy("c")
+                        .property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, 
true)
+                        .property(
+                                ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT,
+                                AutoPartitionTimeUnit.DAY)
+                        
.property(ConfigOptions.TABLE_AUTO_PARTITION_NUM_RETENTION, 2)
+                        .build();
+        createTable(tablePath, descriptor, false);
+        FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath);
+
+        LocalDate today = LocalDate.now();
+        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd");
+
+        // Manually add an old partition that falls outside retention=2 window
+        String oldPartition = today.minusDays(3).format(formatter);
+        admin.createPartition(tablePath, newPartitionSpec("c", oldPartition), 
false).get();
+        FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath, 3);
+
+        // Now alter retention from 2 to 1 via admin API
+        List<TableChange> changes =
+                Collections.singletonList(
+                        TableChange.set(
+                                
ConfigOptions.TABLE_AUTO_PARTITION_NUM_RETENTION.key(), "1"));
+        admin.alterTable(tablePath, changes, false).get();
+
+        // The old partition should be dropped after the periodic check fires
+        FLUSS_CLUSTER_EXTENSION.waitUntilPartitionsDropped(
+                tablePath, Collections.singletonList(oldPartition));
+
+        // Verify remaining partitions are within the new retention window
+        List<PartitionInfo> partitionInfos = 
admin.listPartitionInfos(tablePath).get();
+        List<String> partitionNames =
+                partitionInfos.stream()
+                        .map(PartitionInfo::getPartitionName)
+                        .collect(Collectors.toList());
+        assertThat(partitionNames).doesNotContain(oldPartition);
+    }
+
     private Schema createPartitionedTable(TablePath tablePath, boolean 
isPrimaryTable)
             throws Exception {
         Schema.Builder schemaBuilder =
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java 
b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
index b29f2a74b..bf9b1428e 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
@@ -48,7 +48,8 @@ public class FlussConfigUtils {
                 Arrays.asList(
                         ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
                         ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(),
-                        ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key());
+                        ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key(),
+                        
ConfigOptions.TABLE_AUTO_PARTITION_NUM_RETENTION.key());
     }
 
     public static boolean isTableStorageConfig(String key) {
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
index 638229d8f..2eaf3cad0 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
@@ -574,6 +574,63 @@ abstract class FlinkCatalogITCase {
         assertResultsIgnoreOrder(showPartitionIterator, 
expectedShowPartitionsResult, true);
     }
 
+    @Test
+    void testAlterAutoPartitionRetention() throws Exception {
+        String tblName = "test_alter_auto_partition_retention";
+        ObjectPath objectPath = new ObjectPath(DEFAULT_DB, tblName);
+
+        // Create an auto-partitioned table with HOUR time unit and retention=3
+        tEnv.executeSql(
+                "create table "
+                        + tblName
+                        + " (a int, b string) partitioned by (b) "
+                        + "with ('table.auto-partition.enabled' = 'true',"
+                        + " 'table.auto-partition.time-unit' = 'hour',"
+                        + " 'table.auto-partition.num-retention' = '3')");
+
+        TablePath tablePath = new TablePath(DEFAULT_DB, tblName);
+        FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath);
+
+        String datetimePattern = "yyyyMMddHH";
+        String oldPartition =
+                LocalDateTime.now()
+                        .minusHours(4)
+                        .format(DateTimeFormatter.ofPattern(datetimePattern));
+
+        // Add an old partition that is outside the retention window
+        tEnv.executeSql(
+                String.format("alter table %s add partition (b = '%s')", 
tblName, oldPartition));
+        FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath, 3);
+        CloseableIterator<Row> showPartitionIterator =
+                tEnv.executeSql("show partitions " + tblName).collect();
+        List<String> partitions =
+                CollectionUtil.iteratorToList(showPartitionIterator).stream()
+                        .map(Row::toString)
+                        .collect(Collectors.toList());
+        assertThat(partitions).doesNotContain(String.format("+I[b=%s]", 
oldPartition));
+
+        // Alter retention from 3 to 1
+        tEnv.executeSql(
+                "alter table " + tblName + " set 
('table.auto-partition.num-retention' = '1')");
+
+        // The old partition should be dropped after the periodic check fires
+        FLUSS_CLUSTER_EXTENSION.waitUntilPartitionsDropped(
+                tablePath, Collections.singletonList(oldPartition));
+
+        // Verify the old partition is no longer listed
+        showPartitionIterator = tEnv.executeSql("show partitions " + 
tblName).collect();
+        partitions =
+                CollectionUtil.iteratorToList(showPartitionIterator).stream()
+                        .map(Row::toString)
+                        .collect(Collectors.toList());
+        assertThat(partitions).contains(String.format("+I[b=%s]", 
oldPartition));
+
+        // Verify the altered property is persisted
+        CatalogTable table = (CatalogTable) catalog.getTable(objectPath);
+        
assertThat(table.getOptions().get(ConfigOptions.TABLE_AUTO_PARTITION_NUM_RETENTION.key()))
+                .isEqualTo("1");
+    }
+
     @Test
     void testTableWithExpression() throws Exception {
         // create a table with watermark and computed column
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java 
b/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java
index d4430bb50..634a91a1c 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java
@@ -164,7 +164,7 @@ class DynamicServerConfig {
         }
 
         // Build new configuration by merging initial + dynamic configs
-        Map<String, String> newConfigMap = buildConfigMap(effectiveChanges);
+        Map<String, String> newConfigMap = buildConfigMap(newDynamicConfigs);
         Configuration newConfig = Configuration.fromMap(newConfigMap);
 
         // Apply changes to all registered ServerReconfigurable instances
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java
index 5fb39037f..e4299c3a0 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java
@@ -136,31 +136,34 @@ public class AutoPartitionManager implements 
AutoCloseable {
         tableInfos.forEach(tableInfo -> addAutoPartitionTable(tableInfo, 
false));
     }
 
-    public void addAutoPartitionTable(TableInfo tableInfo, boolean 
forceDoAutoPartition) {
+    public void updateAutoPartitionTables(TableInfo tableInfo) {
         checkNotClosed();
         long tableId = tableInfo.getTableId();
+        LOG.info("Updating auto partition table [{}] (id={})", 
tableInfo.getTablePath(), tableId);
         Set<String> partitions = 
metadataManager.getPartitions(tableInfo.getTablePath());
         inLock(
                 lock,
                 () -> {
-                    autoPartitionTables.put(tableId, tableInfo);
-                    TreeMap<String, Set<String>> partitionMap =
-                            partitionsByTable.computeIfAbsent(
-                                    tableInfo.getTableId(), k -> new 
TreeMap<>());
-                    checkNotNull(partitionMap, "Partition map is null.");
-                    partitions.forEach(
-                            partitionName ->
-                                    addPartitionToPartitionsByTable(
-                                            tableInfo, partitionMap, 
partitionName));
-                    if 
(tableInfo.getTableConfig().getAutoPartitionStrategy().timeUnit()
-                            == AutoPartitionTimeUnit.DAY) {
-                        // get the delay minutes to create partition
-                        int delayMinutes = 
ThreadLocalRandom.current().nextInt(60 * 23);
-
-                        autoCreateDayPartitionDelayMinutes.put(tableId, 
delayMinutes);
-                    }
+                    // Remove old state
+                    removeAutoPartitionTableLocked(tableId);
+                    // Add new state
+                    addAutoPartitionTableLocked(tableInfo, partitions);
                 });
 
+        // schedule auto partition for this table immediately
+        periodicExecutor.schedule(() -> doAutoPartition(tableId, true), 0, 
TimeUnit.MILLISECONDS);
+        LOG.info(
+                "Updated auto partition table [{}] (id={}) in scheduler",
+                tableInfo.getTablePath(),
+                tableId);
+    }
+
+    public void addAutoPartitionTable(TableInfo tableInfo, boolean 
forceDoAutoPartition) {
+        checkNotClosed();
+        long tableId = tableInfo.getTableId();
+        Set<String> partitions = 
metadataManager.getPartitions(tableInfo.getTablePath());
+        inLock(lock, () -> addAutoPartitionTableLocked(tableInfo, partitions));
+
         // schedule auto partition for this table immediately
         periodicExecutor.schedule(
                 () -> doAutoPartition(tableId, forceDoAutoPartition), 0, 
TimeUnit.MILLISECONDS);
@@ -172,14 +175,7 @@ public class AutoPartitionManager implements AutoCloseable 
{
 
     public void removeAutoPartitionTable(long tableId) {
         checkNotClosed();
-        TableInfo tableInfo =
-                inLock(
-                        lock,
-                        () -> {
-                            partitionsByTable.remove(tableId);
-                            autoCreateDayPartitionDelayMinutes.remove(tableId);
-                            return autoPartitionTables.remove(tableId);
-                        });
+        TableInfo tableInfo = inLock(lock, () -> 
removeAutoPartitionTableLocked(tableId));
         if (tableInfo != null) {
             LOG.info(
                     "Removed auto partition table [{}] (id={}) from scheduler",
@@ -188,6 +184,32 @@ public class AutoPartitionManager implements AutoCloseable 
{
         }
     }
 
+    /** Must be called while holding {@link #lock}. */
+    @Nullable
+    private TableInfo removeAutoPartitionTableLocked(long tableId) {
+        partitionsByTable.remove(tableId);
+        autoCreateDayPartitionDelayMinutes.remove(tableId);
+        return autoPartitionTables.remove(tableId);
+    }
+
+    /** Must be called while holding {@link #lock}. */
+    private void addAutoPartitionTableLocked(TableInfo tableInfo, Set<String> 
partitions) {
+        long tableId = tableInfo.getTableId();
+        autoPartitionTables.put(tableId, tableInfo);
+        TreeMap<String, Set<String>> partitionMap =
+                partitionsByTable.computeIfAbsent(tableId, k -> new 
TreeMap<>());
+        checkNotNull(partitionMap, "Partition map is null.");
+        partitions.forEach(
+                partitionName ->
+                        addPartitionToPartitionsByTable(tableInfo, 
partitionMap, partitionName));
+        if (tableInfo.getTableConfig().getAutoPartitionStrategy().timeUnit()
+                == AutoPartitionTimeUnit.DAY) {
+            // get the delay minutes to create partition
+            int delayMinutes = ThreadLocalRandom.current().nextInt(60 * 23);
+            autoCreateDayPartitionDelayMinutes.put(tableId, delayMinutes);
+        }
+    }
+
     /**
      * Try to add a partition to cache if this table is autoPartitionedTable 
and partition not
      * exists in cache.
@@ -312,6 +334,7 @@ public class AutoPartitionManager implements AutoCloseable {
                         tableId);
                 continue;
             }
+
             dropPartitions(
                     tablePath,
                     tableInfo.getPartitionKeys(),
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
index ce6f7f336..9b0767b16 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
@@ -118,6 +118,7 @@ import 
org.apache.fluss.server.zk.data.ZkData.PartitionIdsZNode;
 import org.apache.fluss.server.zk.data.ZkData.TableIdsZNode;
 import org.apache.fluss.server.zk.data.lake.LakeTableHelper;
 import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;
+import org.apache.fluss.utils.AutoPartitionStrategy;
 import org.apache.fluss.utils.types.Tuple2;
 
 import org.slf4j.Logger;
@@ -783,6 +784,15 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
                         newTableInfo.getTableId(), newFreshness.toMillis());
             }
         }
+
+        AutoPartitionStrategy autoPartitionStrategy =
+                newTableInfo.getTableConfig().getAutoPartitionStrategy();
+        if (autoPartitionStrategy.isAutoPartitionEnabled()
+                && autoPartitionStrategy.numToRetain()
+                        != 
oldTableInfo.getTableConfig().getAutoPartitionStrategy().numToRetain()) {
+            autoPartitionManager.updateAutoPartitionTables(newTableInfo);
+        }
+
         // more post-alter actions can be added here
     }
 
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/AutoPartitionManagerTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/AutoPartitionManagerTest.java
index 569cd1209..5114ee115 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/AutoPartitionManagerTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/AutoPartitionManagerTest.java
@@ -566,6 +566,53 @@ class AutoPartitionManagerTest {
         assertThat(partitionsNum).isEqualTo(3);
     }
 
+    @Test
+    void testUpdateAutoPartitionNumRetention() throws Exception {
+        // Start at a well-known time
+        ZonedDateTime startTime =
+                
LocalDateTime.parse("2024-09-10T00:00:00").atZone(ZoneId.systemDefault());
+        long startMs = startTime.toInstant().toEpochMilli();
+        ManualClock clock = new ManualClock(startMs);
+        ManuallyTriggeredScheduledExecutorService periodicExecutor =
+                new ManuallyTriggeredScheduledExecutorService();
+
+        AutoPartitionManager autoPartitionManager =
+                new AutoPartitionManager(
+                        new TestingServerMetadataCache(3),
+                        metadataManager,
+                        new Configuration(),
+                        clock,
+                        periodicExecutor);
+        autoPartitionManager.start();
+
+        // Create a DAY-partitioned table with numRetention=3, numPreCreate=4
+        TableInfo table = createPartitionedTable(3, 4, 
AutoPartitionTimeUnit.HOUR);
+        TablePath tablePath = table.getTablePath();
+        autoPartitionManager.addAutoPartitionTable(table, true);
+        periodicExecutor.triggerNonPeriodicScheduledTask();
+
+        // pre-create 4 partitions: 2024091000, 2024091001, 2024091002, 
2024091003
+        Map<String, PartitionRegistration> partitions =
+                zookeeperClient.getPartitionRegistrations(tablePath);
+        assertThat(partitions.keySet())
+                .containsExactlyInAnyOrder("2024091000", "2024091001", 
"2024091002", "2024091003");
+
+        // Now update the table to numRetention=1 (more aggressive retention)
+        TableInfo updatedTable =
+                createUpdatedTableInfo(table, /* numRetention= */ 1, /* 
numPreCreate= */ 4);
+        autoPartitionManager.updateAutoPartitionTables(updatedTable);
+        // Advance clock by 4 hours to trigger retention drops and new 
pre-creations
+        clock.advanceTime(Duration.ofHours(4));
+        periodicExecutor.triggerNonPeriodicScheduledTask();
+
+        partitions = zookeeperClient.getPartitionRegistrations(tablePath);
+        // current partition is "2024091004", retain 1 => keep only 
2024091003..2024091004
+        // pre-create 4 from current => 2024091004..2024091007 (already exist)
+        assertThat(partitions.keySet())
+                .containsExactlyInAnyOrder(
+                        "2024091003", "2024091004", "2024091005", 
"2024091006", "2024091007");
+    }
+
     private static class TestParams {
         final AutoPartitionTimeUnit timeUnit;
         final boolean multiplePartitionKeys;
@@ -840,4 +887,26 @@ class AutoPartitionManagerTest {
         zookeeperClient.registerTable(tablePath, registration);
         return tableInfo;
     }
+
+    /** Creates a new TableInfo with updated numRetention and numPreCreate, 
reusing the original. */
+    private TableInfo createUpdatedTableInfo(
+            TableInfo original, int newNumRetention, int newNumPreCreate) {
+        Configuration newProperties = new 
Configuration(original.getProperties());
+        newProperties.set(ConfigOptions.TABLE_AUTO_PARTITION_NUM_RETENTION, 
newNumRetention);
+        newProperties.set(ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE, 
newNumPreCreate);
+        return new TableInfo(
+                original.getTablePath(),
+                original.getTableId(),
+                original.getSchemaId(),
+                original.getSchema(),
+                original.getBucketKeys(),
+                original.getPartitionKeys(),
+                original.getNumBuckets(),
+                newProperties,
+                original.getCustomProperties(),
+                original.getRemoteDataDir(),
+                original.getComment().orElse(null),
+                original.getCreatedTime(),
+                System.currentTimeMillis());
+    }
 }

Reply via email to