timothy-e commented on code in PR #18553:
URL: https://github.com/apache/pinot/pull/18553#discussion_r3319915693


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java:
##########
@@ -167,10 +168,40 @@ public void start() {
   public QueryResult submitAndReduce(RequestContext context, DispatchableSubPlan dispatchableSubPlan, long timeoutMs,
       Map<String, String> queryOptions)
       throws Exception {
+    return submitAndReduce(context, dispatchableSubPlan, timeoutMs, queryOptions, null);
+  }
+
+  /// Same as {@link #submitAndReduce(RequestContext, DispatchableSubPlan, long, Map)} but records per-server
+  /// in-flight request statistics into {@code statsManager} for use by the adaptive query router.
+  /// When {@code statsManager} is non-null:
+  /// <ul>
+  ///   <li>Each leaf server is registered as having one more in-flight request via
+  ///       {@link ServerRoutingStatsManager#recordStatsForQuerySubmission} after the fan-out begins.</li>
+  ///   <li>After the full fan-out completes (or fails), each server is decremented via
+  ///       {@link ServerRoutingStatsManager#recordStatsUponResponseArrival} with {@code latency = -1}
+  ///       (no latency is recorded at this stage).</li>
+  /// </ul>
+  /// TODO: Replace the coarse end-of-fanout decrement with per-sender arrival once per-sender EOS
+  ///       interception is in place, and record real leaf-stage latency at that point.
+  public QueryResult submitAndReduce(RequestContext context, DispatchableSubPlan dispatchableSubPlan, long timeoutMs,
+      Map<String, String> queryOptions, @Nullable ServerRoutingStatsManager statsManager)
+      throws Exception {
     long requestId = context.getRequestId();
     Set<QueryServerInstance> servers = new HashSet<>();
+    // Tracks servers where recordStatsForQuerySubmission was actually called, so the finally block only
+    // decrements servers that were incremented — guarding against a partial failure in submit().
+    Set<QueryServerInstance> incrementedServers = new HashSet<>();
     try {
       submit(requestId, dispatchableSubPlan, timeoutMs, servers, queryOptions);
+      // The SSE engine increments before `submit`, but here we increment after because `submit` populates
+      // the list of servers. Getting the list of servers before calling `submit` would expose
+      // implementation details of `submit`.
+      if (statsManager != null) {
+        for (QueryServerInstance server : servers) {
+          statsManager.recordStatsForQuerySubmission(requestId, server.getInstanceId());

Review Comment:
   I don't think this is true. `submit` does an async dispatch, so we mark the 
servers as incremented very shortly after submitting. `runReducer` then has to 
fetch the query results, which will generally take longer than it does for SSE 
queries. This seems to me to be the same severity as in the SSE adaptive 
routing, and it doesn't seem to have caused any issues there yet.
   
   In practice, we haven't seen any negative in-flight request counts on the 
~10 Stripe clusters where these stats have been deployed over the last month.
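   
   As a rough illustration of the accounting described above, here is a minimal 
sketch. It is not the actual `QueryDispatcher` code: `InFlightSketch`, 
`IN_FLIGHT`, and `runQuery` are hypothetical names, and a plain map stands in 
for the stats manager. It only shows why decrementing exactly the servers that 
were incremented returns every counter to zero, even when the reduce step 
throws.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the in-flight accounting; not Pinot code.
final class InFlightSketch {
  // server id -> number of in-flight requests (assumed single-threaded here)
  static final Map<String, Integer> IN_FLIGHT = new HashMap<>();

  static void increment(String server) {
    IN_FLIGHT.merge(server, 1, Integer::sum);
  }

  static void decrement(String server) {
    IN_FLIGHT.merge(server, -1, Integer::sum);
  }

  // Increments each server after the (simulated) dispatch, then decrements
  // only the servers recorded in `incremented`, whether or not reduce fails.
  static void runQuery(List<String> servers, boolean failDuringReduce) {
    Set<String> incremented = new HashSet<>();
    try {
      for (String server : servers) {
        increment(server);
        incremented.add(server);
      }
      if (failDuringReduce) {
        throw new IllegalStateException("simulated reduce failure");
      }
    } finally {
      for (String server : incremented) {
        decrement(server);
      }
    }
  }
}
```

   Because the decrement loop iterates over the same set that the increment 
loop populated, a counter in this sketch cannot drop below its starting value.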



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java:
##########
@@ -183,6 +214,11 @@ public QueryResult submitAndReduce(RequestContext context, DispatchableSubPlan d
       cancel(requestId);
       throw e;
     } finally {
+      if (statsManager != null) {
+        for (QueryServerInstance server : incrementedServers) {
+          statsManager.recordStatsUponResponseArrival(requestId, server.getInstanceId(), -1);

Review Comment:
   Since this is a WIP feature and likely no one will immediately start using 
the MSE latency stats, are you okay with leaving this as a TODO until the next 
PR merges? I have it ready internally and the stats are working well for Stripe 
so far; I just can't stack PRs on another repo.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]