This is an automated email from the ASF dual-hosted git repository.

xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a0d00297153 Allow dynamic toggling of adaptive routing metrics export 
(#18286)
a0d00297153 is described below

commit a0d00297153b58d0fe79f1146bef51668abcbc3c
Author: Timothy Elgersma <[email protected]>
AuthorDate: Thu Jun 18 15:38:37 2026 -0400

    Allow dynamic toggling of adaptive routing metrics export (#18286)
    
    * Allow dynamic toggling of adaptive routing metrics export (#582)
    
    Committed-By-Agent: claude
    
    cc stripe-private-oss-forks/pinot-reviewers
    r?
    
    https://git.corp.stripe.com/stripe-private-oss-forks/pinot/pull/581 added 
metrics for adaptive routing stats.
    
    We may want to toggle whether or not stats are exported as metrics without 
needing to launch a rolling restart.
    
    ServerRoutingStatsManager now implements PinotClusterConfigChangeListener. 
The periodic export task is always scheduled when stats collection is enabled; 
the enable.stats.metric.export flag is checked inside the task and can be 
updated at runtime via the Helix cluster config (POST /cluster/configs) without 
a broker restart.
    
    We also allow updating the metrics export frequency at runtime.
    
    
[STREAMANALYTICS-4390](https://jira.corp.stripe.com/browse/STREAMANALYTICS-4390)
    
    Deployed to [rad-canary 
QA](https://amp.qa.corp.stripe.com/deploy/qa-deploy1.pdx.deploy.stripe.net%2Fdeploy_r2WUckQcQ467TfeJvyp4zw).
    
    `ssh`ed onto a rad-canary controller and ran
    ```
    curl -X POST localhost:9000/cluster/configs -H "Content-Type: 
application/json" -d 
'{"pinot.broker.adaptive.server.selector.enable.stats.metric.export": "false"}'
    curl -X POST localhost:9000/cluster/configs -H "Content-Type: 
application/json" -d 
'{"pinot.broker.adaptive.server.selector.enable.stats.metric.export": "true"}'
    curl -X POST localhost:9000/cluster/configs -H "Content-Type: 
application/json" -d 
'{"pinot.broker.adaptive.server.selector.enable.stats.metric.export": "false"}'
    ```
    I see that the metrics stopped / resumed as expected in 
[grafana](https://g-8916660cfe.grafana-workspace.us-west-2.amazonaws.com/explore?schemaVersion=1&panes=%7B%223yy%22:%7B%22datasource%22:%22zb219lV4k%22,%22queries%22:%5B%7B%22refId%22:%22B%22,%22expr%22:%22count%20by%20%28host%29%20%28%7B__name__%3D~%5C%22pinot_broker_adaptive_server_latency_ema%5C%22,%20pinot_cluster%3D%5C%22rad-canary%5C%22%7D%29%22,%22range%22:true,%22instant%22:true,%22datasource%22:%7B%22type%22:%22prometheu
 [...]
    
    <img width="1477" alt="Screenshot 2026-04-02 at 4 17 58 pm" 
src="https://git.corp.stripe.com/user-attachments/assets/dca06b6d-8b6f-4797-a079-95a1f21d7de7"
 />
    
    Stripe-Original-Repo: stripe-private-oss-forks/pinot
    Stripe-Monotonic-Timestamp: v2/2026-04-07T21:12:57Z/0
    Stripe-Original-PR: 
https://git.corp.stripe.com/stripe-private-oss-forks/pinot/pull/582
    
    * mark as volatile
    
    * add validation for runtime setting
---
 .../broker/broker/helix/BaseBrokerStarter.java     |   1 +
 .../routing/stats/ServerRoutingStatsManager.java   | 100 ++++++++++++++--
 .../stats/ServerRoutingStatsManagerTest.java       | 132 +++++++++++++++++++++
 3 files changed, 222 insertions(+), 11 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index eb6428cb877..622479edf34 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -697,6 +697,7 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
         System.currentTimeMillis() - startTimeMs, TimeUnit.MILLISECONDS);
 
     
_clusterConfigChangeHandler.registerClusterConfigChangeListener(ContinuousJfrStarter.INSTANCE);
+    
_clusterConfigChangeHandler.registerClusterConfigChangeListener(_serverRoutingStatsManager);
 
     NettyInspector.registerMetrics(_brokerMetrics);
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
index fe85d62f761..8fbf078d5d7 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
@@ -19,14 +19,17 @@
 
 package org.apache.pinot.core.transport.server.routing.stats;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
@@ -34,6 +37,7 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.metrics.BrokerGauge;
 import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.query.QueryExecutionContext.QueryType;
 import org.apache.pinot.spi.query.QueryThreadContext;
@@ -43,12 +47,18 @@ import org.slf4j.LoggerFactory;
 
 
 /**
+ * {@code ServerRoutingStatsManager} manages the query routing stats for each 
server and used by the Adaptive
+ * Server Selection feature (when enabled). The stats are maintained at the 
broker and are updated when a query is
+ * submitted to a server and when a server responds after processing a query.
  *
- *  {@code ServerRoutingStatsManager} manages the query routing stats for each 
server and used by the Adaptive
- *  Server Selection feature (when enabled). The stats are maintained at the 
broker and are updated when a query is
- *  submitted to a server and when a server responds after processing a query.
+ * <p>Thread safety: {@code onChange} is invoked on the Helix config-change 
callback thread.
+ * {@code exportStatsAsMetrics} runs on the single-threaded {@code 
_periodicTaskExecutor}.
+ * {@code _enableStatsMetricExport} and {@code _statsMetricExportIntervalMs} 
are {@code volatile} so
+ * writes from the Helix callback thread are immediately visible to the export 
thread. All other config
+ * fields ({@code _alpha}, {@code _autoDecayWindowMs}, etc.) are written once 
during {@code init()}
+ * before any executor threads start, so no additional synchronization is 
needed for those.
  */
-public class ServerRoutingStatsManager {
+public class ServerRoutingStatsManager implements 
PinotClusterConfigChangeListener {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ServerRoutingStatsManager.class);
 
   private final PinotConfiguration _config;
@@ -69,7 +79,9 @@ public class ServerRoutingStatsManager {
   private double _avgInitializationVal;
   private int _hybridScoreExponent;
   private int _hybridScoreQueueFloor;
-  private boolean _enableStatsMetricExport;
+  private volatile boolean _enableStatsMetricExport;
+  private volatile long _statsMetricExportIntervalMs;
+  private volatile ScheduledFuture<?> _metricExportFuture;
 
   public ServerRoutingStatsManager(PinotConfiguration pinotConfig, 
BrokerMetrics brokerMetrics) {
     _config = pinotConfig;
@@ -112,12 +124,57 @@ public class ServerRoutingStatsManager {
 
     _enableStatsMetricExport = 
_config.getProperty(AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_METRIC_EXPORT,
         AdaptiveServerSelector.DEFAULT_ENABLE_STATS_METRIC_EXPORT);
-    if (_enableStatsMetricExport) {
-      long intervalMs = 
_config.getProperty(AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS,
-          AdaptiveServerSelector.DEFAULT_STATS_METRIC_EXPORT_INTERVAL_MS);
-      _periodicTaskExecutor.scheduleAtFixedRate(this::exportStatsAsMetrics, 
intervalMs, intervalMs,
-          TimeUnit.MILLISECONDS);
-      LOGGER.info("Adaptive server routing stats metric export enabled with 
interval {}ms.", intervalMs);
+    _statsMetricExportIntervalMs = 
_config.getProperty(AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS,
+        AdaptiveServerSelector.DEFAULT_STATS_METRIC_EXPORT_INTERVAL_MS);
+    _metricExportFuture = 
_periodicTaskExecutor.scheduleAtFixedRate(this::exportStatsAsMetrics,
+        _statsMetricExportIntervalMs, _statsMetricExportIntervalMs, 
TimeUnit.MILLISECONDS);
+    LOGGER.info("Adaptive server routing stats metric export scheduled with 
interval {}ms (enabled={}).",
+        _statsMetricExportIntervalMs, _enableStatsMetricExport);
+  }
+
+  @Override
+  public void onChange(Set<String> changedConfigs, Map<String, String> 
clusterConfigs) {
+    if 
(changedConfigs.contains(AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_METRIC_EXPORT))
 {
+      String value = 
clusterConfigs.get(AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_METRIC_EXPORT);
+      if (value != null) {
+        _enableStatsMetricExport = Boolean.parseBoolean(value);
+      } else {
+        // Key was removed from cluster config — fall back to the static 
broker config value.
+        _enableStatsMetricExport = 
_config.getProperty(AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_METRIC_EXPORT,
+            AdaptiveServerSelector.DEFAULT_ENABLE_STATS_METRIC_EXPORT);
+      }
+      LOGGER.info("Updated enableStatsMetricExport to {} from cluster 
config.", _enableStatsMetricExport);
+      if (!_enableStatsMetricExport) {
+        removeAllServerStatsGauges();
+      }
+    }
+    if 
(changedConfigs.contains(AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS))
 {
+      String value = 
clusterConfigs.get(AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS);
+      long newIntervalMs;
+      if (value != null) {
+        try {
+          newIntervalMs = Long.parseLong(value);
+        } catch (NumberFormatException e) {
+          LOGGER.warn("Invalid value '{}' for config '{}'; ignoring interval 
change", value,
+              
AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS);
+          return;
+        }
+        if (newIntervalMs <= 0) {
+          LOGGER.warn("Non-positive value {} for config '{}'; ignoring 
interval change", newIntervalMs,
+              
AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS);
+          return;
+        }
+      } else {
+        newIntervalMs = 
_config.getProperty(AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS,
+            AdaptiveServerSelector.DEFAULT_STATS_METRIC_EXPORT_INTERVAL_MS);
+      }
+      if (newIntervalMs != _statsMetricExportIntervalMs) {
+        _statsMetricExportIntervalMs = newIntervalMs;
+        _metricExportFuture.cancel(false);
+        _metricExportFuture = 
_periodicTaskExecutor.scheduleAtFixedRate(this::exportStatsAsMetrics,
+            newIntervalMs, newIntervalMs, TimeUnit.MILLISECONDS);
+        LOGGER.info("Rescheduled adaptive server routing stats metric export 
with new interval {}ms.", newIntervalMs);
+      }
     }
   }
 
@@ -125,6 +182,11 @@ public class ServerRoutingStatsManager {
     return _isEnabled;
   }
 
+  @VisibleForTesting
+  public long getStatsMetricExportIntervalMs() {
+    return _statsMetricExportIntervalMs;
+  }
+
   public void shutDown() {
     // As the stats are not persistent, shutdown need not wait for task 
termination.
     if (!_isEnabled) {
@@ -441,12 +503,28 @@ public class ServerRoutingStatsManager {
     }
   }
 
+  private void removeAllServerStatsGauges() {
+    if (_serverQueryStatsMap == null) {
+      return;
+    }
+    for (String serverInstanceId : _serverQueryStatsMap.keySet()) {
+      String serverTag = "server." + serverInstanceId;
+      
_brokerMetrics.removeGauge(BrokerGauge.ADAPTIVE_SERVER_NUM_IN_FLIGHT_REQUESTS.getGaugeName()
 + "." + serverTag);
+      
_brokerMetrics.removeGauge(BrokerGauge.ADAPTIVE_SERVER_LATENCY_EMA.getGaugeName()
 + "." + serverTag);
+      
_brokerMetrics.removeGauge(BrokerGauge.ADAPTIVE_SERVER_HYBRID_SCORE.getGaugeName()
 + "." + serverTag);
+    }
+    LOGGER.info("Removed adaptive server routing stats gauges for {} 
servers.", _serverQueryStatsMap.size());
+  }
+
   private void recordQueueSizeMetrics() {
     int queueSize = getQueueSize();
     
_brokerMetrics.setValueOfGlobalGauge(BrokerGauge.ROUTING_STATS_MANAGER_QUEUE_SIZE,
 queueSize);
   }
 
   private void exportStatsAsMetrics() {
+    if (!_enableStatsMetricExport) {
+      return;
+    }
     try {
       exportStatsForMap(_serverQueryStatsMap, "server.",
           BrokerGauge.ADAPTIVE_SERVER_NUM_IN_FLIGHT_REQUESTS,
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java
index 17b536091be..79405be3341 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.metrics.BrokerGauge;
 import org.apache.pinot.common.metrics.BrokerMetrics;
@@ -456,6 +457,137 @@ public class ServerRoutingStatsManagerTest {
     assertTrue(fastScore < slowScore, "Idle servers should be ranked by 
latency");
   }
 
+  @Test
+  public void testStatsMetricExportDynamicToggle() throws InterruptedException 
{
+    Map<String, Object> properties = new HashMap<>();
+    
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_COLLECTION,
 true);
+    
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_EWMA_ALPHA,
 1.0);
+    
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_AUTODECAY_WINDOW_MS,
 -1);
+    
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_WARMUP_DURATION_MS,
 0);
+    
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_AVG_INITIALIZATION_VAL,
 0.0);
+    
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_HYBRID_SCORE_EXPONENT,
 3);
+    // Start with metric export disabled.
+    
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_METRIC_EXPORT,
 false);
+    
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS,
 50L);
+    ServerRoutingStatsManager manager = new ServerRoutingStatsManager(new 
PinotConfiguration(properties),
+        _brokerMetrics);
+    manager.init();
+
+    int requestId = 0;
+    manager.recordStatsForQuerySubmission(requestId++, "dynamicToggleServer");
+    waitForStatsUpdate(manager, requestId);
+    manager.recordStatsUponResponseArrival(requestId++, "dynamicToggleServer", 
100);
+    waitForStatsUpdate(manager, requestId);
+
+    String numInFlightKey = 
BrokerGauge.ADAPTIVE_SERVER_NUM_IN_FLIGHT_REQUESTS.getGaugeName() + "."
+        + "server.dynamicToggleServer";
+
+    // Confirm no metrics while disabled.
+    Thread.sleep(200);
+    assertNull(_brokerMetrics.getGaugeValue(numInFlightKey));
+
+    // Enable via cluster config change and verify metrics are now exported.
+    manager.onChange(
+        
Set.of(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_METRIC_EXPORT),
+        
Map.of(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_METRIC_EXPORT,
 "true"));
+
+    TestUtils.waitForCondition(aVoid -> 
_brokerMetrics.getGaugeValue(numInFlightKey) != null,
+        50L, 5000, "Timed out waiting for metrics after dynamic enable");
+
+    // Disable again and verify the gauges are removed from the registry.
+    manager.onChange(
+        
Set.of(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_METRIC_EXPORT),
+        
Map.of(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_METRIC_EXPORT,
 "false"));
+
+    assertNull(_brokerMetrics.getGaugeValue(numInFlightKey));
+  }
+
+  @Test
+  public void testStatsMetricExportIntervalDynamicUpdate() throws 
InterruptedException {
+    Map<String, Object> properties = new HashMap<>();
+    
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_COLLECTION,
 true);
+    
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_EWMA_ALPHA,
 1.0);
+    
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_AUTODECAY_WINDOW_MS,
 -1);
+    
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_WARMUP_DURATION_MS,
 0);
+    
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_AVG_INITIALIZATION_VAL,
 0.0);
+    
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_HYBRID_SCORE_EXPONENT,
 3);
+    
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_METRIC_EXPORT,
 true);
+    // Start with a very long interval so the task won't fire during the test.
+    
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS,
 100000L);
+    ServerRoutingStatsManager manager = new ServerRoutingStatsManager(new 
PinotConfiguration(properties),
+        _brokerMetrics);
+    manager.init();
+
+    int requestId = 0;
+    manager.recordStatsForQuerySubmission(requestId++, "intervalUpdateServer");
+    waitForStatsUpdate(manager, requestId);
+    manager.recordStatsUponResponseArrival(requestId++, 
"intervalUpdateServer", 100);
+    waitForStatsUpdate(manager, requestId);
+
+    String numInFlightKey = 
BrokerGauge.ADAPTIVE_SERVER_NUM_IN_FLIGHT_REQUESTS.getGaugeName()
+        + ".server.intervalUpdateServer";
+
+    // No export yet — interval is too long.
+    Thread.sleep(200);
+    assertNull(_brokerMetrics.getGaugeValue(numInFlightKey));
+
+    // Shorten the interval via cluster config change and verify metrics are 
now exported.
+    manager.onChange(
+        
Set.of(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS),
+        
Map.of(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS,
 "50"));
+
+    TestUtils.waitForCondition(aVoid -> 
_brokerMetrics.getGaugeValue(numInFlightKey) != null,
+        50L, 5000, "Timed out waiting for metrics after interval update");
+  }
+
+  @Test
+  public void testStatsMetricExportIntervalDynamicUpdateIgnoresBadValues() {
+    Map<String, Object> properties = new HashMap<>();
+    
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_COLLECTION,
 true);
+    
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_EWMA_ALPHA,
 1.0);
+    
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_AUTODECAY_WINDOW_MS,
 -1);
+    
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_WARMUP_DURATION_MS,
 0);
+    
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_AVG_INITIALIZATION_VAL,
 0.0);
+    
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_HYBRID_SCORE_EXPONENT,
 3);
+    
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_METRIC_EXPORT,
 true);
+    
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS,
 100000L);
+    ServerRoutingStatsManager manager = new ServerRoutingStatsManager(new 
PinotConfiguration(properties),
+        _brokerMetrics);
+    manager.init();
+
+    long intervalBefore = manager.getStatsMetricExportIntervalMs();
+
+    // Non-numeric value: must be silently ignored without throwing.
+    manager.onChange(
+        
Set.of(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS),
+        
Map.of(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS,
 "abc"));
+    assertEquals(manager.getStatsMetricExportIntervalMs(), intervalBefore,
+        "Interval must not change on non-numeric config value");
+
+    // Zero: must be silently ignored.
+    manager.onChange(
+        
Set.of(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS),
+        
Map.of(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS,
 "0"));
+    assertEquals(manager.getStatsMetricExportIntervalMs(), intervalBefore,
+        "Interval must not change on zero config value");
+
+    // Negative value: must be silently ignored.
+    manager.onChange(
+        
Set.of(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS),
+        
Map.of(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS,
 "-1"));
+    assertEquals(manager.getStatsMetricExportIntervalMs(), intervalBefore,
+        "Interval must not change on negative config value");
+
+    // Key removed from cluster config — must fall back to the static broker 
config value (100000L).
+    manager.onChange(
+        
Set.of(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS),
+        Collections.emptyMap());
+    assertEquals(manager.getStatsMetricExportIntervalMs(), 100000L,
+        "Interval must revert to static config when cluster key is removed");
+
+    manager.shutDown();
+  }
+
   @Test
   public void testMseAndSseStatsIsolation() {
     Map<String, Object> properties = new HashMap<>();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to