gortiz commented on code in PR #18458:
URL: https://github.com/apache/pinot/pull/18458#discussion_r3274655233
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java:
##########
@@ -450,6 +454,255 @@ private void submitTimeSeriesInternal(Worker.TimeSeriesQueryRequest request,
}
}
+ /// Stream-mode submission handler. The broker keeps the stream open for the query lifetime; the server replies
+ /// with a {@code submit_ack} as the first message and (in subsequent commits) per-opchain
+ /// {@link Worker.OpChainComplete} messages followed by a final {@link Worker.ServerDone}.
+ ///
+ /// This skeleton wires up the gRPC mechanics + plan submission via the existing submission path. It does NOT yet
+ /// emit OpChainComplete / ServerDone — those need a per-opchain completion hook on
+ /// {@link org.apache.pinot.query.runtime.executor.OpChainSchedulerService}, which is layered on next.
+ /// Cancel still routes through the existing unary {@link #cancel(Worker.CancelRequest, StreamObserver)} RPC; broker
+ /// stream-close also triggers a cancel here.
+ @Override
+ public StreamObserver<Worker.BrokerToServer> submitWithStream(
+ StreamObserver<Worker.ServerToBroker> responseObserver) {
+ return new SubmitWithStreamObserver(responseObserver);
+ }
+
+ /// Per-query state for an open {@code SubmitWithStream} call. Owns the response stream and serialises every
+ /// {@code onNext} call on it via a {@code synchronized} block — gRPC requires {@code StreamObserver.onNext} to be
+ /// called serially.
+ ///
+ /// Tracks the expected number of opchains for the request (sum of WorkerMetadata across all stages). An
+ /// {@link OpChainCompletionListener} registered with {@link QueryRunner#registerOpChainCompletionListener}
+ /// fires once per opchain finishing, encodes its stats via {@link MultiStageStatsTreeEncoder}, and emits an
+ /// {@link Worker.OpChainComplete} on the response stream. When the per-request completed-count reaches the
+ /// expected total, {@link Worker.ServerDone} is emitted and the stream is closed.
+ ///
+ /// All blocking work (plan deserialization, opchain construction) runs on
+ /// {@link QueryServer#_submissionExecutorService}.
+ private final class SubmitWithStreamObserver implements StreamObserver<Worker.BrokerToServer> {
+ private final StreamObserver<Worker.ServerToBroker> _responseObserver;
+ /// Serialises onNext calls on the response stream and guards mutable session state.
+ private final Object _streamLock = new Object();
+ /// True once we've received the first {@code submit} and dispatched it.
+ private final AtomicBoolean _submitted = new AtomicBoolean(false);
+ /// True once we've completed the response stream (success or error). Idempotent guard.
+ private final AtomicBoolean _completed = new AtomicBoolean(false);
+ /// Number of opchains we expect to report for this request — set after we deserialize the plan.
+ private final AtomicInteger _expectedOpChains = new AtomicInteger(-1);
+ /// Number of opchains that have reported so far via the completion listener.
+ private final AtomicInteger _completedOpChains = new AtomicInteger(0);
+ /// Set once we successfully parse the request id from the submit metadata. Used by cancel-via-stream.
+ private volatile long _requestId = -1;
+
+ SubmitWithStreamObserver(StreamObserver<Worker.ServerToBroker> responseObserver) {
+ _responseObserver = responseObserver;
+ }
+
+ @Override
+ public void onNext(Worker.BrokerToServer message) {
+ switch (message.getPayloadCase()) {
+ case SUBMIT:
+ handleSubmit(message.getSubmit());
+ break;
+ case CANCEL:
+ handleCancel(message.getCancel());
+ break;
+ case PAYLOAD_NOT_SET:
+ default:
+ sendErrorAndComplete("Unexpected BrokerToServer payload: " + message.getPayloadCase());
+ break;
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ // Broker-side stream error / disconnect. Treat like a cancel and clean up; do not reply on the response stream
+ // (the underlying transport is gone).
+ LOGGER.warn("SubmitWithStream stream error for request {}: {}", _requestId, t.getMessage());
+ _completed.set(true);
+ cleanupListener();
+ cancelIfSubmitted();
+ }
+
+ @Override
+ public void onCompleted() {
+ // Broker has half-closed (no more inbound messages). The server stream stays open until all opchains have
+ // reported via the completion listener — it's the listener's job to emit ServerDone and complete the stream.
+ // If the broker half-closes before the server is done, that's OK; we keep emitting on the response stream
+ // until our own completion criterion is met.
+ }
+
+ private void handleSubmit(Worker.QueryRequest request) {
+ if (!_submitted.compareAndSet(false, true)) {
+ sendErrorAndComplete("Multiple submit messages on the same stream are not allowed");
+ return;
+ }
+ ServerMetrics.get().addMeteredGlobalValue(ServerMeter.MSE_QUERIES, 1L);
+ Map<String, String> deserializedMetadata;
+ try {
+ deserializedMetadata = QueryPlanSerDeUtils.fromProtoProperties(request.getMetadata());
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while deserializing request metadata", e);
+ sendErrorAndComplete("Caught exception while deserializing request metadata: " + e.getMessage());
+ return;
+ }
+ // Override the cluster-level _sendStats decision for this request: stats travel out-of-band on the bidi stream
+ // (via the OpChainCompletionListener), so we suppress the mailbox-side stats path. The override is read by
+ // QueryRunner.effectiveSendStats(...).
+ Map<String, String> reqMetadata = new HashMap<>(deserializedMetadata);
+ reqMetadata.put(CommonConstants.MultiStageQueryRunner.KEY_OF_STATS_REPORTING_MODE,
+ CommonConstants.MultiStageQueryRunner.STATS_REPORTING_MODE_STREAM);
+ try {
+ _requestId = Long.parseLong(reqMetadata.get(MetadataKeys.REQUEST_ID));
+ } catch (Exception ignored) {
+ // _requestId stays at -1; cancel-on-stream-close will just be a no-op.
+ }
+ // Count how many opchains will run on this server: sum of WorkerMetadata across all stage plans.
+ int opChainCount = 0;
+ for (Worker.StagePlan stagePlan : request.getStagePlanList()) {
+ opChainCount += stagePlan.getStageMetadata().getWorkerMetadataCount();
+ }
+ final int expected = opChainCount;
+ _expectedOpChains.set(expected);
+
+ // Register the per-request completion listener BEFORE submitting. Otherwise short opchains could finish before
+ // we've registered and we'd miss their events.
+ if (_requestId >= 0 && expected > 0) {
+ _queryRunner.registerOpChainCompletionListener(_requestId, this::onOpChainComplete);
+ }
+
+ long timeoutMs = Long.parseLong(reqMetadata.get(QueryOptionKey.TIMEOUT_MS));
+ CompletableFuture.runAsync(() -> submitInternal(request, reqMetadata), _submissionExecutorService)
+ .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
+ .whenComplete((result, error) -> {
+ if (error != null) {
+ LOGGER.error("Caught exception while submitting request: {}", _requestId, error);
+ sendSubmitAck(buildErrorResponse("Caught exception while submitting request: " + error.getMessage()));
+ // Submission failed — no opchains will run, so emit ServerDone immediately and clean up.
+ cleanupListener();
+ sendDoneAndComplete();
+ } else {
+ sendSubmitAck(buildOkResponse());
+ // If for some reason expected was 0 (empty plan), close the stream now.
+ if (expected == 0) {
+ cleanupListener();
+ sendDoneAndComplete();
+ }
+ }
+ });
+ }
+
+ /**
+ * Fires once per opchain on this server completing. Encodes the stats into a {@link Worker.MultiStageStatsTree},
+ * emits an {@link Worker.OpChainComplete}, and emits {@link Worker.ServerDone} once all expected opchains have
+ * reported.
+ */
+ private void onOpChainComplete(OpChainId opChainId, MultiStageOperator rootOperator,
+ @Nullable MultiStageQueryStats stats, OpChainExecutionContext context, @Nullable Throwable error) {
+ Worker.OpChainComplete.Builder builder = Worker.OpChainComplete.newBuilder()
+ .setStageId(opChainId.getStageId())
+ .setWorkerId(opChainId.getVirtualServerId())
+ .setSuccess(error == null);
+ if (error != null) {
+ builder.setErrorMsg(error.getMessage() == null ? error.getClass().getSimpleName() : error.getMessage());
+ }
+ if (stats != null) {
+ try {
+ builder.setStats(MultiStageStatsTreeEncoder.encode(rootOperator, stats, context));
Review Comment:
**Concern 1 (error-path coverage):**
Acknowledged. The encoder has a strict all-or-nothing contract (now
documented in the Javadoc on `encode()`):
1. The upfront `treeSize != flatSize` check throws `IllegalStateException`
   *before* any proto node is built, so there is nothing to return.
2. An `IOException` from `serializeStatMap` mid-walk leaves partial builders
   only on the Java call stack; they are discarded when the exception unwinds.

Neither path can produce a partial result that we could send. We're
accepting this as a known limitation for this first version: the broker treats
the missing stats tree as an unmerged opchain and logs a warning, which
preserves query correctness. A follow-up could redesign the encoder to
accumulate partial results as it walks rather than building everything in one
shot.
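
To make the two failure modes concrete, here is a minimal, self-contained
sketch of an all-or-nothing encoder and a caller that falls back to "no stats".
It is not the PR's code: `Node`, `encode`, `walk`, and `encodeOrEmpty` are
hypothetical stand-ins, strings stand in for proto nodes, and a `null` stat
entry simulates the `IOException` from `serializeStatMap`.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Illustrative sketch only; every name here is hypothetical, not Pinot API. */
public class AllOrNothingEncodeSketch {

  /** Hypothetical stand-in for an operator tree node. */
  public static final class Node {
    final String name;
    final List<Node> children;

    public Node(String name, Node... children) {
      this.name = name;
      this.children = Arrays.asList(children);
    }
  }

  static int countNodes(Node node) {
    int count = 1;
    for (Node child : node.children) {
      count += countNodes(child);
    }
    return count;
  }

  /** Returns the full encoding or throws; there is no partial return value. */
  public static List<String> encode(Node root, List<String> flatStats) throws IOException {
    int treeSize = countNodes(root);
    if (treeSize != flatStats.size()) {
      // Failure mode 1: rejected before any output is built.
      throw new IllegalStateException("treeSize " + treeSize + " != flatSize " + flatStats.size());
    }
    List<String> out = new ArrayList<>(treeSize);
    walk(root, flatStats, new int[] {0}, out);
    return out;
  }

  private static void walk(Node node, List<String> flatStats, int[] cursor, List<String> out)
      throws IOException {
    String stat = flatStats.get(cursor[0]++);
    if (stat == null) {
      // Failure mode 2: a mid-walk serialization failure (simulated by a null entry).
      // `out` is reachable only from encode()'s frame, so it is dropped on unwind.
      throw new IOException("cannot serialize stats for " + node.name);
    }
    out.add(node.name + "=" + stat);
    for (Node child : node.children) {
      walk(child, flatStats, cursor, out);
    }
  }

  /** Caller side: on either failure, report completion without stats. */
  public static Optional<List<String>> encodeOrEmpty(Node root, List<String> flatStats) {
    try {
      return Optional.of(encode(root, flatStats));
    } catch (IllegalStateException | IOException e) {
      return Optional.empty();
    }
  }

  public static void main(String[] args) {
    Node root = new Node("join", new Node("scanA"), new Node("scanB"));
    System.out.println(encodeOrEmpty(root, Arrays.asList("1", "2", "3")));
    System.out.println(encodeOrEmpty(root, Arrays.asList("1", "2")));
    System.out.println(encodeOrEmpty(root, Arrays.asList("1", null, "3")));
  }
}
```

Running `main` prints `Optional[[join=1, scanA=2, scanB=3]]` followed by
`Optional.empty` twice. In both failure cases the caller has only the exception
in hand, which is why the only fallback in this sketch is to complete without
stats.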