gortiz commented on code in PR #18458:
URL: https://github.com/apache/pinot/pull/18458#discussion_r3241989513
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/DispatchClient.java:
##########
@@ -130,6 +132,38 @@ public void submit(Worker.QueryRequest request, QueryServerInstance virtualServe
     _dispatchStub.withDeadline(deadline).submit(request, new LastValueDispatchObserver<>(virtualServer, callback));
   }
+  /**
+   * Opens a {@code SubmitWithStream} bidi RPC for one server, sends the initial {@code submit}, and registers the
+   * resulting {@link StreamingDispatchObserver} with {@code session} for cancel fan-out and {@code OpChainComplete}
+   * accumulation.
+   *
+   * <p>The submit-ack callback is invoked exactly once: with the {@link Worker.QueryResponse} on the first
+   * {@code submit_ack} from the server, or with a non-null {@link Throwable} if the stream errors before the ack
+   * arrives.
+   *
+   * @param request the plan submission
+   * @param virtualServer server identity (used in callbacks for routing decisions on failure)
+   * @param deadline gRPC deadline for the call
+   * @param session broker-side streaming session — the returned observer registers itself here
+   * @param expectedOpChainsForThisServer number of opchains this server is expected to report; used to drain the
+   *     session latch correctly when the stream errors before all opchains have responded
+   * @param ackCallback receives the submit-ack response or a failure throwable
+   * @return the observer, also exposed as
+   *     {@link org.apache.pinot.query.service.dispatch.streaming.StreamingServerHandle} on the session for
+   *     cancel fan-out
+   */
+  public StreamingDispatchObserver submitWithStream(Worker.QueryRequest request, QueryServerInstance virtualServer,
+      Deadline deadline, StreamingQuerySession session, int expectedOpChainsForThisServer,
+      java.util.function.BiConsumer<Worker.QueryResponse, Throwable> ackCallback) {
+    StreamingDispatchObserver observer = new StreamingDispatchObserver(virtualServer, session,
+        expectedOpChainsForThisServer, ackCallback);
+    StreamObserver<Worker.BrokerToServer> outbound = _dispatchStub.withDeadline(deadline).submitWithStream(observer);
Review Comment:
This is incorrect. First, `MAX_CONCURRENT_STREAMS` is `Integer.MAX_VALUE` in
Java (it seems to be 100 in Go). In any case, the idea that new calls queue up
in the stream makes sense, but that should not be a problem.
Specifically, we are not adding much noise here. The new streams are not busy:
they just send a small message when the opchains finish, which is data we don't
send through the mailbox. In fact, the number of bytes sent here is smaller,
because the mailbox mechanism needs to send the stats of the deeper stages
multiple times (once for each stage between them and the root stage).
That is why the suggested metric doesn't make much sense: even if the number of
calls per connection is high, these should be silent calls.
Instead, I created two other metrics:
- `MSE_STREAM_STATS_INCOMPLETE_COVERAGE`: number of queries that returned
partial stats
- `MSE_STREAM_STATS_QUERIES`: total number of queries using the new mechanism
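To make the byte-count argument above concrete, here is a rough back-of-the-envelope sketch. It is not code from this PR: the class and method names are hypothetical, and it assumes a simplified linear chain of stages (root at depth 0, one opchain per stage, every stats report the same size), which ignores parallelism and real payload sizes.

```java
// Hypothetical illustration only; none of these names exist in Pinot.
public class StatsHopCount {

  // Mailbox piggybacking (simplified): stats produced at depth d are re-sent
  // once per hop on the way to the root, so they travel d times.
  public static long mailboxTransmissions(int numStages) {
    long total = 0;
    for (int depth = 1; depth < numStages; depth++) {
      total += depth;
    }
    return total;
  }

  // Stream reporting (simplified): every non-root stage reports its stats
  // exactly once, directly, when its opchain finishes.
  public static long streamTransmissions(int numStages) {
    return Math.max(0, numStages - 1);
  }

  public static void main(String[] args) {
    for (int stages = 2; stages <= 5; stages++) {
      System.out.println(stages + " stages: mailbox=" + mailboxTransmissions(stages)
          + ", stream=" + streamTransmissions(stages));
    }
  }
}
```

Under these assumptions the mailbox count grows quadratically with the number of stages while the stream count grows linearly, so the gap only matters for deeper plans.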
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]