gortiz commented on code in PR #18458:
URL: https://github.com/apache/pinot/pull/18458#discussion_r3274656274


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java:
##########
@@ -450,6 +454,255 @@ private void submitTimeSeriesInternal(Worker.TimeSeriesQueryRequest request,
     }
   }
 
+  /// Stream-mode submission handler. The broker keeps the stream open for the query lifetime; the server replies
+  /// with a {@code submit_ack} as the first message and (in subsequent commits) per-opchain
+  /// {@link Worker.OpChainComplete} messages followed by a final {@link Worker.ServerDone}.
+  ///
+  /// This skeleton wires up the gRPC mechanics + plan submission via the existing submission path. It does NOT yet
+  /// emit OpChainComplete / ServerDone — those need a per-opchain completion hook on
+  /// {@link org.apache.pinot.query.runtime.executor.OpChainSchedulerService}, which is layered on next.
+  /// Cancel still routes through the existing unary {@link #cancel(Worker.CancelRequest, StreamObserver)} RPC; broker
+  /// stream-close also triggers a cancel here.
+  @Override
+  public StreamObserver<Worker.BrokerToServer> submitWithStream(
+      StreamObserver<Worker.ServerToBroker> responseObserver) {
+    return new SubmitWithStreamObserver(responseObserver);
+  }
+
+  /// Per-query state for an open {@code SubmitWithStream} call. Owns the response stream and serialises every
+  /// {@code onNext} call on it via a {@code synchronized} block — gRPC requires {@code StreamObserver.onNext} to be
+  /// called serially.
+  ///
+  /// Tracks the expected number of opchains for the request (sum of WorkerMetadata across all stages). An
+  /// {@link OpChainCompletionListener} registered with {@link QueryRunner#registerOpChainCompletionListener}
+  /// fires once per opchain finishing, encodes its stats via {@link MultiStageStatsTreeEncoder}, and emits an
+  /// {@link Worker.OpChainComplete} on the response stream. When the per-request completed-count reaches the
+  /// expected total, {@link Worker.ServerDone} is emitted and the stream is closed.
+  ///
+  /// All blocking work (plan deserialization, opchain construction) runs on
+  /// {@link QueryServer#_submissionExecutorService}.
+  private final class SubmitWithStreamObserver implements StreamObserver<Worker.BrokerToServer> {
+    private final StreamObserver<Worker.ServerToBroker> _responseObserver;
+    /// Serialises onNext calls on the response stream and guards mutable session state.
+    private final Object _streamLock = new Object();
+    /// True once we've received the first {@code submit} and dispatched it.
+    private final AtomicBoolean _submitted = new AtomicBoolean(false);
+    /// True once we've completed the response stream (success or error). Idempotent guard.
+    private final AtomicBoolean _completed = new AtomicBoolean(false);
+    /// Number of opchains we expect to report for this request — set after we deserialize the plan.
+    private final AtomicInteger _expectedOpChains = new AtomicInteger(-1);
+    /// Number of opchains that have reported so far via the completion listener.
+    private final AtomicInteger _completedOpChains = new AtomicInteger(0);
+    /// Set once we successfully parse the request id from the submit metadata. Used by cancel-via-stream.
+    private volatile long _requestId = -1;
+
+    SubmitWithStreamObserver(StreamObserver<Worker.ServerToBroker> responseObserver) {
+      _responseObserver = responseObserver;
+    }
+
+    @Override
+    public void onNext(Worker.BrokerToServer message) {
+      switch (message.getPayloadCase()) {
+        case SUBMIT:
+          handleSubmit(message.getSubmit());
+          break;
+        case CANCEL:
+          handleCancel(message.getCancel());
+          break;
+        case PAYLOAD_NOT_SET:
+        default:
+          sendErrorAndComplete("Unexpected BrokerToServer payload: " + message.getPayloadCase());
+          break;
+      }
+    }
+
+    @Override
+    public void onError(Throwable t) {
+      // Broker-side stream error / disconnect. Treat like a cancel and clean up; do not reply on the response stream
+      // (the underlying transport is gone).
+      LOGGER.warn("SubmitWithStream stream error for request {}: {}", _requestId, t.getMessage());
+      _completed.set(true);
+      cleanupListener();
+      cancelIfSubmitted();
+    }
+
+    @Override
+    public void onCompleted() {
+      // Broker has half-closed (no more inbound messages). The server stream stays open until all opchains have
+      // reported via the completion listener — it's the listener's job to emit ServerDone and complete the stream.
+      // If the broker half-closes before the server is done, that's OK; we keep emitting on the response stream
+      // until our own completion criterion is met.
+    }
+
+    private void handleSubmit(Worker.QueryRequest request) {
+      if (!_submitted.compareAndSet(false, true)) {
+        sendErrorAndComplete("Multiple submit messages on the same stream are not allowed");
+        return;
+      }
+      ServerMetrics.get().addMeteredGlobalValue(ServerMeter.MSE_QUERIES, 1L);
+      Map<String, String> deserializedMetadata;
+      try {
+        deserializedMetadata = QueryPlanSerDeUtils.fromProtoProperties(request.getMetadata());
+      } catch (Exception e) {
+        LOGGER.error("Caught exception while deserializing request metadata", e);
+        sendErrorAndComplete("Caught exception while deserializing request metadata: " + e.getMessage());
+        return;
+      }
+      // Override the cluster-level _sendStats decision for this request: stats travel out-of-band on the bidi stream
+      // (via the OpChainCompletionListener), so we suppress the mailbox-side stats path. The override is read by
+      // QueryRunner.effectiveSendStats(...).
+      Map<String, String> reqMetadata = new HashMap<>(deserializedMetadata);
+      reqMetadata.put(CommonConstants.MultiStageQueryRunner.KEY_OF_STATS_REPORTING_MODE,
+          CommonConstants.MultiStageQueryRunner.STATS_REPORTING_MODE_STREAM);
+      try {
+        _requestId = Long.parseLong(reqMetadata.get(MetadataKeys.REQUEST_ID));
+      } catch (Exception ignored) {
+        // _requestId stays at -1; cancel-on-stream-close will just be a no-op.
+      }
+      // Count how many opchains will run on this server: sum of WorkerMetadata across all stage plans.
+      int opChainCount = 0;
+      for (Worker.StagePlan stagePlan : request.getStagePlanList()) {
+        opChainCount += stagePlan.getStageMetadata().getWorkerMetadataCount();
+      }
+      final int expected = opChainCount;
+      _expectedOpChains.set(expected);
+
+      // Register the per-request completion listener BEFORE submitting. Otherwise short opchains could finish before
+      // we've registered and we'd miss their events.
+      if (_requestId >= 0 && expected > 0) {
+        _queryRunner.registerOpChainCompletionListener(_requestId, this::onOpChainComplete);
+      }
+
+      long timeoutMs = Long.parseLong(reqMetadata.get(QueryOptionKey.TIMEOUT_MS));
+      CompletableFuture.runAsync(() -> submitInternal(request, reqMetadata), _submissionExecutorService)
+          .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
+          .whenComplete((result, error) -> {
+            if (error != null) {
+              LOGGER.error("Caught exception while submitting request: {}", _requestId, error);
+              sendSubmitAck(buildErrorResponse("Caught exception while submitting request: " + error.getMessage()));
+              // Submission failed — no opchains will run, so emit ServerDone immediately and clean up.
+              cleanupListener();
+              sendDoneAndComplete();
+            } else {
+              sendSubmitAck(buildOkResponse());
+              // If for some reason expected was 0 (empty plan), close the stream now.
+              if (expected == 0) {
+                cleanupListener();
+                sendDoneAndComplete();
+              }
+            }
+          });
+    }
+
+    /**
+     * Fires once per opchain on this server completing. Encodes the stats into a {@link Worker.MultiStageStatsTree},
+     * emits an {@link Worker.OpChainComplete}, and emits {@link Worker.ServerDone} once all expected opchains have
+     * reported.
+     */
+    private void onOpChainComplete(OpChainId opChainId, MultiStageOperator rootOperator,
+        @Nullable MultiStageQueryStats stats, OpChainExecutionContext context, @Nullable Throwable error) {
+      Worker.OpChainComplete.Builder builder = Worker.OpChainComplete.newBuilder()
+          .setStageId(opChainId.getStageId())
+          .setWorkerId(opChainId.getVirtualServerId())
+          .setSuccess(error == null);
+      if (error != null) {
+        builder.setErrorMsg(error.getMessage() == null ? error.getClass().getSimpleName() : error.getMessage());
+      }
+      if (stats != null) {
+        try {
+          builder.setStats(MultiStageStatsTreeEncoder.encode(rootOperator, stats, context));

Review Comment:
   **Concern 2 (directExecutor + blocking):**
   
   Worth clarifying the threading here: the encoding (line 619) runs *before* the `synchronized (_streamLock)` block; only the `_responseObserver.onNext(message)` call is inside the lock (line 641). The `onOpChainComplete` callback fires on the opchain executor thread (registered via `registerOpChainCompletionListener`), which is separate from the Netty event loop. With `directExecutor()`, the inbound gRPC callbacks (new submit/cancel messages arriving on the bidi stream) do run on the Netty event loop, but the outbound `onNext` call from the opchain executor only enqueues the write and returns immediately; Netty flushes it asynchronously. So the opchain executor holds `_streamLock` only long enough to enqueue the message.
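   A minimal sketch of the locking split described above, assuming a hypothetical `SimpleObserver` interface standing in for `io.grpc.stub.StreamObserver` and a hypothetical `SerializedEmitter.encodeAndSend` helper (neither is the PR's code). Encoding runs on the calling thread with no lock held; only the `onNext` call on the shared stream is serialized.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for io.grpc.stub.StreamObserver, trimmed to the methods used here,
// so the sketch compiles without a gRPC dependency.
interface SimpleObserver<T> {
  void onNext(T value);

  void onCompleted();
}

// Hypothetical emitter illustrating the split: encode without the lock, serialize only onNext.
final class SerializedEmitter<T> {
  private final SimpleObserver<T> _observer;
  private final Object _streamLock = new Object();

  SerializedEmitter(SimpleObserver<T> observer) {
    _observer = observer;
  }

  // Called from an opchain executor thread when an opchain finishes.
  void encodeAndSend(Supplier<T> encoder) {
    // Encoding happens outside the lock, so concurrent opchain completions can encode in parallel.
    T message = encoder.get();
    // onNext on a single stream must be called serially; the lock covers only this call.
    synchronized (_streamLock) {
      _observer.onNext(message);
    }
  }
}
```

   Moving `encoder.get()` inside the `synchronized` block would still be correct, but each completing opchain would then wait for other opchains' encoding rather than only for the enqueue.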
   
   **Unrelated liveness gap found while investigating this:** if `_responseObserver.onNext(message)` itself throws a transport exception (e.g. the broker closed the channel early), the exception previously propagated before `_completedOpChains.incrementAndGet()` ran, so the increment was skipped and the counter stayed permanently short. If that was the last opchain, `sendDoneAndComplete()` was never called, and the broker's drain latch waited until the 50 ms timeout. Fixed in the latest commit by moving the increment and the completion trigger into a `finally` block. `sendDoneAndComplete()` was similarly tightened to always call `onCompleted()`, even if the `ServerDone` send throws.
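   A minimal sketch of that fix pattern, assuming a hypothetical `SimpleObserver` interface standing in for `io.grpc.stub.StreamObserver`, plain `String` messages in place of the protobuf types, and a hypothetical `CompletionTracker` class (not the PR's actual code). The counter is bumped in `finally`, and `sendDoneAndComplete()` closes the stream in its own `finally`, guarded by a compare-and-set so it runs at most once.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for io.grpc.stub.StreamObserver, trimmed to the methods used here.
interface SimpleObserver<T> {
  void onNext(T value);

  void onCompleted();
}

// Hypothetical tracker illustrating the finally-based completion pattern.
final class CompletionTracker {
  private final SimpleObserver<String> _observer;
  private final Object _streamLock = new Object();
  private final AtomicInteger _completedOpChains = new AtomicInteger(0);
  private final AtomicBoolean _completed = new AtomicBoolean(false);
  private final int _expectedOpChains;

  CompletionTracker(SimpleObserver<String> observer, int expectedOpChains) {
    _observer = observer;
    _expectedOpChains = expectedOpChains;
  }

  // Called once per finished opchain, from an opchain executor thread.
  void onOpChainComplete(String opChainCompleteMessage) {
    try {
      synchronized (_streamLock) {
        _observer.onNext(opChainCompleteMessage); // may throw if the transport is already gone
      }
    } finally {
      // Count the opchain even if the send threw, so the last one still triggers ServerDone.
      if (_completedOpChains.incrementAndGet() == _expectedOpChains) {
        sendDoneAndComplete();
      }
    }
  }

  // Idempotent: only the first caller sends ServerDone and completes the stream.
  void sendDoneAndComplete() {
    if (!_completed.compareAndSet(false, true)) {
      return;
    }
    synchronized (_streamLock) {
      try {
        _observer.onNext("ServerDone");
      } finally {
        // Always complete the stream, even if the ServerDone send threw.
        _observer.onCompleted();
      }
    }
  }
}
```

   In the sketch, a failed send on the last opchain still propagates its exception to the caller, but `onCompleted()` has already run by then, and any later `sendDoneAndComplete()` call is a no-op.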


