This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2cf1cdc5914 Add configurable queue size floor to adaptive routing hybrid score (#18288)
2cf1cdc5914 is described below

commit 2cf1cdc59149e9ddabdcd5b57818fcf736f635db
Author: Timothy Elgersma <[email protected]>
AuthorDate: Tue May 12 14:49:37 2026 -0400

    Add configurable queue size floor to adaptive routing hybrid score (#18288)
---
 .../adaptiveserverselector/HybridSelector.java     |  4 +-
 .../routing/stats/ServerRoutingStatsEntry.java     | 10 +++-
 .../routing/stats/ServerRoutingStatsManager.java   |  7 ++-
 .../stats/ServerRoutingStatsManagerTest.java       | 67 ++++++++++++++++++++++
 .../apache/pinot/spi/utils/CommonConstants.java    |  3 +
 5 files changed, 86 insertions(+), 5 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/HybridSelector.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/HybridSelector.java
index 68b41b2b013..f7383ceb43b 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/HybridSelector.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/HybridSelector.java
@@ -37,8 +37,10 @@ import 
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsMa
  * 
https://www.usenix.org/system/files/conference/nsdi15/nsdi15-paper-suresh.pdf
  *
  * The Hybrid score for each server is calculated as follows. The server with 
the lowest Hybrid score is picked.
- *       HybridScore = Math.pow(A+B, N) * C
+ *       HybridScore = Math.pow(F+A+B, N) * C
  * N -> Configurable exponent with default value of 3.
+ * F -> Configurable queue size floor with default value of 0. Setting F=1 
matches the original paper formulation and
+ *      prevents the score from collapsing to 0 when all servers are idle, 
ensuring latency is still used for routing.
  */
 public class HybridSelector implements AdaptiveServerSelector {
   private final ServerRoutingStatsManager _serverRoutingStatsManager;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsEntry.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsEntry.java
index 7aa9a99b959..970ccd1f1e0 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsEntry.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsEntry.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.transport.server.routing.stats;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.pinot.common.utils.ExponentialMovingAverage;
@@ -44,8 +45,11 @@ public class ServerRoutingStatsEntry {
   // Hybrid score exponent.
   private final int _hybridScoreExponent;
 
+  // Hybrid score queue size floor (added to A+B before exponentiation to 
avoid score collapsing to 0 when idle).
+  private final int _hybridScoreQueueFloor;
+
   public ServerRoutingStatsEntry(String serverInstanceId, double alphaEMA, 
long autoDecayWindowMsEMA,
-      long warmupDurationMsEMA, double avgInitializationValEMA, int 
scoreExponent,
+      long warmupDurationMsEMA, double avgInitializationValEMA, int 
scoreExponent, int queueFloor,
       ScheduledExecutorService periodicTaskExecutor) {
     _serverInstanceId = serverInstanceId;
     _serverLock = new ReentrantReadWriteLock();
@@ -58,6 +62,8 @@ public class ServerRoutingStatsEntry {
             periodicTaskExecutor);
 
     _hybridScoreExponent = scoreExponent;
+    Preconditions.checkArgument(queueFloor >= 0, "queueFloor must be 
non-negative, got: %s", queueFloor);
+    _hybridScoreQueueFloor = queueFloor;
   }
 
   @JsonIgnore
@@ -84,7 +90,7 @@ public class ServerRoutingStatsEntry {
 
   @JsonProperty("hybridScore")
   public double computeHybridScore() {
-    double estimatedQSize = _numInFlightRequests + 
_inFlighRequestsEMA.getAverage();
+    double estimatedQSize = _hybridScoreQueueFloor + _numInFlightRequests + 
_inFlighRequestsEMA.getAverage();
     return Math.pow(estimatedQSize, _hybridScoreExponent) * 
_latencyMsEMA.getAverage();
   }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
index 252f15c73ce..6c1bea37f14 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
@@ -64,6 +64,7 @@ public class ServerRoutingStatsManager {
   private long _warmupDurationMs;
   private double _avgInitializationVal;
   private int _hybridScoreExponent;
+  private int _hybridScoreQueueFloor;
   private boolean _enableStatsMetricExport;
 
   public ServerRoutingStatsManager(PinotConfiguration pinotConfig, 
BrokerMetrics brokerMetrics) {
@@ -91,6 +92,8 @@ public class ServerRoutingStatsManager {
         AdaptiveServerSelector.DEFAULT_AVG_INITIALIZATION_VAL);
     _hybridScoreExponent = 
_config.getProperty(AdaptiveServerSelector.CONFIG_OF_HYBRID_SCORE_EXPONENT,
         AdaptiveServerSelector.DEFAULT_HYBRID_SCORE_EXPONENT);
+    _hybridScoreQueueFloor = 
_config.getProperty(AdaptiveServerSelector.CONFIG_OF_HYBRID_SCORE_QUEUE_FLOOR,
+        AdaptiveServerSelector.DEFAULT_HYBRID_SCORE_QUEUE_FLOOR);
 
     int threadPoolSize = 
_config.getProperty(AdaptiveServerSelector.CONFIG_OF_STATS_MANAGER_THREADPOOL_SIZE,
         AdaptiveServerSelector.DEFAULT_STATS_MANAGER_THREADPOOL_SIZE);
@@ -167,7 +170,7 @@ public class ServerRoutingStatsManager {
   private void updateStatsAfterQuerySubmission(String serverInstanceId) {
     ServerRoutingStatsEntry stats = 
_serverQueryStatsMap.computeIfAbsent(serverInstanceId,
         k -> new ServerRoutingStatsEntry(serverInstanceId, _alpha, 
_autoDecayWindowMs, _warmupDurationMs,
-            _avgInitializationVal, _hybridScoreExponent, 
_periodicTaskExecutor));
+            _avgInitializationVal, _hybridScoreExponent, 
_hybridScoreQueueFloor, _periodicTaskExecutor));
 
     try {
       stats.getServerWriteLock().lock();
@@ -197,7 +200,7 @@ public class ServerRoutingStatsManager {
   private void updateStatsUponResponseArrival(String serverInstanceId, long 
latencyMs) {
     ServerRoutingStatsEntry stats = 
_serverQueryStatsMap.computeIfAbsent(serverInstanceId,
         k -> new ServerRoutingStatsEntry(serverInstanceId, _alpha, 
_autoDecayWindowMs, _warmupDurationMs,
-            _avgInitializationVal, _hybridScoreExponent, 
_periodicTaskExecutor));
+            _avgInitializationVal, _hybridScoreExponent, 
_hybridScoreQueueFloor, _periodicTaskExecutor));
 
     try {
       stats.getServerWriteLock().lock();
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java
index f188604ef24..988236d557f 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java
@@ -388,6 +388,73 @@ public class ServerRoutingStatsManagerTest {
     assertNull(_brokerMetrics.getGaugeValue(numInFlightKey));
   }
 
+  @Test
+  public void testHybridScoreWithQueueFloor() {
+    // With floor=1 the formula is Math.pow(1+A+B, N)*C instead of 
Math.pow(A+B, N)*C.
+    // This prevents the score from collapsing to 0 when all servers are idle 
so that latency
+    // still drives routing decisions in low-traffic conditions.
+    Map<String, Object> properties = new HashMap<>();
+    
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_COLLECTION,
 true);
+    
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_EWMA_ALPHA,
 1.0);
+    
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_AUTODECAY_WINDOW_MS,
 -1);
+    
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_WARMUP_DURATION_MS,
 0);
+    
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_AVG_INITIALIZATION_VAL,
 0.0);
+    
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_HYBRID_SCORE_EXPONENT,
 3);
+
+    // floor=0 (default): after 1 submit + 1 response at latency=10,
+    // numInFlight=0, inFlightEMA=1, latencyEMA=10 -> (0+1)^3 * 10 = 10.
+    
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_HYBRID_SCORE_QUEUE_FLOOR,
 0);
+    ServerRoutingStatsManager managerNoFloor =
+        new ServerRoutingStatsManager(new PinotConfiguration(properties), 
_brokerMetrics);
+    managerNoFloor.init();
+    int requestId = 0;
+    managerNoFloor.recordStatsForQuerySubmission(requestId++, "floorServer");
+    waitForStatsUpdate(managerNoFloor, requestId);
+    managerNoFloor.recordStatsUponResponseArrival(requestId++, "floorServer", 
10);
+    waitForStatsUpdate(managerNoFloor, requestId);
+    assertEquals(managerNoFloor.fetchHybridScoreForServer("floorServer"), 
10.0);
+
+    // floor=1: same sequence -> (1+0+1)^3 * 10 = 80.
+    
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_HYBRID_SCORE_QUEUE_FLOOR,
 1);
+    ServerRoutingStatsManager managerWithFloor =
+        new ServerRoutingStatsManager(new PinotConfiguration(properties), 
_brokerMetrics);
+    managerWithFloor.init();
+    requestId = 0;
+    managerWithFloor.recordStatsForQuerySubmission(requestId++, "floorServer");
+    waitForStatsUpdate(managerWithFloor, requestId);
+    managerWithFloor.recordStatsUponResponseArrival(requestId++, 
"floorServer", 10);
+    waitForStatsUpdate(managerWithFloor, requestId);
+    assertEquals(managerWithFloor.fetchHybridScoreForServer("floorServer"), 
80.0);
+
+    // Multiple idle servers (numInFlight=0 after queries complete) should be 
ranked by latency.
+    // In this config (autodecay disabled, alpha=1) inFlightEMA freezes at 1 
after the first
+    // submission, so with floor=1: score = (1+0+1)^3 * latency = 8 * latency.
+    // The server with lower observed latency gets a lower score and is 
therefore preferred.
+    // When autodecay is enabled, inFlightEMA eventually decays to 0 during 
idle periods; at that
+    // point floor=0 collapses all scores to (0+0+0)^3 * latency = 0, 
destroying latency ordering,
+    // while floor=1 keeps score = 1^3 * latency = latency and preserves the 
ranking.
+    
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_HYBRID_SCORE_QUEUE_FLOOR,
 1);
+    ServerRoutingStatsManager managerMultiServer =
+        new ServerRoutingStatsManager(new PinotConfiguration(properties), 
_brokerMetrics);
+    managerMultiServer.init();
+    requestId = 0;
+
+    managerMultiServer.recordStatsForQuerySubmission(requestId++, 
"fastServer");
+    waitForStatsUpdate(managerMultiServer, requestId);
+    managerMultiServer.recordStatsUponResponseArrival(requestId++, 
"fastServer", 5);
+    waitForStatsUpdate(managerMultiServer, requestId);
+
+    managerMultiServer.recordStatsForQuerySubmission(requestId++, 
"slowServer");
+    waitForStatsUpdate(managerMultiServer, requestId);
+    managerMultiServer.recordStatsUponResponseArrival(requestId++, 
"slowServer", 20);
+    waitForStatsUpdate(managerMultiServer, requestId);
+
+    // Both servers are now idle (numInFlight=0). The fast server should rank 
better (lower score).
+    double fastScore = 
managerMultiServer.fetchHybridScoreForServer("fastServer");
+    double slowScore = 
managerMultiServer.fetchHybridScoreForServer("slowServer");
+    assertTrue(fastScore < slowScore, "Idle servers should be ranked by 
latency");
+  }
+
   private void assertStatsNullForInstance(ServerRoutingStatsManager manager, 
String instanceId) {
     Integer numInFlightReq = 
manager.fetchNumInFlightRequestsForServer(instanceId);
     assertNull(numInFlightReq);
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index fbe1fe743f3..768d46d2144 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1134,6 +1134,9 @@ public class CommonConstants {
       // Parameters related to Hybrid score.
       public static final String CONFIG_OF_HYBRID_SCORE_EXPONENT = 
CONFIG_PREFIX + ".hybrid.score.exponent";
       public static final int DEFAULT_HYBRID_SCORE_EXPONENT = 3;
+      public static final String CONFIG_OF_HYBRID_SCORE_QUEUE_FLOOR =
+          CONFIG_PREFIX + ".hybrid.score.queue.size.floor";
+      public static final int DEFAULT_HYBRID_SCORE_QUEUE_FLOOR = 0;
 
       // Threadpool size of ServerRoutingStatsManager. This controls the 
number of threads available to update routing
       // stats for servers upon query submission and response arrival.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to