gortiz commented on code in PR #18458:
URL: https://github.com/apache/pinot/pull/18458#discussion_r3241989513


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/DispatchClient.java:
##########
@@ -130,6 +132,38 @@ public void submit(Worker.QueryRequest request, QueryServerInstance virtualServe
     _dispatchStub.withDeadline(deadline).submit(request, new LastValueDispatchObserver<>(virtualServer, callback));
   }
 
+  /**
+   * Opens a {@code SubmitWithStream} bidi RPC for one server, sends the initial {@code submit}, and registers the
+   * resulting {@link StreamingDispatchObserver} with {@code session} for cancel fan-out and {@code OpChainComplete}
+   * accumulation.
+   *
+   * <p>The submit-ack callback is invoked exactly once: with the {@link Worker.QueryResponse} on the first
+   * {@code submit_ack} from the server, or with a non-null {@link Throwable} if the stream errors before the ack
+   * arrives.
+   *
+   * @param request               the plan submission
+   * @param virtualServer         server identity (used in callbacks for routing decisions on failure)
+   * @param deadline              gRPC deadline for the call
+   * @param session               broker-side streaming session — the returned observer registers itself here
+   * @param expectedOpChainsForThisServer  number of opchains this server is expected to report; used to drain the
+   *                              session latch correctly when the stream errors before all opchains have responded
+   * @param ackCallback           receives the submit-ack response or a failure throwable
+   * @return the observer, also exposed as
+   *         {@link org.apache.pinot.query.service.dispatch.streaming.StreamingServerHandle} on the session for
+   *         cancel fan-out
+   */
+  public StreamingDispatchObserver submitWithStream(Worker.QueryRequest request, QueryServerInstance virtualServer,
+      Deadline deadline, StreamingQuerySession session, int expectedOpChainsForThisServer,
+      java.util.function.BiConsumer<Worker.QueryResponse, Throwable> ackCallback) {
+    StreamingDispatchObserver observer = new StreamingDispatchObserver(virtualServer, session,
+        expectedOpChainsForThisServer, ackCallback);
+    StreamObserver<Worker.BrokerToServer> outbound = _dispatchStub.withDeadline(deadline).submitWithStream(observer);
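
The javadoc's contract, under which the submit-ack callback fires exactly once (on the first `submit_ack`, or on a stream error that arrives before it), is commonly enforced with a compare-and-set guard. The following is a minimal sketch under that assumption. `OnceAckCallback`, `onAck`, and `onError` are hypothetical names and are not the PR's `StreamingDispatchObserver`. A plain type parameter stands in for `Worker.QueryResponse`.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;

// Hypothetical helper, not Pinot code: wraps a BiConsumer so that it is invoked
// at most once, by whichever of onAck/onError wins the compare-and-set.
final class OnceAckCallback<R> {
  private final AtomicBoolean _fired = new AtomicBoolean(false);
  private final BiConsumer<R, Throwable> _delegate;

  OnceAckCallback(BiConsumer<R, Throwable> delegate) {
    _delegate = delegate;
  }

  // Called for every submit_ack; only the first call reaches the delegate.
  boolean onAck(R response) {
    if (_fired.compareAndSet(false, true)) {
      _delegate.accept(response, null);
      return true;
    }
    return false;
  }

  // Called if the stream errors; ignored if an ack (or an earlier error) was already delivered.
  boolean onError(Throwable t) {
    if (_fired.compareAndSet(false, true)) {
      _delegate.accept(null, t);
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    StringBuilder log = new StringBuilder();
    OnceAckCallback<String> cb = new OnceAckCallback<>((resp, err) ->
        log.append(err == null ? "ack:" + resp : "error:" + err.getMessage()).append(';'));
    cb.onAck("first");                              // delivered
    cb.onAck("second");                             // ignored: already fired
    cb.onError(new RuntimeException("late error")); // ignored: already fired
    System.out.println(log);                        // prints "ack:first;"
  }
}
```

The compare-and-set keeps the guard lock-free. As a result, a late error that races with the ack on another gRPC thread cannot trigger a second callback.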

Review Comment:
   This is incorrect. First, `MAX_CONCURRENT_STREAMS` is `Integer.MAX_VALUE` in Java (it seems it is 100 in Go). Anyway, the idea that new calls queue up on the connection makes sense, but that should not be a problem.
   
   Specifically, we are not adding much noise here. The new streams are not busy: they just send a small message when the opchains finish, and that data is not sent through the mailbox. In fact, fewer bytes are sent this way, because the mailbox mechanism has to send the stats of the deeper stages multiple times (as many times as there are stages between them and the root stage).
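
The byte-count argument can be made concrete by counting stats transmissions. The sketch below is a simplified model, not Pinot code. It assumes one equally sized stats payload per stage and a root stage (depth 0) that runs on the broker and so needs no transmission. It also assumes that each mailbox hop toward the root re-sends all stats accumulated below it. `StatsTransmissionModel` and its methods are hypothetical.

```java
import java.util.List;

// Hypothetical, simplified model (not Pinot code): counts how many times stage
// stats are transmitted for a plan whose stages sit at the given depths
// (root stage = depth 0, assumed to run on the broker).
final class StatsTransmissionModel {
  // Mailbox path: each hop toward the root re-sends the accumulated stats,
  // so a stage at depth d has its stats serialized d times.
  static int mailboxTransmissions(List<Integer> stageDepths) {
    int total = 0;
    for (int depth : stageDepths) {
      total += depth;
    }
    return total;
  }

  // Streaming path: every non-root stage reports its stats once, directly to the broker.
  static int streamTransmissions(List<Integer> stageDepths) {
    int total = 0;
    for (int depth : stageDepths) {
      if (depth > 0) {
        total += 1;
      }
    }
    return total;
  }

  public static void main(String[] args) {
    // A linear plan: root (0) <- stage 1 <- stage 2 <- stage 3.
    List<Integer> depths = List.of(0, 1, 2, 3);
    System.out.println(mailboxTransmissions(depths)); // 6
    System.out.println(streamTransmissions(depths));  // 3
  }
}
```

Under this model, the mailbox cost grows with the sum of stage depths, while the streaming cost grows only with the number of stages. This is why deeper plans benefit more.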
   
   That is why the suggested metric doesn't make much sense: even if the number of calls per connection is high, these should be silent calls.
   
   Instead, I created two other metrics:
   - `MSE_STREAM_STATS_INCOMPLETE_COVERAGE`: number of queries that returned partial stats
   - `MSE_STREAM_STATS_QUERIES`: total number of queries using the new mechanism
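
The two metrics are most informative together. Dividing the first by the second gives the share of streaming-stats queries that returned partial stats. The following is a minimal sketch with plain `AtomicLong` counters. `StreamStatsMeters`, `onQueryFinished`, and the `allOpChainsReported` flag are hypothetical and are not Pinot's metrics API. It also assumes that "partial stats" means that not every expected opchain reported.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-ins for the two meters named in the comment; not Pinot's metrics API.
final class StreamStatsMeters {
  private final AtomicLong _streamStatsQueries = new AtomicLong();        // ~ MSE_STREAM_STATS_QUERIES
  private final AtomicLong _incompleteCoverageQueries = new AtomicLong(); // ~ MSE_STREAM_STATS_INCOMPLETE_COVERAGE

  // Record one query that used the streaming-stats mechanism.
  // allOpChainsReported is an assumed signal: false when some opchains never reported stats.
  void onQueryFinished(boolean allOpChainsReported) {
    _streamStatsQueries.incrementAndGet();
    if (!allOpChainsReported) {
      _incompleteCoverageQueries.incrementAndGet();
    }
  }

  // Fraction of streaming-stats queries that returned partial stats (0.0 before any query).
  // The two reads are not atomic together, which is acceptable for a monitoring ratio.
  double incompleteCoverageRatio() {
    long total = _streamStatsQueries.get();
    return total == 0 ? 0.0 : (double) _incompleteCoverageQueries.get() / total;
  }

  public static void main(String[] args) {
    StreamStatsMeters meters = new StreamStatsMeters();
    meters.onQueryFinished(true);
    meters.onQueryFinished(true);
    meters.onQueryFinished(false);
    meters.onQueryFinished(true);
    System.out.println(meters.incompleteCoverageRatio()); // 0.25
  }
}
```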


