gortiz commented on code in PR #18458:
URL: https://github.com/apache/pinot/pull/18458#discussion_r3273981613
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java:
##########
@@ -189,10 +191,189 @@ public QueryResult submitAndReduce(RequestContext context, DispatchableSubPlan d
}
}
- /// Tries to recover from an exception thrown during query dispatching.
+ /// Streaming variant of {@link #submitAndReduce}: opens one {@code SubmitWithStream} bidi RPC per server, runs the
+ /// broker's stage 0 reducer, and once the receiving mailbox finishes awaits the per-stage stats with early
+ /// completion (returns as soon as every expected opchain has reported, or when the wait window fires — whichever
+ /// happens first). Stats from the session accumulator are then merged into the broker's local stage 0 stats to
+ /// build the final {@link QueryResult}.
+ ///
+ /// The wait window is bounded by the query's remaining timeout: if {@code submitWithStream + runReducer} consumed
+ /// most of the budget, the per-stage stats may end up partial (visible via the per-stage {@code mergeFailed} /
+ /// {@code missing} counts the session exposes).
+ ///
+ /// Cancel is handled via {@link StreamingQuerySession#fanOutCancel()} — no unary Cancel RPCs are issued for this
+ /// query path. On any error, fan-out cancel is broadcast over the open streams, then the broker waits for remaining
+ /// stats before building the final result.
+ ///
+ /// <b>Mixed-version policy.</b> No automatic fallback to the unary {@link #submit} path. Enabling
+ /// {@link CommonConstants.Broker.Request.QueryOptionKey#STREAM_STATS} requires every server in the
+ /// cluster to implement {@code SubmitWithStream}; if any server returns {@code UNIMPLEMENTED} or any other
+ /// transport error during dispatch, {@link #submitWithStream} surfaces the throwable through the ack queue,
+ /// {@link #processResults} throws, and this method fans out cancel via the session before propagating the failure.
+ private QueryResult submitAndReduceWithStream(RequestContext context, DispatchableSubPlan dispatchableSubPlan,
+ long timeoutMs, Map<String, String> queryOptions)
+ throws Exception {
+ long requestId = context.getRequestId();
+ long deadlineMs = System.currentTimeMillis() + timeoutMs;
+ Set<QueryServerInstance> servers = new HashSet<>();
+
+ // The session's expected-opchain count must equal the total number of opchains across every (server, non-root
+ // stage) pair — that's how many OpChainComplete messages we expect to receive.
+ Set<DispatchablePlanFragment> stagePlansWithoutRoot = dispatchableSubPlan.getQueryStagesWithoutRoot();
+ int totalExpected = 0;
+ Map<Integer, Integer> expectedByStage = new HashMap<>();
+ for (DispatchablePlanFragment stagePlan : stagePlansWithoutRoot) {
+ int stageId = stagePlan.getPlanFragment().getFragmentId();
+ int stageCount = 0;
+ for (List<Integer> workerIds : stagePlan.getServerInstanceToWorkerIdMap().values()) {
+ stageCount += workerIds.size();
+ }
+ totalExpected += stageCount;
+ expectedByStage.put(stageId, stageCount);
+ }
+ StreamingQuerySession session = new StreamingQuerySession(requestId, totalExpected);
+
+ try {
+ submitWithStream(requestId, dispatchableSubPlan, timeoutMs, servers, queryOptions, session);
+ QueryResult brokerResult = runReducer(dispatchableSubPlan, queryOptions, _mailboxService);
+
+ // Receiving mailbox finished. Wait for stats: returns true as soon as every opchain has reported, or false
+ // when the timeout fires.
+ long remainingMs = Math.max(0, deadlineMs - System.currentTimeMillis());
+ boolean fullCoverage = session.awaitCompletion(remainingMs, TimeUnit.MILLISECONDS);
Review Comment:
The code now defines `STATS_DRAIN_ON_SUCCESS_MS = 50L` and uses
`Math.min(STATS_DRAIN_ON_SUCCESS_MS, remainingMs)`.
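
For readers following the thread, that bounded wait can be sketched roughly as below. Only `STATS_DRAIN_ON_SUCCESS_MS = 50L` and the `Math.min(STATS_DRAIN_ON_SUCCESS_MS, remainingMs)` expression come from this discussion. The class `StatsDrainSketch`, its method names, and the `CountDownLatch` standing in for `session.awaitCompletion` are hypothetical and are not the PR's actual code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; not the QueryDispatcher implementation.
final class StatsDrainSketch {
  // Value quoted in the review comment.
  static final long STATS_DRAIN_ON_SUCCESS_MS = 50L;

  private StatsDrainSketch() {
  }

  // Caps the stats wait at the drain window without exceeding the remaining query budget.
  static long drainWindowMs(long deadlineMs, long nowMs) {
    long remainingMs = Math.max(0, deadlineMs - nowMs);
    return Math.min(STATS_DRAIN_ON_SUCCESS_MS, remainingMs);
  }

  // Assumed stand-in for session.awaitCompletion: returns true if every pending
  // opchain reported before the window elapsed, false on timeout or interrupt.
  static boolean awaitStats(CountDownLatch pendingOpChains, long deadlineMs) {
    long waitMs = drainWindowMs(deadlineMs, System.currentTimeMillis());
    try {
      return pendingOpChains.await(waitMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      // Preserve the interrupt flag and report partial coverage.
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

With this shape, a query that still has seconds of budget waits at most 50 ms for stats once the reducer finishes, while a nearly expired query waits only for whatever budget remains.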
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]