gortiz commented on code in PR #18458:
URL: https://github.com/apache/pinot/pull/18458#discussion_r3274656274
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java:
##########
@@ -450,6 +454,255 @@ private void submitTimeSeriesInternal(Worker.TimeSeriesQueryRequest request,
}
}
+ /// Stream-mode submission handler. The broker keeps the stream open for the query lifetime; the server replies
+ /// with a {@code submit_ack} as the first message and (in subsequent commits) per-opchain
+ /// {@link Worker.OpChainComplete} messages followed by a final {@link Worker.ServerDone}.
+ ///
+ /// This skeleton wires up the gRPC mechanics + plan submission via the existing submission path. It does NOT yet
+ /// emit OpChainComplete / ServerDone; those need a per-opchain completion hook on
+ /// {@link org.apache.pinot.query.runtime.executor.OpChainSchedulerService}, which is layered on next.
+ /// Cancel still routes through the existing unary {@link #cancel(Worker.CancelRequest, StreamObserver)} RPC; broker
+ /// stream-close also triggers a cancel here.
+ @Override
+ public StreamObserver<Worker.BrokerToServer> submitWithStream(
+ StreamObserver<Worker.ServerToBroker> responseObserver) {
+ return new SubmitWithStreamObserver(responseObserver);
+ }
+
+ /// Per-query state for an open {@code SubmitWithStream} call. Owns the response stream and serialises every
+ /// {@code onNext} call on it via a {@code synchronized} block; gRPC requires {@code StreamObserver.onNext} to be
+ /// called serially.
+ ///
+ /// Tracks the expected number of opchains for the request (sum of WorkerMetadata across all stages). An
+ /// {@link OpChainCompletionListener} registered with {@link QueryRunner#registerOpChainCompletionListener}
+ /// fires once per opchain finishing, encodes its stats via {@link MultiStageStatsTreeEncoder}, and emits an
+ /// {@link Worker.OpChainComplete} on the response stream. When the per-request completed-count reaches the
+ /// expected total, {@link Worker.ServerDone} is emitted and the stream is closed.
+ ///
+ /// All blocking work (plan deserialization, opchain construction) runs on
+ /// {@link QueryServer#_submissionExecutorService}.
+ private final class SubmitWithStreamObserver implements StreamObserver<Worker.BrokerToServer> {
+ private final StreamObserver<Worker.ServerToBroker> _responseObserver;
+ /// Serialises onNext calls on the response stream and guards mutable session state.
+ private final Object _streamLock = new Object();
+ /// True once we've received the first {@code submit} and dispatched it.
+ private final AtomicBoolean _submitted = new AtomicBoolean(false);
+ /// True once we've completed the response stream (success or error). Idempotent guard.
+ private final AtomicBoolean _completed = new AtomicBoolean(false);
+ /// Number of opchains we expect to report for this request; set after we deserialize the plan.
+ private final AtomicInteger _expectedOpChains = new AtomicInteger(-1);
+ /// Number of opchains that have reported so far via the completion listener.
+ private final AtomicInteger _completedOpChains = new AtomicInteger(0);
+ /// Set once we successfully parse the request id from the submit metadata. Used by cancel-via-stream.
+ private volatile long _requestId = -1;
+
+ SubmitWithStreamObserver(StreamObserver<Worker.ServerToBroker> responseObserver) {
+ _responseObserver = responseObserver;
+ }
+
+ @Override
+ public void onNext(Worker.BrokerToServer message) {
+ switch (message.getPayloadCase()) {
+ case SUBMIT:
+ handleSubmit(message.getSubmit());
+ break;
+ case CANCEL:
+ handleCancel(message.getCancel());
+ break;
+ case PAYLOAD_NOT_SET:
+ default:
+ sendErrorAndComplete("Unexpected BrokerToServer payload: " + message.getPayloadCase());
+ break;
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ // Broker-side stream error / disconnect. Treat like a cancel and clean up; do not reply on the response stream
+ // (the underlying transport is gone).
+ LOGGER.warn("SubmitWithStream stream error for request {}: {}", _requestId, t.getMessage());
+ _completed.set(true);
+ cleanupListener();
+ cancelIfSubmitted();
+ }
+
+ @Override
+ public void onCompleted() {
+ // Broker has half-closed (no more inbound messages). The server stream stays open until all opchains have
+ // reported via the completion listener. It's the listener's job to emit ServerDone and complete the stream.
+ // If the broker half-closes before the server is done, that's OK; we keep emitting on the response stream
+ // until our own completion criterion is met.
+ }
+
+ private void handleSubmit(Worker.QueryRequest request) {
+ if (!_submitted.compareAndSet(false, true)) {
+ sendErrorAndComplete("Multiple submit messages on the same stream are not allowed");
+ return;
+ }
+ ServerMetrics.get().addMeteredGlobalValue(ServerMeter.MSE_QUERIES, 1L);
+ Map<String, String> deserializedMetadata;
+ try {
+ deserializedMetadata = QueryPlanSerDeUtils.fromProtoProperties(request.getMetadata());
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while deserializing request metadata", e);
+ sendErrorAndComplete("Caught exception while deserializing request metadata: " + e.getMessage());
+ return;
+ }
+ // Override the cluster-level _sendStats decision for this request: stats travel out-of-band on the bidi stream
+ // (via the OpChainCompletionListener), so we suppress the mailbox-side stats path. The override is read by
+ // QueryRunner.effectiveSendStats(...).
+ Map<String, String> reqMetadata = new HashMap<>(deserializedMetadata);
+ reqMetadata.put(CommonConstants.MultiStageQueryRunner.KEY_OF_STATS_REPORTING_MODE,
+ CommonConstants.MultiStageQueryRunner.STATS_REPORTING_MODE_STREAM);
+ try {
+ _requestId = Long.parseLong(reqMetadata.get(MetadataKeys.REQUEST_ID));
+ } catch (Exception ignored) {
+ // _requestId stays at -1; cancel-on-stream-close will just be a no-op.
+ }
+ // Count how many opchains will run on this server: sum of WorkerMetadata across all stage plans.
+ int opChainCount = 0;
+ for (Worker.StagePlan stagePlan : request.getStagePlanList()) {
+ opChainCount += stagePlan.getStageMetadata().getWorkerMetadataCount();
+ }
+ final int expected = opChainCount;
+ _expectedOpChains.set(expected);
+
+ // Register the per-request completion listener BEFORE submitting. Otherwise short opchains could finish before
+ // we've registered and we'd miss their events.
+ if (_requestId >= 0 && expected > 0) {
+ _queryRunner.registerOpChainCompletionListener(_requestId, this::onOpChainComplete);
+ }
+
+ long timeoutMs = Long.parseLong(reqMetadata.get(QueryOptionKey.TIMEOUT_MS));
+ CompletableFuture.runAsync(() -> submitInternal(request, reqMetadata), _submissionExecutorService)
+ .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
+ .whenComplete((result, error) -> {
+ if (error != null) {
+ LOGGER.error("Caught exception while submitting request: {}", _requestId, error);
+ sendSubmitAck(buildErrorResponse("Caught exception while submitting request: " + error.getMessage()));
+ // Submission failed; no opchains will run, so emit ServerDone immediately and clean up.
+ cleanupListener();
+ sendDoneAndComplete();
+ } else {
+ sendSubmitAck(buildOkResponse());
+ // If for some reason expected was 0 (empty plan), close the stream now.
+ if (expected == 0) {
+ cleanupListener();
+ sendDoneAndComplete();
+ }
+ }
+ });
+ }
+
+ /**
+ * Fires once per opchain on this server completing. Encodes the stats into a {@link Worker.MultiStageStatsTree},
+ * emits an {@link Worker.OpChainComplete}, and emits {@link Worker.ServerDone} once all expected opchains have
+ * reported.
+ */
+ private void onOpChainComplete(OpChainId opChainId, MultiStageOperator rootOperator,
+ @Nullable MultiStageQueryStats stats, OpChainExecutionContext context, @Nullable Throwable error) {
+ Worker.OpChainComplete.Builder builder = Worker.OpChainComplete.newBuilder()
+ .setStageId(opChainId.getStageId())
+ .setWorkerId(opChainId.getVirtualServerId())
+ .setSuccess(error == null);
+ if (error != null) {
+ builder.setErrorMsg(error.getMessage() == null ? error.getClass().getSimpleName() : error.getMessage());
+ }
+ if (stats != null) {
+ try {
+ builder.setStats(MultiStageStatsTreeEncoder.encode(rootOperator, stats, context));
Review Comment:
**Concern 2 (directExecutor + blocking):**
It is worth clarifying the threading here. The encoding (line 619) runs *before* the `synchronized (_streamLock)` block; only the `_responseObserver.onNext(message)` call is inside the lock (line 641). The `onOpChainComplete` callback fires on the opchain executor thread (it is registered via `registerOpChainCompletionListener`), which is separate from the Netty event loop. With `directExecutor()`, the inbound gRPC callbacks (new submit/cancel messages arriving on the bidi stream) do run on the Netty event loop, but the outbound `onNext` call from the opchain executor only enqueues the write and returns immediately; Netty flushes it asynchronously.
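
A minimal sketch of the locking pattern described above, under simplifying assumptions: `ResponseSink` is a hypothetical stand-in for gRPC's `StreamObserver`, `SerializedEmitter`, `RecordingSink`, and `EmitterDemo` are illustrative names rather than Pinot classes, and a `String` replaces the protobuf `Worker.ServerToBroker` message. Each opchain thread does its encoding with no lock held, and only the `onNext` call is serialized:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for io.grpc.stub.StreamObserver#onNext, so the sketch runs without gRPC.
interface ResponseSink {
  void onNext(String message);
}

// Hypothetical emitter: encoding runs on the calling (opchain) thread outside the lock;
// only the write to the shared response stream is serialized.
final class SerializedEmitter {
  private final ResponseSink _sink;
  private final Object _streamLock = new Object();

  SerializedEmitter(ResponseSink sink) {
    _sink = sink;
  }

  void emitOpChainComplete(int stageId, int workerId, Supplier<String> encodeStats) {
    // No lock held: opchain threads may encode their stats concurrently.
    String message = "OpChainComplete(stage=" + stageId + ", worker=" + workerId
        + ", stats=" + encodeStats.get() + ")";
    // gRPC requires serial onNext calls, so only this call is guarded.
    synchronized (_streamLock) {
      _sink.onNext(message);
    }
  }
}

// Test sink that records messages and flags any overlapping onNext calls.
final class RecordingSink implements ResponseSink {
  // Synchronized list only for safe publication to the test thread; the overlap flag checks serialization.
  final List<String> received = Collections.synchronizedList(new ArrayList<>());
  final AtomicBoolean overlapped = new AtomicBoolean(false);
  private final AtomicInteger _inFlight = new AtomicInteger();

  @Override
  public void onNext(String message) {
    if (_inFlight.incrementAndGet() > 1) {
      overlapped.set(true);
    }
    received.add(message);
    _inFlight.decrementAndGet();
  }
}

final class EmitterDemo {
  // Simulates `opChains` opchains finishing on a small executor pool.
  static RecordingSink emitFromThreads(int opChains) {
    RecordingSink sink = new RecordingSink();
    SerializedEmitter emitter = new SerializedEmitter(sink);
    ExecutorService pool = Executors.newFixedThreadPool(4);
    for (int i = 0; i < opChains; i++) {
      final int workerId = i;
      pool.execute(() -> emitter.emitOpChainComplete(0, workerId, () -> "encoded-" + workerId));
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("opchain threads did not finish");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return sink;
  }
}
```

For example, `EmitterDemo.emitFromThreads(100)` delivers all 100 messages with `overlapped` still `false`. Holding the lock only around `onNext` keeps encoding off the critical section, so a slow encoder on one opchain does not block other opchains' writes.
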

**Unrelated liveness gap found while investigating this:** if `_responseObserver.onNext(message)` itself throws a transport exception (e.g. because the broker closed the channel early), the exception previously propagated out before `_completedOpChains.incrementAndGet()` ran, leaving the counter permanently short. If that was the last opchain, `sendDoneAndComplete()` was never called, and the broker's drain latch waited until the 50 ms timeout. The latest commit fixes this by moving the increment and the completion check into a `finally` block. I also tightened `sendDoneAndComplete()` so that it always calls `onCompleted()`, even if the `ServerDone` send throws.
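
The ordering fix can be sketched as follows. This is a minimal model, not the PR's code: `ClosableSink`, `OpChainCompletionTracker`, and `ClosedTransportSink` are hypothetical names, `ClosableSink` stands in for the two `StreamObserver` methods used here, and `ClosedTransportSink` simulates a broker that closed the channel early by throwing from every `onNext`. The pre-fix ordering is kept alongside for contrast:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the two io.grpc.stub.StreamObserver methods used here.
interface ClosableSink {
  void onNext(String message);

  void onCompleted();
}

// Hypothetical per-request tracker modelling the fix described above.
final class OpChainCompletionTracker {
  private final ClosableSink _sink;
  private final Object _streamLock = new Object();
  private final AtomicBoolean _completed = new AtomicBoolean(false);
  private final AtomicInteger _completedOpChains = new AtomicInteger(0);
  private final int _expectedOpChains;

  OpChainCompletionTracker(ClosableSink sink, int expectedOpChains) {
    _sink = sink;
    _expectedOpChains = expectedOpChains;
  }

  int completedOpChains() {
    return _completedOpChains.get();
  }

  // Pre-fix ordering: if send() throws, the increment is skipped and the counter stays short.
  void onOpChainCompleteBeforeFix(int stageId, int workerId) {
    send("OpChainComplete(" + stageId + "," + workerId + ")");
    if (_completedOpChains.incrementAndGet() == _expectedOpChains) {
      sendDoneAndComplete();
    }
  }

  // Fixed ordering: the opchain is counted even if send() throws, so the last one
  // still triggers ServerDone. The transport exception still propagates to the caller.
  void onOpChainComplete(int stageId, int workerId) {
    try {
      send("OpChainComplete(" + stageId + "," + workerId + ")");
    } finally {
      if (_completedOpChains.incrementAndGet() == _expectedOpChains) {
        sendDoneAndComplete();
      }
    }
  }

  // Idempotent; onCompleted() runs even if the ServerDone send throws.
  void sendDoneAndComplete() {
    if (!_completed.compareAndSet(false, true)) {
      return;
    }
    synchronized (_streamLock) {
      try {
        _sink.onNext("ServerDone");
      } finally {
        _sink.onCompleted();
      }
    }
  }

  private void send(String message) {
    synchronized (_streamLock) {
      _sink.onNext(message);
    }
  }

  // Reports `count` opchains, swallowing the transport exception each report surfaces.
  static void reportAll(OpChainCompletionTracker tracker, int count, boolean useFix) {
    for (int i = 0; i < count; i++) {
      try {
        if (useFix) {
          tracker.onOpChainComplete(0, i);
        } else {
          tracker.onOpChainCompleteBeforeFix(0, i);
        }
      } catch (IllegalStateException ignored) {
        // Expected: ClosedTransportSink throws from every onNext.
      }
    }
  }
}

// Test sink simulating a broker that closed the channel early: every onNext throws.
final class ClosedTransportSink implements ClosableSink {
  final AtomicInteger completedCalls = new AtomicInteger();

  @Override
  public void onNext(String message) {
    throw new IllegalStateException("transport closed");
  }

  @Override
  public void onCompleted() {
    completedCalls.incrementAndGet();
  }
}
```

With three opchains and a `ClosedTransportSink`, `reportAll(tracker, 3, true)` leaves `completedOpChains()` at 3 and `completedCalls` at 1, while the pre-fix path leaves both at 0: the stream is never completed, and the broker can only fall back to its timeout.
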
--
This is an automated message from the Apache Git Service.