This is an automated email from the ASF dual-hosted git repository.

yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new e6e95cabc [kv] Support dynamic configuration for 
'kv.snapshot.interval' (#2820)
e6e95cabc is described below

commit e6e95cabc3c6bb7ee3fa8dc91ccbc5e7bf611366
Author: yunhong <[email protected]>
AuthorDate: Wed Mar 11 10:16:29 2026 +0800

    [kv] Support dynamic configuration for 'kv.snapshot.interval' (#2820)
    
    * [kv] Support dynamic configuration for 'kv.snapshot.interval'
    
    * address yubing's comments
---
 .../apache/fluss/server/DynamicServerConfig.java   |  5 +-
 .../server/kv/snapshot/DefaultSnapshotContext.java | 38 ++++++++++++-
 .../kv/snapshot/PeriodicSnapshotManager.java       | 53 +++++++++++-------
 .../org/apache/fluss/server/replica/Replica.java   |  3 +-
 .../fluss/server/replica/ReplicaManager.java       |  7 ++-
 .../apache/fluss/server/tablet/TabletServer.java   |  7 ++-
 .../fluss/server/DynamicConfigChangeTest.java      | 65 ++++++++++++++++++++++
 .../kv/snapshot/KvTabletSnapshotTargetTest.java    |  4 +-
 .../kv/snapshot/PeriodicSnapshotManagerTest.java   | 42 +++++++++++++-
 9 files changed, 190 insertions(+), 34 deletions(-)

diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java 
b/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java
index bb19aa331..f3ae90496 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java
@@ -43,6 +43,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
 import static 
org.apache.fluss.config.ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC;
+import static org.apache.fluss.config.ConfigOptions.KV_SNAPSHOT_INTERVAL;
 import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
 import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
 
@@ -59,7 +60,9 @@ class DynamicServerConfig {
     private static final Set<String> ALLOWED_CONFIG_KEYS =
             new HashSet<>(
                     Arrays.asList(
-                            DATALAKE_FORMAT.key(), 
KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()));
+                            DATALAKE_FORMAT.key(),
+                            KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(),
+                            KV_SNAPSHOT_INTERVAL.key()));
     private static final Set<String> ALLOWED_CONFIG_PREFIXES = 
Collections.singleton("datalake.");
 
     private final ReadWriteLock lock = new ReentrantReadWriteLock();
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/DefaultSnapshotContext.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/DefaultSnapshotContext.java
index a6894851b..9bec8977c 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/DefaultSnapshotContext.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/DefaultSnapshotContext.java
@@ -19,6 +19,8 @@ package org.apache.fluss.server.kv.snapshot;
 
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.cluster.ServerReconfigurable;
+import org.apache.fluss.exception.ConfigException;
 import org.apache.fluss.fs.FsPath;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.server.kv.KvSnapshotResource;
@@ -26,12 +28,17 @@ import org.apache.fluss.server.zk.ZooKeeperClient;
 import org.apache.fluss.utils.FlussPaths;
 import org.apache.fluss.utils.function.FunctionWithException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 
 /** A default implementation for {@link SnapshotContext}. */
-public class DefaultSnapshotContext implements SnapshotContext {
+public class DefaultSnapshotContext implements SnapshotContext, 
ServerReconfigurable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DefaultSnapshotContext.class);
 
     private final ZooKeeperClient zooKeeperClient;
     private final CompletedKvSnapshotCommitter completedKvSnapshotCommitter;
@@ -41,7 +48,7 @@ public class DefaultSnapshotContext implements 
SnapshotContext {
     private final KvSnapshotDataUploader kvSnapshotDataUploader;
     private final KvSnapshotDataDownloader kvSnapshotDataDownloader;
 
-    private final long kvSnapshotIntervalMs;
+    private volatile long kvSnapshotIntervalMs;
 
     /** The write buffer size for writing the kv snapshot file to remote 
filesystem. */
     private final int writeBufferSizeInBytes;
@@ -161,4 +168,31 @@ public class DefaultSnapshotContext implements 
SnapshotContext {
         completedSnapshotHandleStore.remove(snapshot.getTableBucket(), 
snapshot.getSnapshotID());
         snapshot.discardAsync(asyncOperationsThreadPool);
     }
+
+    // ============ ServerReconfigurable Implementation ============
+
+    @Override
+    public void validate(Configuration newConfig) throws ConfigException {
+        // Type validation is already handled by DynamicServerConfig.
+        // Here we only do basic sanity checks.
+        long newIntervalMs = 
newConfig.get(ConfigOptions.KV_SNAPSHOT_INTERVAL).toMillis();
+        if (newIntervalMs <= 0) {
+            throw new ConfigException(
+                    String.format(
+                            "Invalid kv.snapshot.interval can not be negative 
or zero: %d ms",
+                            newIntervalMs));
+        }
+    }
+
+    @Override
+    public void reconfigure(Configuration newConfig) {
+        long newIntervalMs = 
newConfig.get(ConfigOptions.KV_SNAPSHOT_INTERVAL).toMillis();
+        if (newIntervalMs == kvSnapshotIntervalMs) {
+            LOG.debug("kv.snapshot.interval unchanged: {} ms", newIntervalMs);
+            return;
+        }
+        long oldIntervalMs = kvSnapshotIntervalMs;
+        kvSnapshotIntervalMs = newIntervalMs;
+        LOG.info("kv.snapshot.interval reconfigured: {} ms -> {} ms", 
oldIntervalMs, newIntervalMs);
+    }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java
index 9ba9ecbd6..566d6ab10 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java
@@ -21,7 +21,6 @@ import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.fs.FileSystemSafetyNet;
 import org.apache.fluss.fs.FsPath;
 import org.apache.fluss.metadata.TableBucket;
-import org.apache.fluss.server.metrics.group.BucketMetricGroup;
 import org.apache.fluss.utils.MathUtils;
 import org.apache.fluss.utils.concurrent.Executors;
 import org.apache.fluss.utils.concurrent.FutureUtils;
@@ -41,6 +40,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.LongSupplier;
 
 /* This file is based on source code of Apache Flink Project 
(https://flink.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
@@ -69,7 +69,11 @@ public class PeriodicSnapshotManager implements Closeable {
     /** Async thread pool, to complete async phase of snapshot. */
     private final ExecutorService asyncOperationsThreadPool;
 
-    private final long periodicSnapshotDelay;
+    /**
+     * A supplier to get the current snapshot delay. This allows dynamic 
reconfiguration of the
+     * snapshot interval at runtime.
+     */
+    private final LongSupplier snapshotIntervalSupplier;
 
     /** Number of consecutive snapshot failures. */
     private final AtomicInteger numberOfConsecutiveFailures;
@@ -98,35 +102,48 @@ public class PeriodicSnapshotManager implements Closeable {
             SnapshotTarget target,
             long periodicSnapshotDelay,
             ExecutorService asyncOperationsThreadPool,
-            ScheduledExecutorService periodicExecutor,
-            BucketMetricGroup bucketMetricGroup) {
+            ScheduledExecutorService periodicExecutor) {
         this(
                 tableBucket,
                 target,
-                periodicSnapshotDelay,
+                () -> periodicSnapshotDelay,
                 asyncOperationsThreadPool,
                 periodicExecutor,
-                Executors.directExecutor(),
-                bucketMetricGroup);
+                Executors.directExecutor());
     }
 
     @VisibleForTesting
     protected PeriodicSnapshotManager(
             TableBucket tableBucket,
             SnapshotTarget target,
-            long periodicSnapshotDelay,
+            LongSupplier snapshotIntervalSupplier,
+            ExecutorService asyncOperationsThreadPool,
+            ScheduledExecutorService periodicExecutor) {
+        this(
+                tableBucket,
+                target,
+                snapshotIntervalSupplier,
+                asyncOperationsThreadPool,
+                periodicExecutor,
+                Executors.directExecutor());
+    }
+
+    private PeriodicSnapshotManager(
+            TableBucket tableBucket,
+            SnapshotTarget target,
+            LongSupplier snapshotIntervalSupplier,
             ExecutorService asyncOperationsThreadPool,
             ScheduledExecutorService periodicExecutor,
-            Executor guardedExecutor,
-            BucketMetricGroup bucketMetricGroup) {
+            Executor guardedExecutor) {
         this.tableBucket = tableBucket;
         this.target = target;
-        this.periodicSnapshotDelay = periodicSnapshotDelay;
+        this.snapshotIntervalSupplier = snapshotIntervalSupplier;
 
         this.numberOfConsecutiveFailures = new AtomicInteger(0);
         this.periodicExecutor = periodicExecutor;
         this.guardedExecutor = guardedExecutor;
         this.asyncOperationsThreadPool = asyncOperationsThreadPool;
+        long periodicSnapshotDelay = snapshotIntervalSupplier.getAsLong();
         this.initialDelay =
                 periodicSnapshotDelay > 0
                         ? MathUtils.murmurHash(tableBucket.hashCode()) % 
periodicSnapshotDelay
@@ -137,21 +154,19 @@ public class PeriodicSnapshotManager implements Closeable 
{
             TableBucket tableBucket,
             SnapshotTarget target,
             SnapshotContext snapshotContext,
-            Executor guardedExecutor,
-            BucketMetricGroup bucketMetricGroup) {
+            Executor guardedExecutor) {
         return new PeriodicSnapshotManager(
                 tableBucket,
                 target,
-                snapshotContext.getSnapshotIntervalMs(),
+                snapshotContext::getSnapshotIntervalMs,
                 snapshotContext.getAsyncOperationsThreadPool(),
                 snapshotContext.getSnapshotScheduler(),
-                guardedExecutor,
-                bucketMetricGroup);
+                guardedExecutor);
     }
 
     public void start() {
         // disable periodic snapshot when periodicMaterializeDelay is not 
positive
-        if (!started && periodicSnapshotDelay > 0) {
+        if (!started && initialDelay > 0) {
 
             started = true;
 
@@ -216,7 +231,7 @@ public class PeriodicSnapshotManager implements Closeable {
                                     "TableBucket {} has no data updates since 
last snapshot, "
                                             + "skip this one and schedule the 
next one in {} seconds",
                                     tableBucket,
-                                    periodicSnapshotDelay / 1000);
+                                    snapshotIntervalSupplier.getAsLong() / 
1000);
                         }
                     }
                 });
@@ -324,7 +339,7 @@ public class PeriodicSnapshotManager implements Closeable {
     }
 
     private void scheduleNextSnapshot() {
-        scheduleNextSnapshot(periodicSnapshotDelay);
+        scheduleNextSnapshot(snapshotIntervalSupplier.getAsLong());
     }
 
     /** {@link SnapshotRunnable} provider and consumer. */
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index 10678eb5c..f73bcdda2 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -931,8 +931,7 @@ public final class Replica {
                             tableBucket,
                             kvTabletSnapshotTarget,
                             snapshotContext,
-                            kvTablet.getGuardedExecutor(),
-                            bucketMetricGroup);
+                            kvTablet.getGuardedExecutor());
             kvSnapshotManager.start();
             closeableRegistryForKv.registerCloseable(kvSnapshotManager);
         } catch (Exception e) {
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index c891e5d3d..3fc024209 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -79,7 +79,6 @@ import org.apache.fluss.server.kv.KvManager;
 import org.apache.fluss.server.kv.KvSnapshotResource;
 import org.apache.fluss.server.kv.snapshot.CompletedKvSnapshotCommitter;
 import org.apache.fluss.server.kv.snapshot.DefaultSnapshotContext;
-import org.apache.fluss.server.kv.snapshot.SnapshotContext;
 import org.apache.fluss.server.log.FetchDataInfo;
 import org.apache.fluss.server.log.FetchParams;
 import org.apache.fluss.server.log.ListOffsetsParam;
@@ -193,7 +192,7 @@ public class ReplicaManager {
 
     // for kv snapshot
     private final KvSnapshotResource kvSnapshotResource;
-    private final SnapshotContext kvSnapshotContext;
+    private final DefaultSnapshotContext kvSnapshotContext;
 
     // remote log manager for remote log storage.
     private final RemoteLogManager remoteLogManager;
@@ -323,6 +322,10 @@ public class ReplicaManager {
         return remoteLogManager;
     }
 
+    public DefaultSnapshotContext getKvSnapshotContext() {
+        return kvSnapshotContext;
+    }
+
     private void registerMetrics() {
         serverMetricGroup.gauge(
                 MetricNames.REPLICA_LEADER_COUNT,
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java 
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
index d108a6337..93e8951b7 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
@@ -231,8 +231,6 @@ public class TabletServer extends ServerBase {
 
             // Register kvManager to dynamicConfigManager for dynamic 
reconfiguration
             dynamicConfigManager.register(kvManager);
-            // Start dynamicConfigManager after all reconfigurable components 
are registered
-            dynamicConfigManager.startup();
 
             this.authorizer = AuthorizerLoader.createAuthorizer(conf, 
zkClient, pluginManager);
             if (authorizer != null) {
@@ -275,6 +273,11 @@ public class TabletServer extends ServerBase {
                             ioExecutor);
             replicaManager.startup();
 
+            // Register DefaultSnapshotContext for dynamic kv.snapshot.interval
+            
dynamicConfigManager.register(replicaManager.getKvSnapshotContext());
+            // Start dynamicConfigManager after all reconfigurable components 
are registered
+            dynamicConfigManager.startup();
+
             this.tabletService =
                     new TabletService(
                             serverId,
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java
index 2ba44ac41..aa55372f7 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java
@@ -21,6 +21,7 @@ import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.config.cluster.AlterConfig;
 import org.apache.fluss.config.cluster.AlterConfigOpType;
+import org.apache.fluss.config.cluster.ServerReconfigurable;
 import org.apache.fluss.exception.ConfigException;
 import org.apache.fluss.server.coordinator.LakeCatalogDynamicLoader;
 import org.apache.fluss.server.zk.NOPErrorHandler;
@@ -41,6 +42,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
 import static org.apache.fluss.metadata.DataLakeFormat.PAIMON;
@@ -347,4 +349,67 @@ public class DynamicConfigChangeTest {
                     .isEqualTo(PAIMON);
         }
     }
+
+    @Test
+    void testPreventInvalidSnapshotInterval() throws Exception {
+        Configuration configuration = new Configuration();
+        configuration.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, 
Duration.ofMinutes(10));
+
+        DynamicConfigManager dynamicConfigManager =
+                new DynamicConfigManager(zookeeperClient, configuration, true);
+        dynamicConfigManager.startup();
+
+        // Try to set snapshot interval to an invalid value - should be 
rejected by type validation
+        assertThatThrownBy(
+                        () ->
+                                dynamicConfigManager.alterConfigs(
+                                        Collections.singletonList(
+                                                new AlterConfig(
+                                                        
ConfigOptions.KV_SNAPSHOT_INTERVAL.key(),
+                                                        "invalid_value",
+                                                        
AlterConfigOpType.SET))))
+                .isInstanceOf(ConfigException.class)
+                .hasMessageContaining(
+                        "Cannot parse 'invalid_value' as Duration for config 
'kv.snapshot.interval'");
+    }
+
+    @Test
+    void testDynamicSnapshotIntervalChange() throws Exception {
+        Configuration configuration = new Configuration();
+        configuration.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, 
Duration.ofMinutes(10));
+
+        DynamicConfigManager dynamicConfigManager =
+                new DynamicConfigManager(zookeeperClient, configuration, true);
+
+        AtomicReference<Duration> reconfiguredInterval = new 
AtomicReference<>();
+        dynamicConfigManager.register(
+                new ServerReconfigurable() {
+                    @Override
+                    public void validate(Configuration newConfig) throws 
ConfigException {}
+
+                    @Override
+                    public void reconfigure(Configuration newConfig) {
+                        
reconfiguredInterval.set(newConfig.get(ConfigOptions.KV_SNAPSHOT_INTERVAL));
+                    }
+                });
+        dynamicConfigManager.startup();
+
+        // Change snapshot interval to 5 minutes - should succeed
+        assertThatCode(
+                        () ->
+                                dynamicConfigManager.alterConfigs(
+                                        Collections.singletonList(
+                                                new AlterConfig(
+                                                        
ConfigOptions.KV_SNAPSHOT_INTERVAL.key(),
+                                                        "5min",
+                                                        
AlterConfigOpType.SET))))
+                .doesNotThrowAnyException();
+
+        // Verify config was persisted to ZK
+        Map<String, String> zkConfig = zookeeperClient.fetchEntityConfig();
+        
assertThat(zkConfig.get(ConfigOptions.KV_SNAPSHOT_INTERVAL.key())).isEqualTo("5min");
+
+        // Verify the reconfigurable was notified with the new value
+        
assertThat(reconfiguredInterval.get()).isEqualTo(Duration.ofMinutes(5));
+    }
 }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
index 714835ce5..357d7a08c 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
@@ -26,7 +26,6 @@ import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.server.SequenceIDCounter;
 import org.apache.fluss.server.kv.rocksdb.RocksDBExtension;
 import org.apache.fluss.server.kv.rocksdb.RocksDBKv;
-import org.apache.fluss.server.metrics.group.TestingMetricGroups;
 import org.apache.fluss.server.testutils.KvTestUtils;
 import org.apache.fluss.server.utils.ResourceGuard;
 import org.apache.fluss.server.zk.CuratorFrameworkWithUnhandledErrorListener;
@@ -455,8 +454,7 @@ class KvTabletSnapshotTargetTest {
                 target,
                 periodicMaterializeDelay,
                 java.util.concurrent.Executors.newFixedThreadPool(1),
-                scheduledExecutorService,
-                TestingMetricGroups.BUCKET_METRICS);
+                scheduledExecutorService);
     }
 
     private KvTabletSnapshotTarget createSnapshotTarget(
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManagerTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManagerTest.java
index c719c9683..018d980fb 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManagerTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManagerTest.java
@@ -20,7 +20,6 @@ package org.apache.fluss.server.kv.snapshot;
 import org.apache.fluss.exception.FlussRuntimeException;
 import org.apache.fluss.fs.FsPath;
 import org.apache.fluss.metadata.TableBucket;
-import org.apache.fluss.server.metrics.group.TestingMetricGroups;
 import 
org.apache.fluss.testutils.common.ManuallyTriggeredScheduledExecutorService;
 
 import org.junit.jupiter.api.AfterEach;
@@ -36,6 +35,7 @@ import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static 
org.apache.fluss.shaded.guava32.com.google.common.collect.Iterators.getOnlyElement;
@@ -127,6 +127,43 @@ class PeriodicSnapshotManagerTest {
                 .hasMessage(exceptionMessage);
     }
 
+    @Test
+    void testDynamicSnapshotInterval() {
+        long initialInterval = 10_000L;
+        long updatedInterval = 5_000L;
+        AtomicLong currentInterval = new AtomicLong(initialInterval);
+
+        periodicSnapshotManager =
+                new PeriodicSnapshotManager(
+                        tableBucket,
+                        NopSnapshotTarget.INSTANCE,
+                        currentInterval::get,
+                        asyncSnapshotExecutorService,
+                        scheduledExecutorService);
+        periodicSnapshotManager.start();
+
+        // Trigger the initial task; NopTarget returns empty, so 
scheduleNextSnapshot() is
+        // called with the current supplier value (initialInterval = 10000ms).
+        scheduledExecutorService.triggerNonPeriodicScheduledTasks();
+        assertThat(
+                        
getOnlyElement(scheduledExecutorService.getAllScheduledTasks().iterator())
+                                .getDelay(MILLISECONDS))
+                .as("next snapshot should be scheduled with the initial 
interval")
+                .isGreaterThan(updatedInterval)
+                .isLessThanOrEqualTo(initialInterval);
+
+        // Update snapshot interval to 5000ms via the supplier.
+        currentInterval.set(updatedInterval);
+
+        // Trigger again; scheduleNextSnapshot() should now use 
updatedInterval (5000ms).
+        scheduledExecutorService.triggerNonPeriodicScheduledTasks();
+        assertThat(
+                        
getOnlyElement(scheduledExecutorService.getAllScheduledTasks().iterator())
+                                .getDelay(MILLISECONDS))
+                .as("next snapshot should be scheduled with the updated 
interval")
+                .isLessThanOrEqualTo(updatedInterval);
+    }
+
     private void checkOnlyOneScheduledTasks() {
         assertThat(
                         
getOnlyElement(scheduledExecutorService.getAllScheduledTasks().iterator())
@@ -150,8 +187,7 @@ class PeriodicSnapshotManagerTest {
                 target,
                 periodicMaterializeDelay,
                 asyncSnapshotExecutorService,
-                scheduledExecutorService,
-                TestingMetricGroups.BUCKET_METRICS);
+                scheduledExecutorService);
     }
 
     private static class NopSnapshotTarget implements 
PeriodicSnapshotManager.SnapshotTarget {

Reply via email to