This is an automated email from the ASF dual-hosted git repository. hongshun pushed a commit to branch change-auto-retention in repository https://gitbox.apache.org/repos/asf/fluss.git
commit 78ea02d2e4596531b7f4d06df1512482bdb79dd6 Author: Hongshun Wang <[email protected]> AuthorDate: Thu Mar 26 16:32:54 2026 +0800 [server] allow user to alter partition num retention. tmp --- .../client/table/AutoPartitionedTableITCase.java | 56 +++++++++++++++++ .../org/apache/fluss/config/FlussConfigUtils.java | 3 +- .../fluss/flink/catalog/FlinkCatalogITCase.java | 57 +++++++++++++++++ .../apache/fluss/server/DynamicServerConfig.java | 2 +- .../server/coordinator/AutoPartitionManager.java | 73 ++++++++++++++-------- .../coordinator/CoordinatorEventProcessor.java | 10 +++ .../coordinator/AutoPartitionManagerTest.java | 69 ++++++++++++++++++++ 7 files changed, 243 insertions(+), 27 deletions(-) diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/AutoPartitionedTableITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/AutoPartitionedTableITCase.java index b4f36da42..2e005925c 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/AutoPartitionedTableITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/AutoPartitionedTableITCase.java @@ -32,6 +32,7 @@ import org.apache.fluss.metadata.PartitionInfo; import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableChange; import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.row.GenericRow; @@ -45,9 +46,12 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import java.time.Duration; +import java.time.LocalDate; import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -379,6 +383,58 @@ class AutoPartitionedTableITCase extends
ClientToServerITCaseBase { verifyPartitionLogs(table, schema.getRowType(), expectPartitionAppendRows); } + @Test + void testAlterAutoPartitionRetention() throws Exception { + TablePath tablePath = TablePath.of("test_db_1", "test_alter_auto_partition_retention"); + + // Create an auto-partitioned table with DAY time unit and retention=2 + Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .column("c", DataTypes.STRING()) + .build(); + TableDescriptor descriptor = + TableDescriptor.builder() + .schema(schema) + .partitionedBy("c") + .property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true) + .property( + ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, + AutoPartitionTimeUnit.DAY) + .property(ConfigOptions.TABLE_AUTO_PARTITION_NUM_RETENTION, 2) + .build(); + createTable(tablePath, descriptor, false); + FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath); + + LocalDate today = LocalDate.now(); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd"); + + // Manually add an old partition that falls outside retention=2 window + String oldPartition = today.minusDays(3).format(formatter); + admin.createPartition(tablePath, newPartitionSpec("c", oldPartition), false).get(); + FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath, 3); + + // Now alter retention from 2 to 1 via admin API + List<TableChange> changes = + Collections.singletonList( + TableChange.set( + ConfigOptions.TABLE_AUTO_PARTITION_NUM_RETENTION.key(), "1")); + admin.alterTable(tablePath, changes, false).get(); + + // The old partition should be dropped after the periodic check fires + FLUSS_CLUSTER_EXTENSION.waitUntilPartitionsDropped( + tablePath, Collections.singletonList(oldPartition)); + + // Verify remaining partitions are within the new retention window + List<PartitionInfo> partitionInfos = admin.listPartitionInfos(tablePath).get(); + List<String> partitionNames = + partitionInfos.stream() + 
.map(PartitionInfo::getPartitionName) + .collect(Collectors.toList()); + assertThat(partitionNames).doesNotContain(oldPartition); + } + private Schema createPartitionedTable(TablePath tablePath, boolean isPrimaryTable) throws Exception { Schema.Builder schemaBuilder = diff --git a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java index b29f2a74b..bf9b1428e 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java @@ -48,7 +48,8 @@ public class FlussConfigUtils { Arrays.asList( ConfigOptions.TABLE_DATALAKE_ENABLED.key(), ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(), - ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key()); + ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key(), + ConfigOptions.TABLE_AUTO_PARTITION_NUM_RETENTION.key()); } public static boolean isTableStorageConfig(String key) { diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java index 638229d8f..2eaf3cad0 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java @@ -574,6 +574,63 @@ abstract class FlinkCatalogITCase { assertResultsIgnoreOrder(showPartitionIterator, expectedShowPartitionsResult, true); } + @Test + void testAlterAutoPartitionRetention() throws Exception { + String tblName = "test_alter_auto_partition_retention"; + ObjectPath objectPath = new ObjectPath(DEFAULT_DB, tblName); + + // Create an auto-partitioned table with HOUR time unit and retention=3 + tEnv.executeSql( + "create table " + + tblName + + " (a int, b string) partitioned by (b) " + + "with 
('table.auto-partition.enabled' = 'true'," + + " 'table.auto-partition.time-unit' = 'hour'," + + " 'table.auto-partition.num-retention' = '3')"); + + TablePath tablePath = new TablePath(DEFAULT_DB, tblName); + FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath); + + String datetimePattern = "yyyyMMddHH"; + String oldPartition = + LocalDateTime.now() + .minusHours(4) + .format(DateTimeFormatter.ofPattern(datetimePattern)); + + // Add an old partition that is outside the retention window + tEnv.executeSql( + String.format("alter table %s add partition (b = '%s')", tblName, oldPartition)); + FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath, 3); + CloseableIterator<Row> showPartitionIterator = + tEnv.executeSql("show partitions " + tblName).collect(); + List<String> partitions = + CollectionUtil.iteratorToList(showPartitionIterator).stream() + .map(Row::toString) + .collect(Collectors.toList()); + assertThat(partitions).doesNotContain(String.format("+I[b=%s]", oldPartition)); + + // Alter retention from 3 to 1 + tEnv.executeSql( + "alter table " + tblName + " set ('table.auto-partition.num-retention' = '1')"); + + // The old partition should be dropped after the periodic check fires + FLUSS_CLUSTER_EXTENSION.waitUntilPartitionsDropped( + tablePath, Collections.singletonList(oldPartition)); + + // Verify the old partition is no longer listed + showPartitionIterator = tEnv.executeSql("show partitions " + tblName).collect(); + partitions = + CollectionUtil.iteratorToList(showPartitionIterator).stream() + .map(Row::toString) + .collect(Collectors.toList()); + assertThat(partitions).contains(String.format("+I[b=%s]", oldPartition)); + + // Verify the altered property is persisted + CatalogTable table = (CatalogTable) catalog.getTable(objectPath); + assertThat(table.getOptions().get(ConfigOptions.TABLE_AUTO_PARTITION_NUM_RETENTION.key())) + .isEqualTo("1"); + } + @Test void testTableWithExpression() throws Exception { // create a table with watermark 
and computed column diff --git a/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java b/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java index d4430bb50..634a91a1c 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java @@ -164,7 +164,7 @@ class DynamicServerConfig { } // Build new configuration by merging initial + dynamic configs - Map<String, String> newConfigMap = buildConfigMap(effectiveChanges); + Map<String, String> newConfigMap = buildConfigMap(newDynamicConfigs); Configuration newConfig = Configuration.fromMap(newConfigMap); // Apply changes to all registered ServerReconfigurable instances diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java index 5fb39037f..e4299c3a0 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java @@ -136,31 +136,34 @@ public class AutoPartitionManager implements AutoCloseable { tableInfos.forEach(tableInfo -> addAutoPartitionTable(tableInfo, false)); } - public void addAutoPartitionTable(TableInfo tableInfo, boolean forceDoAutoPartition) { + public void updateAutoPartitionTables(TableInfo tableInfo) { checkNotClosed(); long tableId = tableInfo.getTableId(); + LOG.info("Updating auto partition table [{}] (id={})", tableInfo.getTablePath(), tableId); Set<String> partitions = metadataManager.getPartitions(tableInfo.getTablePath()); inLock( lock, () -> { - autoPartitionTables.put(tableId, tableInfo); - TreeMap<String, Set<String>> partitionMap = - partitionsByTable.computeIfAbsent( - tableInfo.getTableId(), k -> new TreeMap<>()); - checkNotNull(partitionMap, "Partition map is null."); - 
partitions.forEach( - partitionName -> - addPartitionToPartitionsByTable( - tableInfo, partitionMap, partitionName)); - if (tableInfo.getTableConfig().getAutoPartitionStrategy().timeUnit() - == AutoPartitionTimeUnit.DAY) { - // get the delay minutes to create partition - int delayMinutes = ThreadLocalRandom.current().nextInt(60 * 23); - - autoCreateDayPartitionDelayMinutes.put(tableId, delayMinutes); - } + // Remove old state + removeAutoPartitionTableLocked(tableId); + // Add new state + addAutoPartitionTableLocked(tableInfo, partitions); }); + // schedule auto partition for this table immediately + periodicExecutor.schedule(() -> doAutoPartition(tableId, true), 0, TimeUnit.MILLISECONDS); + LOG.info( + "Updated auto partition table [{}] (id={}) in scheduler", + tableInfo.getTablePath(), + tableId); + } + + public void addAutoPartitionTable(TableInfo tableInfo, boolean forceDoAutoPartition) { + checkNotClosed(); + long tableId = tableInfo.getTableId(); + Set<String> partitions = metadataManager.getPartitions(tableInfo.getTablePath()); + inLock(lock, () -> addAutoPartitionTableLocked(tableInfo, partitions)); + // schedule auto partition for this table immediately periodicExecutor.schedule( () -> doAutoPartition(tableId, forceDoAutoPartition), 0, TimeUnit.MILLISECONDS); @@ -172,14 +175,7 @@ public class AutoPartitionManager implements AutoCloseable { public void removeAutoPartitionTable(long tableId) { checkNotClosed(); - TableInfo tableInfo = - inLock( - lock, - () -> { - partitionsByTable.remove(tableId); - autoCreateDayPartitionDelayMinutes.remove(tableId); - return autoPartitionTables.remove(tableId); - }); + TableInfo tableInfo = inLock(lock, () -> removeAutoPartitionTableLocked(tableId)); if (tableInfo != null) { LOG.info( "Removed auto partition table [{}] (id={}) from scheduler", @@ -188,6 +184,32 @@ public class AutoPartitionManager implements AutoCloseable { } } + /** Must be called while holding {@link #lock}. 
*/ + @Nullable + private TableInfo removeAutoPartitionTableLocked(long tableId) { + partitionsByTable.remove(tableId); + autoCreateDayPartitionDelayMinutes.remove(tableId); + return autoPartitionTables.remove(tableId); + } + + /** Must be called while holding {@link #lock}. */ + private void addAutoPartitionTableLocked(TableInfo tableInfo, Set<String> partitions) { + long tableId = tableInfo.getTableId(); + autoPartitionTables.put(tableId, tableInfo); + TreeMap<String, Set<String>> partitionMap = + partitionsByTable.computeIfAbsent(tableId, k -> new TreeMap<>()); + checkNotNull(partitionMap, "Partition map is null."); + partitions.forEach( + partitionName -> + addPartitionToPartitionsByTable(tableInfo, partitionMap, partitionName)); + if (tableInfo.getTableConfig().getAutoPartitionStrategy().timeUnit() + == AutoPartitionTimeUnit.DAY) { + // get the delay minutes to create partition + int delayMinutes = ThreadLocalRandom.current().nextInt(60 * 23); + autoCreateDayPartitionDelayMinutes.put(tableId, delayMinutes); + } + } + /** * Try to add a partition to cache if this table is autoPartitionedTable and partition not * exists in cache. 
@@ -312,6 +334,7 @@ public class AutoPartitionManager implements AutoCloseable { tableId); continue; } + dropPartitions( tablePath, tableInfo.getPartitionKeys(), diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java index ce6f7f336..9b0767b16 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java @@ -118,6 +118,7 @@ import org.apache.fluss.server.zk.data.ZkData.PartitionIdsZNode; import org.apache.fluss.server.zk.data.ZkData.TableIdsZNode; import org.apache.fluss.server.zk.data.lake.LakeTableHelper; import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot; +import org.apache.fluss.utils.AutoPartitionStrategy; import org.apache.fluss.utils.types.Tuple2; import org.slf4j.Logger; @@ -783,6 +784,15 @@ public class CoordinatorEventProcessor implements EventProcessor { newTableInfo.getTableId(), newFreshness.toMillis()); } } + + AutoPartitionStrategy autoPartitionStrategy = + newTableInfo.getTableConfig().getAutoPartitionStrategy(); + if (autoPartitionStrategy.isAutoPartitionEnabled() + && autoPartitionStrategy.numToRetain() + != oldTableInfo.getTableConfig().getAutoPartitionStrategy().numToRetain()) { + autoPartitionManager.updateAutoPartitionTables(newTableInfo); + } + // more post-alter actions can be added here } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/AutoPartitionManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/AutoPartitionManagerTest.java index 569cd1209..5114ee115 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/AutoPartitionManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/AutoPartitionManagerTest.java @@ -566,6 +566,53 @@ class 
AutoPartitionManagerTest { assertThat(partitionsNum).isEqualTo(3); } + @Test + void testUpdateAutoPartitionNumRetention() throws Exception { + // Start at a well-known time + ZonedDateTime startTime = + LocalDateTime.parse("2024-09-10T00:00:00").atZone(ZoneId.systemDefault()); + long startMs = startTime.toInstant().toEpochMilli(); + ManualClock clock = new ManualClock(startMs); + ManuallyTriggeredScheduledExecutorService periodicExecutor = + new ManuallyTriggeredScheduledExecutorService(); + + AutoPartitionManager autoPartitionManager = + new AutoPartitionManager( + new TestingServerMetadataCache(3), + metadataManager, + new Configuration(), + clock, + periodicExecutor); + autoPartitionManager.start(); + + // Create an HOUR-partitioned table with numRetention=3, numPreCreate=4 + TableInfo table = createPartitionedTable(3, 4, AutoPartitionTimeUnit.HOUR); + TablePath tablePath = table.getTablePath(); + autoPartitionManager.addAutoPartitionTable(table, true); + periodicExecutor.triggerNonPeriodicScheduledTask(); + + // pre-create 4 partitions: 2024091000, 2024091001, 2024091002, 2024091003 + Map<String, PartitionRegistration> partitions = + zookeeperClient.getPartitionRegistrations(tablePath); + assertThat(partitions.keySet()) + .containsExactlyInAnyOrder("2024091000", "2024091001", "2024091002", "2024091003"); + + // Now update the table to numRetention=1 (more aggressive retention) + TableInfo updatedTable = + createUpdatedTableInfo(table, /* numRetention= */ 1, /* numPreCreate= */ 4); + autoPartitionManager.updateAutoPartitionTables(updatedTable); + // Advance clock by 4 hours to trigger retention drops and new pre-creations + clock.advanceTime(Duration.ofHours(4)); + periodicExecutor.triggerNonPeriodicScheduledTask(); + + partitions = zookeeperClient.getPartitionRegistrations(tablePath); + // current partition is "2024091004", retain 1 => keep only 2024091003..2024091004 + // pre-create 4 from current => 2024091004..2024091007 (already exist) +
assertThat(partitions.keySet()) + .containsExactlyInAnyOrder( + "2024091003", "2024091004", "2024091005", "2024091006", "2024091007"); + } + private static class TestParams { final AutoPartitionTimeUnit timeUnit; final boolean multiplePartitionKeys; @@ -840,4 +887,26 @@ class AutoPartitionManagerTest { zookeeperClient.registerTable(tablePath, registration); return tableInfo; } + + /** Creates a new TableInfo with updated numRetention and numPreCreate, reusing the original. */ + private TableInfo createUpdatedTableInfo( + TableInfo original, int newNumRetention, int newNumPreCreate) { + Configuration newProperties = new Configuration(original.getProperties()); + newProperties.set(ConfigOptions.TABLE_AUTO_PARTITION_NUM_RETENTION, newNumRetention); + newProperties.set(ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE, newNumPreCreate); + return new TableInfo( + original.getTablePath(), + original.getTableId(), + original.getSchemaId(), + original.getSchema(), + original.getBucketKeys(), + original.getPartitionKeys(), + original.getNumBuckets(), + newProperties, + original.getCustomProperties(), + original.getRemoteDataDir(), + original.getComment().orElse(null), + original.getCreatedTime(), + System.currentTimeMillis()); + } }
