This is an automated email from the ASF dual-hosted git repository.
yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new e6e95cabc [kv] Support dynamic configuration for 'kv.snapshot.interval' (#2820)
e6e95cabc is described below
commit e6e95cabc3c6bb7ee3fa8dc91ccbc5e7bf611366
Author: yunhong <[email protected]>
AuthorDate: Wed Mar 11 10:16:29 2026 +0800
[kv] Support dynamic configuration for 'kv.snapshot.interval' (#2820)
* [kv] Support dynamic configuration for 'kv.snapshot.interval'
* address yubing's comments
---
.../apache/fluss/server/DynamicServerConfig.java | 5 +-
.../server/kv/snapshot/DefaultSnapshotContext.java | 38 ++++++++++++-
.../kv/snapshot/PeriodicSnapshotManager.java | 53 +++++++++++-------
.../org/apache/fluss/server/replica/Replica.java | 3 +-
.../fluss/server/replica/ReplicaManager.java | 7 ++-
.../apache/fluss/server/tablet/TabletServer.java | 7 ++-
.../fluss/server/DynamicConfigChangeTest.java | 65 ++++++++++++++++++++++
.../kv/snapshot/KvTabletSnapshotTargetTest.java | 4 +-
.../kv/snapshot/PeriodicSnapshotManagerTest.java | 42 +++++++++++++-
9 files changed, 190 insertions(+), 34 deletions(-)
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java b/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java
index bb19aa331..f3ae90496 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java
@@ -43,6 +43,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
import static
org.apache.fluss.config.ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC;
+import static org.apache.fluss.config.ConfigOptions.KV_SNAPSHOT_INTERVAL;
import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
@@ -59,7 +60,9 @@ class DynamicServerConfig {
private static final Set<String> ALLOWED_CONFIG_KEYS =
new HashSet<>(
Arrays.asList(
- DATALAKE_FORMAT.key(),
KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()));
+ DATALAKE_FORMAT.key(),
+ KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(),
+ KV_SNAPSHOT_INTERVAL.key()));
private static final Set<String> ALLOWED_CONFIG_PREFIXES =
Collections.singleton("datalake.");
private final ReadWriteLock lock = new ReentrantReadWriteLock();
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/DefaultSnapshotContext.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/DefaultSnapshotContext.java
index a6894851b..9bec8977c 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/DefaultSnapshotContext.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/DefaultSnapshotContext.java
@@ -19,6 +19,8 @@ package org.apache.fluss.server.kv.snapshot;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.cluster.ServerReconfigurable;
+import org.apache.fluss.exception.ConfigException;
import org.apache.fluss.fs.FsPath;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.server.kv.KvSnapshotResource;
@@ -26,12 +28,17 @@ import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.utils.FlussPaths;
import org.apache.fluss.utils.function.FunctionWithException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
/** A default implementation for {@link SnapshotContext}. */
-public class DefaultSnapshotContext implements SnapshotContext {
+public class DefaultSnapshotContext implements SnapshotContext,
ServerReconfigurable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DefaultSnapshotContext.class);
private final ZooKeeperClient zooKeeperClient;
private final CompletedKvSnapshotCommitter completedKvSnapshotCommitter;
@@ -41,7 +48,7 @@ public class DefaultSnapshotContext implements SnapshotContext {
private final KvSnapshotDataUploader kvSnapshotDataUploader;
private final KvSnapshotDataDownloader kvSnapshotDataDownloader;
- private final long kvSnapshotIntervalMs;
+ private volatile long kvSnapshotIntervalMs;
/** The write buffer size for writing the kv snapshot file to remote
filesystem. */
private final int writeBufferSizeInBytes;
@@ -161,4 +168,31 @@ public class DefaultSnapshotContext implements SnapshotContext {
completedSnapshotHandleStore.remove(snapshot.getTableBucket(),
snapshot.getSnapshotID());
snapshot.discardAsync(asyncOperationsThreadPool);
}
+
+ // ============ ServerReconfigurable Implementation ============
+
+ @Override
+ public void validate(Configuration newConfig) throws ConfigException {
+ // Type validation is already handled by DynamicServerConfig.
+ // Here we only do basic sanity checks.
+ long newIntervalMs =
newConfig.get(ConfigOptions.KV_SNAPSHOT_INTERVAL).toMillis();
+ if (newIntervalMs <= 0) {
+ throw new ConfigException(
+ String.format(
+ "Invalid kv.snapshot.interval can not be negative
or zero: %d ms",
+ newIntervalMs));
+ }
+ }
+
+ @Override
+ public void reconfigure(Configuration newConfig) {
+ long newIntervalMs =
newConfig.get(ConfigOptions.KV_SNAPSHOT_INTERVAL).toMillis();
+ if (newIntervalMs == kvSnapshotIntervalMs) {
+ LOG.debug("kv.snapshot.interval unchanged: {} ms", newIntervalMs);
+ return;
+ }
+ long oldIntervalMs = kvSnapshotIntervalMs;
+ kvSnapshotIntervalMs = newIntervalMs;
+ LOG.info("kv.snapshot.interval reconfigured: {} ms -> {} ms",
oldIntervalMs, newIntervalMs);
+ }
}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java
index 9ba9ecbd6..566d6ab10 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java
@@ -21,7 +21,6 @@ import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.fs.FileSystemSafetyNet;
import org.apache.fluss.fs.FsPath;
import org.apache.fluss.metadata.TableBucket;
-import org.apache.fluss.server.metrics.group.BucketMetricGroup;
import org.apache.fluss.utils.MathUtils;
import org.apache.fluss.utils.concurrent.Executors;
import org.apache.fluss.utils.concurrent.FutureUtils;
@@ -41,6 +40,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.LongSupplier;
/* This file is based on source code of Apache Flink Project
(https://flink.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the
NOTICE file distributed with this work for
@@ -69,7 +69,11 @@ public class PeriodicSnapshotManager implements Closeable {
/** Async thread pool, to complete async phase of snapshot. */
private final ExecutorService asyncOperationsThreadPool;
- private final long periodicSnapshotDelay;
+ /**
+ * A supplier to get the current snapshot delay. This allows dynamic
reconfiguration of the
+ * snapshot interval at runtime.
+ */
+ private final LongSupplier snapshotIntervalSupplier;
/** Number of consecutive snapshot failures. */
private final AtomicInteger numberOfConsecutiveFailures;
@@ -98,35 +102,48 @@ public class PeriodicSnapshotManager implements Closeable {
SnapshotTarget target,
long periodicSnapshotDelay,
ExecutorService asyncOperationsThreadPool,
- ScheduledExecutorService periodicExecutor,
- BucketMetricGroup bucketMetricGroup) {
+ ScheduledExecutorService periodicExecutor) {
this(
tableBucket,
target,
- periodicSnapshotDelay,
+ () -> periodicSnapshotDelay,
asyncOperationsThreadPool,
periodicExecutor,
- Executors.directExecutor(),
- bucketMetricGroup);
+ Executors.directExecutor());
}
@VisibleForTesting
protected PeriodicSnapshotManager(
TableBucket tableBucket,
SnapshotTarget target,
- long periodicSnapshotDelay,
+ LongSupplier snapshotIntervalSupplier,
+ ExecutorService asyncOperationsThreadPool,
+ ScheduledExecutorService periodicExecutor) {
+ this(
+ tableBucket,
+ target,
+ snapshotIntervalSupplier,
+ asyncOperationsThreadPool,
+ periodicExecutor,
+ Executors.directExecutor());
+ }
+
+ private PeriodicSnapshotManager(
+ TableBucket tableBucket,
+ SnapshotTarget target,
+ LongSupplier snapshotIntervalSupplier,
ExecutorService asyncOperationsThreadPool,
ScheduledExecutorService periodicExecutor,
- Executor guardedExecutor,
- BucketMetricGroup bucketMetricGroup) {
+ Executor guardedExecutor) {
this.tableBucket = tableBucket;
this.target = target;
- this.periodicSnapshotDelay = periodicSnapshotDelay;
+ this.snapshotIntervalSupplier = snapshotIntervalSupplier;
this.numberOfConsecutiveFailures = new AtomicInteger(0);
this.periodicExecutor = periodicExecutor;
this.guardedExecutor = guardedExecutor;
this.asyncOperationsThreadPool = asyncOperationsThreadPool;
+ long periodicSnapshotDelay = snapshotIntervalSupplier.getAsLong();
this.initialDelay =
periodicSnapshotDelay > 0
? MathUtils.murmurHash(tableBucket.hashCode()) %
periodicSnapshotDelay
@@ -137,21 +154,19 @@ public class PeriodicSnapshotManager implements Closeable {
TableBucket tableBucket,
SnapshotTarget target,
SnapshotContext snapshotContext,
- Executor guardedExecutor,
- BucketMetricGroup bucketMetricGroup) {
+ Executor guardedExecutor) {
return new PeriodicSnapshotManager(
tableBucket,
target,
- snapshotContext.getSnapshotIntervalMs(),
+ snapshotContext::getSnapshotIntervalMs,
snapshotContext.getAsyncOperationsThreadPool(),
snapshotContext.getSnapshotScheduler(),
- guardedExecutor,
- bucketMetricGroup);
+ guardedExecutor);
}
public void start() {
// disable periodic snapshot when periodicMaterializeDelay is not
positive
- if (!started && periodicSnapshotDelay > 0) {
+ if (!started && initialDelay > 0) {
started = true;
@@ -216,7 +231,7 @@ public class PeriodicSnapshotManager implements Closeable {
"TableBucket {} has no data updates since
last snapshot, "
+ "skip this one and schedule the
next one in {} seconds",
tableBucket,
- periodicSnapshotDelay / 1000);
+ snapshotIntervalSupplier.getAsLong() /
1000);
}
}
});
@@ -324,7 +339,7 @@ public class PeriodicSnapshotManager implements Closeable {
}
private void scheduleNextSnapshot() {
- scheduleNextSnapshot(periodicSnapshotDelay);
+ scheduleNextSnapshot(snapshotIntervalSupplier.getAsLong());
}
/** {@link SnapshotRunnable} provider and consumer. */
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index 10678eb5c..f73bcdda2 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -931,8 +931,7 @@ public final class Replica {
tableBucket,
kvTabletSnapshotTarget,
snapshotContext,
- kvTablet.getGuardedExecutor(),
- bucketMetricGroup);
+ kvTablet.getGuardedExecutor());
kvSnapshotManager.start();
closeableRegistryForKv.registerCloseable(kvSnapshotManager);
} catch (Exception e) {
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index c891e5d3d..3fc024209 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -79,7 +79,6 @@ import org.apache.fluss.server.kv.KvManager;
import org.apache.fluss.server.kv.KvSnapshotResource;
import org.apache.fluss.server.kv.snapshot.CompletedKvSnapshotCommitter;
import org.apache.fluss.server.kv.snapshot.DefaultSnapshotContext;
-import org.apache.fluss.server.kv.snapshot.SnapshotContext;
import org.apache.fluss.server.log.FetchDataInfo;
import org.apache.fluss.server.log.FetchParams;
import org.apache.fluss.server.log.ListOffsetsParam;
@@ -193,7 +192,7 @@ public class ReplicaManager {
// for kv snapshot
private final KvSnapshotResource kvSnapshotResource;
- private final SnapshotContext kvSnapshotContext;
+ private final DefaultSnapshotContext kvSnapshotContext;
// remote log manager for remote log storage.
private final RemoteLogManager remoteLogManager;
@@ -323,6 +322,10 @@ public class ReplicaManager {
return remoteLogManager;
}
+ public DefaultSnapshotContext getKvSnapshotContext() {
+ return kvSnapshotContext;
+ }
+
private void registerMetrics() {
serverMetricGroup.gauge(
MetricNames.REPLICA_LEADER_COUNT,
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
index d108a6337..93e8951b7 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
@@ -231,8 +231,6 @@ public class TabletServer extends ServerBase {
// Register kvManager to dynamicConfigManager for dynamic
reconfiguration
dynamicConfigManager.register(kvManager);
- // Start dynamicConfigManager after all reconfigurable components
are registered
- dynamicConfigManager.startup();
this.authorizer = AuthorizerLoader.createAuthorizer(conf,
zkClient, pluginManager);
if (authorizer != null) {
@@ -275,6 +273,11 @@ public class TabletServer extends ServerBase {
ioExecutor);
replicaManager.startup();
+ // Register DefaultSnapshotContext for dynamic kv.snapshot.interval
+
dynamicConfigManager.register(replicaManager.getKvSnapshotContext());
+ // Start dynamicConfigManager after all reconfigurable components
are registered
+ dynamicConfigManager.startup();
+
this.tabletService =
new TabletService(
serverId,
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java b/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java
index 2ba44ac41..aa55372f7 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java
@@ -21,6 +21,7 @@ import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.config.cluster.AlterConfig;
import org.apache.fluss.config.cluster.AlterConfigOpType;
+import org.apache.fluss.config.cluster.ServerReconfigurable;
import org.apache.fluss.exception.ConfigException;
import org.apache.fluss.server.coordinator.LakeCatalogDynamicLoader;
import org.apache.fluss.server.zk.NOPErrorHandler;
@@ -41,6 +42,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
import static org.apache.fluss.metadata.DataLakeFormat.PAIMON;
@@ -347,4 +349,67 @@ public class DynamicConfigChangeTest {
.isEqualTo(PAIMON);
}
}
+
+ @Test
+ void testPreventInvalidSnapshotInterval() throws Exception {
+ Configuration configuration = new Configuration();
+ configuration.set(ConfigOptions.KV_SNAPSHOT_INTERVAL,
Duration.ofMinutes(10));
+
+ DynamicConfigManager dynamicConfigManager =
+ new DynamicConfigManager(zookeeperClient, configuration, true);
+ dynamicConfigManager.startup();
+
+ // Try to set snapshot interval to an invalid value - should be
rejected by type validation
+ assertThatThrownBy(
+ () ->
+ dynamicConfigManager.alterConfigs(
+ Collections.singletonList(
+ new AlterConfig(
+
ConfigOptions.KV_SNAPSHOT_INTERVAL.key(),
+ "invalid_value",
+
AlterConfigOpType.SET))))
+ .isInstanceOf(ConfigException.class)
+ .hasMessageContaining(
+ "Cannot parse 'invalid_value' as Duration for config
'kv.snapshot.interval'");
+ }
+
+ @Test
+ void testDynamicSnapshotIntervalChange() throws Exception {
+ Configuration configuration = new Configuration();
+ configuration.set(ConfigOptions.KV_SNAPSHOT_INTERVAL,
Duration.ofMinutes(10));
+
+ DynamicConfigManager dynamicConfigManager =
+ new DynamicConfigManager(zookeeperClient, configuration, true);
+
+ AtomicReference<Duration> reconfiguredInterval = new
AtomicReference<>();
+ dynamicConfigManager.register(
+ new ServerReconfigurable() {
+ @Override
+ public void validate(Configuration newConfig) throws
ConfigException {}
+
+ @Override
+ public void reconfigure(Configuration newConfig) {
+
reconfiguredInterval.set(newConfig.get(ConfigOptions.KV_SNAPSHOT_INTERVAL));
+ }
+ });
+ dynamicConfigManager.startup();
+
+ // Change snapshot interval to 5 minutes - should succeed
+ assertThatCode(
+ () ->
+ dynamicConfigManager.alterConfigs(
+ Collections.singletonList(
+ new AlterConfig(
+
ConfigOptions.KV_SNAPSHOT_INTERVAL.key(),
+ "5min",
+
AlterConfigOpType.SET))))
+ .doesNotThrowAnyException();
+
+ // Verify config was persisted to ZK
+ Map<String, String> zkConfig = zookeeperClient.fetchEntityConfig();
+
assertThat(zkConfig.get(ConfigOptions.KV_SNAPSHOT_INTERVAL.key())).isEqualTo("5min");
+
+ // Verify the reconfigurable was notified with the new value
+
assertThat(reconfiguredInterval.get()).isEqualTo(Duration.ofMinutes(5));
+ }
}
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
index 714835ce5..357d7a08c 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java
@@ -26,7 +26,6 @@ import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.server.SequenceIDCounter;
import org.apache.fluss.server.kv.rocksdb.RocksDBExtension;
import org.apache.fluss.server.kv.rocksdb.RocksDBKv;
-import org.apache.fluss.server.metrics.group.TestingMetricGroups;
import org.apache.fluss.server.testutils.KvTestUtils;
import org.apache.fluss.server.utils.ResourceGuard;
import org.apache.fluss.server.zk.CuratorFrameworkWithUnhandledErrorListener;
@@ -455,8 +454,7 @@ class KvTabletSnapshotTargetTest {
target,
periodicMaterializeDelay,
java.util.concurrent.Executors.newFixedThreadPool(1),
- scheduledExecutorService,
- TestingMetricGroups.BUCKET_METRICS);
+ scheduledExecutorService);
}
private KvTabletSnapshotTarget createSnapshotTarget(
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManagerTest.java
index c719c9683..018d980fb 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManagerTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManagerTest.java
@@ -20,7 +20,6 @@ package org.apache.fluss.server.kv.snapshot;
import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.fs.FsPath;
import org.apache.fluss.metadata.TableBucket;
-import org.apache.fluss.server.metrics.group.TestingMetricGroups;
import
org.apache.fluss.testutils.common.ManuallyTriggeredScheduledExecutorService;
import org.junit.jupiter.api.AfterEach;
@@ -36,6 +35,7 @@ import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicLong;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static
org.apache.fluss.shaded.guava32.com.google.common.collect.Iterators.getOnlyElement;
@@ -127,6 +127,43 @@ class PeriodicSnapshotManagerTest {
.hasMessage(exceptionMessage);
}
+ @Test
+ void testDynamicSnapshotInterval() {
+ long initialInterval = 10_000L;
+ long updatedInterval = 5_000L;
+ AtomicLong currentInterval = new AtomicLong(initialInterval);
+
+ periodicSnapshotManager =
+ new PeriodicSnapshotManager(
+ tableBucket,
+ NopSnapshotTarget.INSTANCE,
+ currentInterval::get,
+ asyncSnapshotExecutorService,
+ scheduledExecutorService);
+ periodicSnapshotManager.start();
+
+ // Trigger the initial task; NopTarget returns empty, so
scheduleNextSnapshot() is
+ // called with the current supplier value (initialInterval = 10000ms).
+ scheduledExecutorService.triggerNonPeriodicScheduledTasks();
+ assertThat(
+
getOnlyElement(scheduledExecutorService.getAllScheduledTasks().iterator())
+ .getDelay(MILLISECONDS))
+ .as("next snapshot should be scheduled with the initial
interval")
+ .isGreaterThan(updatedInterval)
+ .isLessThanOrEqualTo(initialInterval);
+
+ // Update snapshot interval to 5000ms via the supplier.
+ currentInterval.set(updatedInterval);
+
+ // Trigger again; scheduleNextSnapshot() should now use
updatedInterval (5000ms).
+ scheduledExecutorService.triggerNonPeriodicScheduledTasks();
+ assertThat(
+
getOnlyElement(scheduledExecutorService.getAllScheduledTasks().iterator())
+ .getDelay(MILLISECONDS))
+ .as("next snapshot should be scheduled with the updated
interval")
+ .isLessThanOrEqualTo(updatedInterval);
+ }
+
private void checkOnlyOneScheduledTasks() {
assertThat(
getOnlyElement(scheduledExecutorService.getAllScheduledTasks().iterator())
@@ -150,8 +187,7 @@ class PeriodicSnapshotManagerTest {
target,
periodicMaterializeDelay,
asyncSnapshotExecutorService,
- scheduledExecutorService,
- TestingMetricGroups.BUCKET_METRICS);
+ scheduledExecutorService);
}
private static class NopSnapshotTarget implements
PeriodicSnapshotManager.SnapshotTarget {