yashmayya commented on code in PR #18553:
URL: https://github.com/apache/pinot/pull/18553#discussion_r3290158601


##########
pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java:
##########
@@ -305,10 +305,12 @@ protected MultiStageBrokerRequestHandler createMultiStageBrokerRequestHandler(
       QueryQuotaManager queryQuotaManager, TableCache tableCache,
       MultiStageQueryThrottler multiStageQueryThrottler, FailureDetector failureDetector,
       ThreadAccountant threadAccountant, MultiClusterRoutingContext multiClusterRoutingContext,
-      WorkerManager workerManager, WorkerManager multiClusterWorkerManager) {
+      WorkerManager workerManager, WorkerManager multiClusterWorkerManager,
+      ServerRoutingStatsManager serverRoutingStatsManager) {

Review Comment:
   **MAJOR — backwards-incompatible signature change to a `protected` factory 
method.**
   
   `createMultiStageBrokerRequestHandler` is `protected`, which makes it a 
de-facto extension point for downstream broker starters (e.g., StarTree's 
commercial fork, custom plugin broker subclasses that wire instrumentation). 
Adding `ServerRoutingStatsManager` as a required parameter breaks any subclass 
that overrode the prior 13-arg signature. An override marked `@Override` stops 
compiling; one without the annotation compiles cleanly but is silently bypassed 
after upgrade, because the new call site (line 531) invokes the 14-arg method.
   
   The constructor on `MultiStageBrokerRequestHandler` itself preserved the old 
13-arg form (line 154 — delegating with `null`) for exactly this kind of 
backward compatibility, but this factory method did not.
   
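   Suggest: keep the 13-arg factory method (marked `@Deprecated`) with a default 
body that delegates to the new 14-arg one, passing `_serverRoutingStatsManager`, 
and leave the call site at line 531 on the 13-arg form. Existing 13-arg overrides 
keep being invoked, new 14-arg overrides are reached through the default 
delegation, and subclassers can migrate at their own pace. Switching the call 
site to the 14-arg form would reintroduce the bypass described above.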
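   A minimal sketch of that delegation pattern, under heavily simplified assumptions: `BaseStarter`, `LegacyStarter`, `ModernStarter`, `StatsManager`, and `Handler` are hypothetical stand-ins, and the other parameters are collapsed into a single `String workerManager`. It illustrates that, with the call site kept on the old overload, both a legacy override of the old signature and a new override of the extended one are invoked.

```java
// Hypothetical stand-ins: the real Pinot types and the remaining parameters are elided.
class StatsManager {
}

class Handler {
  final String createdBy;
  final StatsManager statsManager;

  Handler(String createdBy, StatsManager statsManager) {
    this.createdBy = createdBy;
    this.statsManager = statsManager;
  }
}

class BaseStarter {
  protected final StatsManager _serverRoutingStatsManager = new StatsManager();

  // Old signature, kept so that existing subclass overrides are still invoked.
  @Deprecated
  protected Handler createHandler(String workerManager) {
    // Default body forwards to the new overload with the starter's own stats manager.
    return createHandler(workerManager, _serverRoutingStatsManager);
  }

  // New signature for subclasses written after the change.
  protected Handler createHandler(String workerManager, StatsManager statsManager) {
    return new Handler("base", statsManager);
  }

  Handler start() {
    // The call site stays on the old overload so legacy overrides still run.
    return createHandler("workers");
  }
}

// Written against the old API: overrides only the old signature.
class LegacyStarter extends BaseStarter {
  @Override
  protected Handler createHandler(String workerManager) {
    return new Handler("legacy", null);
  }
}

// Written against the new API: overrides only the new signature.
class ModernStarter extends BaseStarter {
  @Override
  protected Handler createHandler(String workerManager, StatsManager statsManager) {
    return new Handler("modern", statsManager);
  }
}
```

   A subclass that overrides both overloads takes full control, which is the expected outcome during migration.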



##########
pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java:
##########
@@ -149,6 +153,18 @@ public long getCompletedTaskCount() {
     return tpe.getCompletedTaskCount();
   }
 
+  private ConcurrentHashMap<String, ServerRoutingStatsEntry> getStatsMap(QueryType queryType) {
+    return queryType == QueryType.MSE ? _mseServerQueryStatsMap : _serverQueryStatsMap;

Review Comment:
   **MAJOR — two silent fall-throughs in the new dispatch.**
   
   1. `QueryExecutionContext.QueryType` has three values: `SSE`, `MSE`, `TSE`. 
This ternary lumps `TSE` into the SSE bucket. Today `TimeSeriesRequestHandler` 
doesn't call `recordStats*`, so this is dormant — but the moment anyone wires 
TSE adaptive routing, TSE traffic will silently poison SSE stats, which is the 
exact engine cross-talk the PR exists to prevent.
   2. The no-arg `getStatsMap()` below defaults to the SSE map when 
`QueryThreadContext.getIfAvailable()` is `null`. Any caller invoked from a 
non-query thread (debug REST handlers, periodic tasks, future SPI consumers) 
silently reads/writes the SSE map with no warning.
   
   Suggest: switch exhaustively on the enum and either throw or log on an 
unknown/unset query type, or add a third TSE-map slot up front. At minimum, 
document on the method that the caller is expected to run with a 
`QueryThreadContext` set.
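   One way the exhaustive switch could look. This is a sketch that assumes a hypothetical `QueryType` mirror of the real enum and a simplified `StatsEntry` in place of `ServerRoutingStatsEntry`; `PerEngineStats` is illustrative, not Pinot code. It throws for `TSE` and for an unset type instead of falling through to the SSE map.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical mirror of QueryExecutionContext.QueryType.
enum QueryType { SSE, MSE, TSE }

// Simplified stand-in for ServerRoutingStatsEntry.
class StatsEntry {
  int numInFlightRequests;
}

class PerEngineStats {
  private final ConcurrentHashMap<String, StatsEntry> _serverQueryStatsMap = new ConcurrentHashMap<>();
  private final ConcurrentHashMap<String, StatsEntry> _mseServerQueryStatsMap = new ConcurrentHashMap<>();

  ConcurrentHashMap<String, StatsEntry> getStatsMap(QueryType queryType) {
    if (queryType == null) {
      // No QueryThreadContext on this thread: fail loudly instead of defaulting to the SSE map.
      throw new IllegalStateException("Query type is unset; caller must run with a QueryThreadContext");
    }
    switch (queryType) {
      case SSE:
        return _serverQueryStatsMap;
      case MSE:
        return _mseServerQueryStatsMap;
      case TSE:
        // No TSE map yet: refuse rather than mixing TSE traffic into SSE stats.
        throw new UnsupportedOperationException("Adaptive routing stats are not tracked for TSE");
      default:
        // Guards against enum constants added later.
        throw new IllegalArgumentException("Unknown query type: " + queryType);
    }
  }
}
```

   For the "third slot" option instead, an `EnumMap` populated by iterating `QueryType.values()` gives every engine, including any constant added later, its own map.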



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java:
##########
@@ -183,6 +214,11 @@ public QueryResult submitAndReduce(RequestContext context, DispatchableSubPlan d
       cancel(requestId);
       throw e;
     } finally {
+      if (statsManager != null) {
+        for (QueryServerInstance server : incrementedServers) {
+          statsManager.recordStatsUponResponseArrival(requestId, server.getInstanceId(), -1);

Review Comment:
   **MAJOR — latency EMA for MSE entries will be stuck at the init value 
forever, and the new MSE gauges will export that stuck value.**
   
   Passing `latency = -1` is handled safely by `updateStatsUponResponseArrival` 
(it skips `updateLatency`), but the consequences go beyond "no latency yet":
   
   1. `_latencyMsEMA._lastUpdatedTimeMs` stays at `0` for every MSE entry, so 
the EMA's auto-decay scheduled task never fires.
   2. `_latencyMsEMA.getAverage()` returns `_avgInitializationVal` (default 
`1.0`) for the lifetime of the broker.
   3. `ADAPTIVE_SERVER_MSE_LATENCY_EMA` therefore exports a flat constant — 
operators wiring dashboards will assume the gauge is broken and stop trusting 
it.
   4. `ADAPTIVE_SERVER_MSE_HYBRID_SCORE` is `inflight^exp * latencyEma`, which 
collapses to `inflight^exp * initVal`. If `_avgInitializationVal == 0.0`, the 
MSE hybrid score is identically `0` for all servers.
   
   This isn't just a v0 inert-data concern: `BalancedInstanceSelector` and 
`ReplicaGroupInstanceSelector` (the common MSE leaf selectors via 
`BaseInstanceSelector._priorityPoolInstanceSelector`) DO invoke 
`_adaptiveServerSelector` — and via the new `getStatsMap()`, those reads now 
hit the MSE map, where these values are stuck. With `HYBRID` or `LATENCY` 
selector configured, MSE routing decisions will be derived from stuck data.
   
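   Until the TODO in the `submitAndReduce` Javadoc (per-sender arrival with real 
leaf-stage latency) lands, suggest one of: suppressing export of 
`ADAPTIVE_SERVER_MSE_LATENCY_EMA` / `ADAPTIVE_SERVER_MSE_HYBRID_SCORE` (so 
dashboards don't show misleading flat lines); falling back to the SSE entry's 
latency EMA when the MSE entry has never received a real latency sample; or 
documenting loudly in the metric Javadoc that these gauges are inert until the 
follow-up. Only the fallback also fixes the routing decisions described above; 
the other two options address dashboards only.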
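   A sketch of the fallback option, assuming a hypothetical `SimpleEma` that additionally tracks whether it has ever seen a sample (Pinot's real EMA class differs, for example it has the auto-decay task this omits); `LatencyFallback.effectiveLatencyEma` is an illustrative name. Before any sample, the MSE EMA reports only its init value, which is the stuck value described above.

```java
// Hypothetical, simplified EMA that records whether it has ever seen a real sample.
class SimpleEma {
  private final double _alpha;
  private double _average;
  private boolean _hasSample;

  SimpleEma(double alpha, double initVal) {
    _alpha = alpha;
    _average = initVal;
  }

  void update(double value) {
    _average = _alpha * value + (1 - _alpha) * _average;
    _hasSample = true;
  }

  double getAverage() {
    return _average;
  }

  boolean hasSample() {
    return _hasSample;
  }
}

class LatencyFallback {
  // Use the MSE entry's EMA only once it has real data; until then borrow the SSE entry's EMA.
  static double effectiveLatencyEma(SimpleEma mseLatency, SimpleEma sseLatency) {
    return mseLatency.hasSample() ? mseLatency.getAverage() : sseLatency.getAverage();
  }
}
```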



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java:
##########
@@ -167,10 +168,40 @@ public void start() {
   public QueryResult submitAndReduce(RequestContext context, DispatchableSubPlan dispatchableSubPlan, long timeoutMs,
       Map<String, String> queryOptions)
       throws Exception {
+    return submitAndReduce(context, dispatchableSubPlan, timeoutMs, queryOptions, null);
+  }
+
+  /// Same as {@link #submitAndReduce(RequestContext, DispatchableSubPlan, long, Map)} but records per-server
+  /// in-flight request statistics into {@code statsManager} for use by the adaptive query router.
+  /// When {@code statsManager} is non-null:
+  /// <ul>
+  ///   <li>Each leaf server is registered as having one more in-flight request via
+  ///       {@link ServerRoutingStatsManager#recordStatsForQuerySubmission} after the fan-out begins.</li>
+  ///   <li>After the full fan-out completes (or fails), each server is decremented via
+  ///       {@link ServerRoutingStatsManager#recordStatsUponResponseArrival} with {@code latency = -1}
+  ///       (no latency is recorded at this stage).</li>
+  /// </ul>
+  /// TODO: Replace the coarse end-of-fanout decrement with per-sender arrival once per-sender EOS
+  ///       interception is in place, and record real leaf-stage latency at that point.
+  public QueryResult submitAndReduce(RequestContext context, DispatchableSubPlan dispatchableSubPlan, long timeoutMs,
+      Map<String, String> queryOptions, @Nullable ServerRoutingStatsManager statsManager)
+      throws Exception {
     long requestId = context.getRequestId();
     Set<QueryServerInstance> servers = new HashSet<>();
+    // Tracks servers where recordStatsForQuerySubmission was actually called, so the finally block only
+    // decrements servers that were incremented — guarding against a partial failure in submit().
+    Set<QueryServerInstance> incrementedServers = new HashSet<>();
     try {
       submit(requestId, dispatchableSubPlan, timeoutMs, servers, queryOptions);
+      // The SSE engine increments before `submit`, but here we increment after because `submit` populates
+      // the list of servers. Getting the list of servers before calling `submit` would expose
+      // implementation details of `submit`.
+      if (statsManager != null) {
+        for (QueryServerInstance server : servers) {
+          statsManager.recordStatsForQuerySubmission(requestId, server.getInstanceId());

Review Comment:
   **MAJOR — increment/decrement can be reordered on the stats-manager 
executor, causing a transient negative inflight count and a wrong EMA sample.**
   
   `recordStatsForQuerySubmission` (here) and `recordStatsUponResponseArrival` 
(line 219 in the `finally`) both enqueue tasks on 
`ServerRoutingStatsManager._executorService`, which is a 
`newFixedThreadPool(2)` by default. The FIFO submission queue does NOT 
guarantee execution order across the two worker threads — for any single server 
X, the decrement task can acquire the per-server write lock before the 
increment task. When that happens:
   
   - `_numInFlightRequests` momentarily becomes `-1`, then `0` after the 
increment lands.
   - The transient `-1` is read by any concurrent 
`fetchNumInFlightRequestsForServer` because `_numInFlightRequests` is 
`volatile`.
   - The EMA is sampled only inside 
`updateNumInFlightRequestsForQuerySubmission` 
(`ServerRoutingStatsEntry.java:99`), so it gets sampled at `0` instead of the 
expected `+1`.
   
   This race is preexisting for SSE (`AsyncQueryResponse` uses the same 
executor), but the increment→decrement window in MSE is just `runReducer` 
duration (often milliseconds) rather than a server-side roundtrip — 
meaningfully more likely to reorder.
   
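   Options: serialize per server (a per-server work channel keyed by 
`serverInstanceId`), single-thread the executor, or apply both the increment and 
the decrement synchronously on the calling thread while submitting only the EMA 
update asynchronously. Making only the decrement synchronous is not enough: it 
could then run before the still-queued increment task.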
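   A sketch of the per-server serialization option, assuming hypothetical `StripedStatsExecutor` and `InFlightTracker` classes (not Pinot API). Tasks are hashed by `serverInstanceId` onto a fixed set of single-thread executors, so an increment and decrement for the same server always run in submission order while different servers still proceed in parallel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical: a fixed set of single-thread executors; a server always maps to the same one.
class StripedStatsExecutor {
  private final List<ExecutorService> _stripes = new ArrayList<>();

  StripedStatsExecutor(int numStripes) {
    for (int i = 0; i < numStripes; i++) {
      _stripes.add(Executors.newSingleThreadExecutor());
    }
  }

  void execute(String serverInstanceId, Runnable task) {
    // Same server -> same stripe -> tasks run one at a time, in submission order.
    int index = Math.floorMod(serverInstanceId.hashCode(), _stripes.size());
    _stripes.get(index).execute(task);
  }

  void shutdownAndAwait() {
    _stripes.forEach(ExecutorService::shutdown);
    try {
      for (ExecutorService stripe : _stripes) {
        if (!stripe.awaitTermination(10, TimeUnit.SECONDS)) {
          throw new IllegalStateException("stats executor did not drain");
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while draining", e);
    }
  }
}

// Hypothetical in-flight counter that records whether any count ever went negative.
class InFlightTracker {
  final ConcurrentHashMap<String, AtomicInteger> inFlight = new ConcurrentHashMap<>();
  final AtomicInteger negativeObservations = new AtomicInteger();
  private final StripedStatsExecutor _executor;

  InFlightTracker(StripedStatsExecutor executor) {
    _executor = executor;
  }

  void recordSubmission(String server) {
    _executor.execute(server, () -> counter(server).incrementAndGet());
  }

  void recordArrival(String server) {
    _executor.execute(server, () -> {
      if (counter(server).decrementAndGet() < 0) {
        negativeObservations.incrementAndGet();
      }
    });
  }

  private AtomicInteger counter(String server) {
    return inFlight.computeIfAbsent(server, s -> new AtomicInteger());
  }
}
```

   Compared with single-threading the whole executor, servers that share a stripe still queue behind each other, but no single server's updates can be reordered.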



##########
pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java:
##########
@@ -52,6 +54,7 @@ public class ServerRoutingStatsManager {
   private final BrokerMetrics _brokerMetrics;
   private volatile boolean _isEnabled;
   private ConcurrentHashMap<String, ServerRoutingStatsEntry> _serverQueryStatsMap;
+  private ConcurrentHashMap<String, ServerRoutingStatsEntry> _mseServerQueryStatsMap;

Review Comment:
   **MAJOR — debug surface silently omits the new MSE map.**
   
   The new `_mseServerQueryStatsMap` is not exposed by any of the public read 
APIs of this class:
   - `getServerRoutingStats()` (line 236) still returns only 
`_serverQueryStatsMap`, so the `/debug/serverRoutingStats` REST endpoint 
(`PinotBrokerDebug.java:280`) returns SSE-only data after this PR.
   - `getServerRoutingStatsStr()` (line 243) still iterates only 
`_serverQueryStatsMap`.
   
   After this PR, the only programmatic view into MSE adaptive-routing state is 
the Prometheus gauges. Operators triaging routing decisions via the debug REST 
endpoint will be silently blind for MSE.
   
   Suggest either growing the response payload to `Map<QueryType, Map<String, 
ServerRoutingStatsEntry>>` or adding a parallel debug endpoint.
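   A sketch of the first option. It assumes an `Integer` in-flight count stands in for `ServerRoutingStatsEntry`, and `RoutingStatsDebugView` and `put` are hypothetical names; only the `Map<QueryType, Map<String, ...>>` shape comes from the suggestion above.

```java
import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical mirror of QueryExecutionContext.QueryType.
enum QueryType { SSE, MSE, TSE }

// Hypothetical view; an Integer in-flight count stands in for ServerRoutingStatsEntry.
class RoutingStatsDebugView {
  private final ConcurrentHashMap<String, Integer> _serverQueryStatsMap = new ConcurrentHashMap<>();
  private final ConcurrentHashMap<String, Integer> _mseServerQueryStatsMap = new ConcurrentHashMap<>();

  void put(QueryType queryType, String serverInstanceId, int numInFlight) {
    switch (queryType) {
      case SSE:
        _serverQueryStatsMap.put(serverInstanceId, numInFlight);
        break;
      case MSE:
        _mseServerQueryStatsMap.put(serverInstanceId, numInFlight);
        break;
      default:
        throw new IllegalArgumentException("No stats map for " + queryType);
    }
  }

  // Read-only view keyed by engine, so a debug endpoint can report MSE as well as SSE.
  Map<QueryType, Map<String, Integer>> getServerRoutingStats() {
    Map<QueryType, Map<String, Integer>> byEngine = new EnumMap<>(QueryType.class);
    byEngine.put(QueryType.SSE, Collections.unmodifiableMap(_serverQueryStatsMap));
    byEngine.put(QueryType.MSE, Collections.unmodifiableMap(_mseServerQueryStatsMap));
    return Collections.unmodifiableMap(byEngine);
  }

  String getServerRoutingStatsStr() {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<QueryType, Map<String, Integer>> engine : getServerRoutingStats().entrySet()) {
      // TreeMap gives a stable server order for readable output.
      for (Map.Entry<String, Integer> server : new TreeMap<>(engine.getValue()).entrySet()) {
        sb.append(engine.getKey()).append(' ').append(server.getKey())
            .append(" inFlight=").append(server.getValue()).append('\n');
      }
    }
    return sb.toString();
  }
}
```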



##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java:
##########
@@ -221,4 +224,34 @@ public void testQueryDispatcherThrowsWhenDeadlinePreExpiredAndAsyncResponseNotPo
       _queryDispatcher.submit(REQUEST_ID_GEN.getAndIncrement(), dispatchableSubPlan, 0L, new HashSet<>(), Map.of());
     }
   }
+
+  @Test
+  public void testStatsManagerRecordsSubmissionAndArrivalForDispatchedServers()
+      throws Exception {
+    ServerRoutingStatsManager statsManager = Mockito.mock(ServerRoutingStatsManager.class);
+    String sql = "SELECT * FROM a";
+    long requestId = REQUEST_ID_GEN.getAndIncrement();
+    RequestContext context = new DefaultRequestContext();
+    context.setRequestId(requestId);
+    DispatchableSubPlan plan = _queryEnvironment.planQuery(sql);
+
+    Set<String> expectedInstanceIds = new HashSet<>();
+    for (DispatchablePlanFragment fragment : plan.getQueryStagesWithoutRoot()) {
+      for (QueryServerInstance server : fragment.getServerInstanceToWorkerIdMap().keySet()) {
+        expectedInstanceIds.add(server.getInstanceId());
+      }
+    }
+    Assert.assertFalse(expectedInstanceIds.isEmpty());
+
+    try (QueryThreadContext ignore = QueryThreadContext.openForMseTest()) {
+      _queryDispatcher.submitAndReduce(context, plan, 10_000L, Map.of(), statsManager);
+    } catch (NullPointerException e) {
+      // expected: reduce phase fails with mocked MailboxService
+    }
+
+    for (String instanceId : expectedInstanceIds) {
+      Mockito.verify(statsManager).recordStatsForQuerySubmission(requestId, instanceId);
+      Mockito.verify(statsManager).recordStatsUponResponseArrival(requestId, instanceId, -1L);

Review Comment:
   **MAJOR — this test does not actually verify the contract its production 
code claims to defend.**
   
   `Mockito.verify(statsManager).recordStatsForQuerySubmission(...)` only 
checks that each method was called exactly once per server with those arguments 
(`verify(mock)` defaults to `times(1)`), in any order. It does not catch any of 
the bugs this PR's production code comments are explicitly guarding against:
   
   1. **Ordering**: a future refactor that moves the decrement out of the 
`finally` (or breaks the `incrementedServers` set bookkeeping) still passes 
this test.
   2. **Partial-failure path**: the inline comment in `submitAndReduce` says 
"guarding against a partial failure in submit()". But the only error path 
exercised here is `runReducer` throwing NPE due to the mocked `MailboxService` 
— i.e., the path where `submit()` succeeded. The actual 
partial-`submit()`-failure case isn't covered.
   3. **Legacy `submitAndReduce(... null)` overload**: no test asserts the 
null-`statsManager` overload doesn't throw or change behavior.
   4. **Real-executor behavior**: the manager is a Mockito mock, so the async 
executor and per-server lock ordering aren't exercised at all (see related 
comment about the executor race on `QueryDispatcher.java:201`).
   
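   Suggested additions:
   - A test where `submit()` throws after partial dispatch and asserts that 
neither `recordStatsForQuerySubmission` nor `recordStatsUponResponseArrival` is 
called for any server (`incrementedServers` is a local variable, so this is the 
observable proxy for it staying empty).
   - A test exercising the legacy 4-arg `submitAndReduce` overload.
   - An end-to-end test using a real `ServerRoutingStatsManager` (not a mock) 
that, after `submitAndReduce` returns successfully and the manager's executor has 
drained (e.g., by polling `getCompletedTaskCount()`, since updates are applied 
asynchronously), asserts `numInFlight == 0` for every server, which is what the 
inline comments actually claim.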
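   With Mockito, `Mockito.inOrder(statsManager)` can cover ordering and `Mockito.verifyNoInteractions(statsManager)` the partial-failure case. The self-contained sketch here shows the same checks against a hand-rolled recording fake and a simplified mirror of the `submitAndReduce` bookkeeping. `RecordingStats`, `MiniDispatcher`, and `DispatchStatsAssertions` are hypothetical, and `submit`/`reduce` are injected so a test can make either one fail.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical recording fake for ServerRoutingStatsManager: keeps calls in the order they were made.
class RecordingStats {
  final List<String> events = new ArrayList<>();

  synchronized void recordStatsForQuerySubmission(long requestId, String serverInstanceId) {
    events.add("submit:" + serverInstanceId);
  }

  synchronized void recordStatsUponResponseArrival(long requestId, String serverInstanceId, long latency) {
    events.add("arrive:" + serverInstanceId + ":" + latency);
  }
}

// Hypothetical, simplified mirror of the submitAndReduce bookkeeping; submit and reduce are injected.
class MiniDispatcher {
  static void submitAndReduce(long requestId, Consumer<Set<String>> submit, Runnable reduce,
      RecordingStats stats) {
    Set<String> servers = new LinkedHashSet<>();
    Set<String> incrementedServers = new LinkedHashSet<>();
    try {
      submit.accept(servers);  // may throw after adding only some servers
      if (stats != null) {
        for (String server : servers) {
          stats.recordStatsForQuerySubmission(requestId, server);
          incrementedServers.add(server);
        }
      }
      reduce.run();
    } finally {
      if (stats != null) {
        for (String server : incrementedServers) {
          stats.recordStatsUponResponseArrival(requestId, server, -1);
        }
      }
    }
  }
}

class DispatchStatsAssertions {
  // Ordering check that verify(mock) alone does not provide.
  static void assertSubmittedBeforeArrived(RecordingStats stats, String serverInstanceId) {
    int submitted = stats.events.indexOf("submit:" + serverInstanceId);
    int arrived = stats.events.indexOf("arrive:" + serverInstanceId + ":-1");
    if (submitted < 0 || arrived < 0 || submitted > arrived) {
      throw new AssertionError("expected submit before arrival for " + serverInstanceId + ": " + stats.events);
    }
  }
}
```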



-- 
This is an automated message from the Apache Git Service.