yashmayya commented on code in PR #18458:
URL: https://github.com/apache/pinot/pull/18458#discussion_r3238012648


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/streaming/StreamingQuerySession.java:
##########
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.service.dispatch.streaming;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.proto.Worker;
+import org.apache.pinot.query.runtime.plan.MultiStageStatsTreeDecoder;
+import org.apache.pinot.query.runtime.plan.StageStatsTreeNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Broker-side per-query state for the {@code SubmitWithStream} dispatch path. 
Owns the per-stage tree accumulator,
+ * the outstanding-opchain count, the per-stage coverage counters, and the set 
of open server streams.
+ *
+ * <p>Concurrency model — all mutating methods acquire the per-session lock, 
so the accumulator and counters need no
+ * additional internal synchronization. gRPC client {@code onNext} callbacks 
land on I/O threads and call into this
+ * session directly; the work per call is short (decode + merge + decrement) 
so doing it on the I/O thread is fine.
+ *
+ * <p>Completion semantics — {@link #awaitCompletion(long, TimeUnit)} returns 
{@code true} as soon as every expected
+ * opchain has reported (early completion), and {@code false} if the timeout 
fires first. The dispatcher should call
+ * it <strong>only after</strong> the broker receiving mailbox has finished, 
so that a successful return means both
+ * "data done" and "stats fully accounted for". When it returns {@code false} 
the per-stage coverage exposes which
+ * stages are missing.
+ */
+public class StreamingQuerySession {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(StreamingQuerySession.class);
+
+  private final long _requestId;
+  private final int _expectedOpChains;
+  private final CountDownLatch _completionLatch;
+  private final ReentrantLock _lock = new ReentrantLock();
+
+  /** Per-stage merged accumulator. Mutated under {@link #_lock}. */
+  private final Map<Integer, StageStatsTreeNode> _stageAccumulator = new 
HashMap<>();
+  /** Per-stage count of opchains that have responded successfully and merged 
cleanly. */
+  private final Map<Integer, Integer> _respondedByStage = new HashMap<>();
+  /** Per-stage count of opchains that responded but the broker couldn't merge 
their payload. */
+  private final Map<Integer, Integer> _mergeFailedByStage = new HashMap<>();
+
+  /** Set of open server streams. Iteration order is insertion order so cancel 
fan-out is deterministic. */
+  private final Set<StreamingServerHandle> _openStreams = new 
LinkedHashSet<>();
+
+  /** True after the first peer error (success=false OpChainComplete or stream 
onError). Used to trigger fan-out
+   * cancel idempotently. */
+  private boolean _peerErrorObserved = false;
+
+  public StreamingQuerySession(long requestId, int expectedOpChains) {
+    _requestId = requestId;
+    _expectedOpChains = expectedOpChains;
+    _completionLatch = new CountDownLatch(expectedOpChains);
+  }
+
+  public long getRequestId() {
+    return _requestId;
+  }
+
+  public int getExpectedOpChains() {
+    return _expectedOpChains;
+  }
+
+  /**
+   * Registers an open server stream so the session can iterate them later for 
fan-out cancel. Must be called by the
+   * dispatcher when the {@code SubmitWithStream} call is opened.
+   */
+  public void registerStream(StreamingServerHandle stream) {
+    _lock.lock();
+    try {
+      _openStreams.add(stream);
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Removes a stream from the open-streams set. Called when the server emits 
{@code ServerDone} (clean close) or the
+   * stream errors. Idempotent.
+   */
+  public void unregisterStream(StreamingServerHandle stream) {
+    _lock.lock();
+    try {
+      _openStreams.remove(stream);
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Records an {@link Worker.OpChainComplete} message decoded from a server 
stream. Decrements the outstanding count
+   * and merges the contained tree into the per-stage accumulator (or marks 
the stage {@code mergeFailed} on a shape
+   * mismatch / decode failure). Also records {@code success=false} reports as 
peer errors so fan-out cancel can fire.
+   */
+  public void recordOpChainComplete(Worker.OpChainComplete message) {
+    int stageId = message.getStageId();
+    boolean isSuccess = message.getSuccess();
+    Worker.MultiStageStatsTree statsTree = message.getStats();
+
+    boolean shouldFanOutCancel = false;
+    _lock.lock();
+    try {
+      if (!isSuccess) {
+        if (!_peerErrorObserved) {
+          _peerErrorObserved = true;
+          shouldFanOutCancel = true;
+        }
+      }
+      if (statsTree.hasCurrentStage()) {
+        try {
+          MultiStageStatsTreeDecoder.Decoded decoded = 
MultiStageStatsTreeDecoder.decode(statsTree);

Review Comment:
   **Decode + merge under `_lock` on the Netty event loop.** The class Javadoc 
says "doing it on the I/O thread is fine" — true at low QPS, but two real 
problems show up under load:
   
   1. **Cross-query head-of-line blocking.** Netty client event loops are 
shared across *all* open streams (across queries). A slow decode here delays 
`OpChainComplete` delivery for *other* queries on the same loop.
   2. **Lock contention amplification.** Two servers reporting 
near-simultaneously for the same query block the second I/O thread on `_lock` 
until the first releases it.
   
   `MultiStageStatsTreeDecoder.decode` is a recursive walk that allocates per 
node and deserializes `StatMap` bytes; `mergeIntoAccumulatorLocked` then walks 
again calling `StatMap.merge` per node. Suggest moving decode+merge to a small 
worker pool and keeping only the `_completionLatch.countDown()` + accumulator 
`put` under `_lock`. At minimum, please verify under a representative high-QPS 
soak before flipping the cluster default.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/DispatchClient.java:
##########
@@ -130,6 +132,38 @@ public void submit(Worker.QueryRequest request, 
QueryServerInstance virtualServe
     _dispatchStub.withDeadline(deadline).submit(request, new 
LastValueDispatchObserver<>(virtualServer, callback));
   }
 
+  /**
+   * Opens a {@code SubmitWithStream} bidi RPC for one server, sends the 
initial {@code submit}, and registers the
+   * resulting {@link StreamingDispatchObserver} with {@code session} for 
cancel fan-out and {@code OpChainComplete}
+   * accumulation.
+   *
+   * <p>The submit-ack callback is invoked exactly once: with the {@link 
Worker.QueryResponse} on the first
+   * {@code submit_ack} from the server, or with a non-null {@link Throwable} 
if the stream errors before the ack
+   * arrives.
+   *
+   * @param request               the plan submission
+   * @param virtualServer         server identity (used in callbacks for 
routing decisions on failure)
+   * @param deadline              gRPC deadline for the call
+   * @param session               broker-side streaming session — the returned 
observer registers itself here
+   * @param expectedOpChainsForThisServer  number of opchains this server is 
expected to report; used to drain the
+   *                              session latch correctly when the stream 
errors before all opchains have responded
+   * @param ackCallback           receives the submit-ack response or a 
failure throwable
+   * @return the observer, also exposed as
+   *         {@link 
org.apache.pinot.query.service.dispatch.streaming.StreamingServerHandle} on the 
session for
+   *         cancel fan-out
+   */
+  public StreamingDispatchObserver submitWithStream(Worker.QueryRequest 
request, QueryServerInstance virtualServer,
+      Deadline deadline, StreamingQuerySession session, int 
expectedOpChainsForThisServer,
+      java.util.function.BiConsumer<Worker.QueryResponse, Throwable> 
ackCallback) {
+    StreamingDispatchObserver observer = new 
StreamingDispatchObserver(virtualServer, session,
+        expectedOpChainsForThisServer, ackCallback);
+    StreamObserver<Worker.BrokerToServer> outbound = 
_dispatchStub.withDeadline(deadline).submitWithStream(observer);

Review Comment:
   **HTTP/2 stream cap will become the throughput ceiling.** Each 
`DispatchClient` still holds a single `ManagedChannel` per server (the TODO at 
the top of the class notes this), and `maxConcurrentCallsPerConnection` isn't 
bumped on either side. Behavioral change vs unary `submit`: a stream slot used 
to be held for milliseconds, `submitWithStream` holds one for the full query 
lifetime.
   
   With Q in-flight queries × N servers fanned out, each broker→server HTTP/2 
connection needs ~Q concurrent streams. Netty/gRPC default 
`MAX_CONCURRENT_STREAMS=100` means at high QPS new calls queue at the channel 
level — manifesting as broker slowness, not stream errors. Recommend before 
flipping `_streamStats=true` by default:
   - Bump server-side `NettyServerBuilder.maxConcurrentCallsPerConnection` in 
`QueryServer.buildGrpcServer` (today's code doesn't set it).
   - Implement the channel-pool TODO above this class, or document the 
per-server stream-count ceiling.
   - Add `numActiveStreamsByServer` metric for capacity observability.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java:
##########
@@ -450,6 +454,255 @@ private void 
submitTimeSeriesInternal(Worker.TimeSeriesQueryRequest request,
     }
   }
 
+  /// Stream-mode submission handler. The broker keeps the stream open for the 
query lifetime; the server replies
+  /// with a {@code submit_ack} as the first message and (in subsequent 
commits) per-opchain
+  /// {@link Worker.OpChainComplete} messages followed by a final {@link 
Worker.ServerDone}.
+  ///
+  /// This skeleton wires up the gRPC mechanics + plan submission via the 
existing submission path. It does NOT yet
+  /// emit OpChainComplete / ServerDone — those need a per-opchain completion 
hook on
+  /// {@link org.apache.pinot.query.runtime.executor.OpChainSchedulerService}, 
which is layered on next.
+  /// Cancel still routes through the existing unary {@link 
#cancel(Worker.CancelRequest, StreamObserver)} RPC; broker
+  /// stream-close also triggers a cancel here.
+  @Override
+  public StreamObserver<Worker.BrokerToServer> submitWithStream(
+      StreamObserver<Worker.ServerToBroker> responseObserver) {
+    return new SubmitWithStreamObserver(responseObserver);
+  }
+
+  /// Per-query state for an open {@code SubmitWithStream} call. Owns the 
response stream and serialises every
+  /// {@code onNext} call on it via a {@code synchronized} block — gRPC 
requires {@code StreamObserver.onNext} to be
+  /// called serially.
+  ///
+  /// Tracks the expected number of opchains for the request (sum of 
WorkerMetadata across all stages). An
+  /// {@link OpChainCompletionListener} registered with {@link 
QueryRunner#registerOpChainCompletionListener}
+  /// fires once per opchain finishing, encodes its stats via {@link 
MultiStageStatsTreeEncoder}, and emits an
+  /// {@link Worker.OpChainComplete} on the response stream. When the 
per-request completed-count reaches the
+  /// expected total, {@link Worker.ServerDone} is emitted and the stream is 
closed.
+  ///
+  /// All blocking work (plan deserialization, opchain construction) runs on
+  /// {@link QueryServer#_submissionExecutorService}.
+  private final class SubmitWithStreamObserver implements 
StreamObserver<Worker.BrokerToServer> {
+    private final StreamObserver<Worker.ServerToBroker> _responseObserver;
+    /// Serialises onNext calls on the response stream and guards mutable 
session state.
+    private final Object _streamLock = new Object();
+    /// True once we've received the first {@code submit} and dispatched it.
+    private final AtomicBoolean _submitted = new AtomicBoolean(false);
+    /// True once we've completed the response stream (success or error). 
Idempotent guard.
+    private final AtomicBoolean _completed = new AtomicBoolean(false);
+    /// Number of opchains we expect to report for this request — set after we 
deserialize the plan.
+    private final AtomicInteger _expectedOpChains = new AtomicInteger(-1);
+    /// Number of opchains that have reported so far via the completion 
listener.
+    private final AtomicInteger _completedOpChains = new AtomicInteger(0);
+    /// Set once we successfully parse the request id from the submit 
metadata. Used by cancel-via-stream.
+    private volatile long _requestId = -1;
+
+    SubmitWithStreamObserver(StreamObserver<Worker.ServerToBroker> 
responseObserver) {
+      _responseObserver = responseObserver;
+    }
+
+    @Override
+    public void onNext(Worker.BrokerToServer message) {
+      switch (message.getPayloadCase()) {
+        case SUBMIT:
+          handleSubmit(message.getSubmit());
+          break;
+        case CANCEL:
+          handleCancel(message.getCancel());
+          break;
+        case PAYLOAD_NOT_SET:
+        default:
+          sendErrorAndComplete("Unexpected BrokerToServer payload: " + 
message.getPayloadCase());
+          break;
+      }
+    }
+
+    @Override
+    public void onError(Throwable t) {
+      // Broker-side stream error / disconnect. Treat like a cancel and clean 
up; do not reply on the response stream
+      // (the underlying transport is gone).
+      LOGGER.warn("SubmitWithStream stream error for request {}: {}", 
_requestId, t.getMessage());
+      _completed.set(true);
+      cleanupListener();
+      cancelIfSubmitted();
+    }
+
+    @Override
+    public void onCompleted() {
+      // Broker has half-closed (no more inbound messages). The server stream 
stays open until all opchains have
+      // reported via the completion listener — it's the listener's job to 
emit ServerDone and complete the stream.
+      // If the broker half-closes before the server is done, that's OK; we 
keep emitting on the response stream
+      // until our own completion criterion is met.
+    }
+
+    private void handleSubmit(Worker.QueryRequest request) {
+      if (!_submitted.compareAndSet(false, true)) {
+        sendErrorAndComplete("Multiple submit messages on the same stream are 
not allowed");
+        return;
+      }
+      ServerMetrics.get().addMeteredGlobalValue(ServerMeter.MSE_QUERIES, 1L);
+      Map<String, String> deserializedMetadata;
+      try {
+        deserializedMetadata = 
QueryPlanSerDeUtils.fromProtoProperties(request.getMetadata());
+      } catch (Exception e) {
+        LOGGER.error("Caught exception while deserializing request metadata", 
e);
+        sendErrorAndComplete("Caught exception while deserializing request 
metadata: " + e.getMessage());
+        return;
+      }
+      // Override the cluster-level _sendStats decision for this request: 
stats travel out-of-band on the bidi stream
+      // (via the OpChainCompletionListener), so we suppress the mailbox-side 
stats path. The override is read by
+      // QueryRunner.effectiveSendStats(...).
+      Map<String, String> reqMetadata = new HashMap<>(deserializedMetadata);
+      
reqMetadata.put(CommonConstants.MultiStageQueryRunner.KEY_OF_STATS_REPORTING_MODE,
+          CommonConstants.MultiStageQueryRunner.STATS_REPORTING_MODE_STREAM);
+      try {
+        _requestId = Long.parseLong(reqMetadata.get(MetadataKeys.REQUEST_ID));
+      } catch (Exception ignored) {
+        // _requestId stays at -1; cancel-on-stream-close will just be a no-op.
+      }
+      // Count how many opchains will run on this server: sum of 
WorkerMetadata across all stage plans.
+      int opChainCount = 0;
+      for (Worker.StagePlan stagePlan : request.getStagePlanList()) {
+        opChainCount += stagePlan.getStageMetadata().getWorkerMetadataCount();
+      }
+      final int expected = opChainCount;
+      _expectedOpChains.set(expected);
+
+      // Register the per-request completion listener BEFORE submitting. 
Otherwise short opchains could finish before
+      // we've registered and we'd miss their events.
+      if (_requestId >= 0 && expected > 0) {
+        _queryRunner.registerOpChainCompletionListener(_requestId, 
this::onOpChainComplete);
+      }
+
+      long timeoutMs = 
Long.parseLong(reqMetadata.get(QueryOptionKey.TIMEOUT_MS));
+      CompletableFuture.runAsync(() -> submitInternal(request, reqMetadata), 
_submissionExecutorService)
+          .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
+          .whenComplete((result, error) -> {
+            if (error != null) {
+              LOGGER.error("Caught exception while submitting request: {}", 
_requestId, error);
+              sendSubmitAck(buildErrorResponse("Caught exception while 
submitting request: " + error.getMessage()));
+              // Submission failed — no opchains will run, so emit ServerDone 
immediately and clean up.
+              cleanupListener();
+              sendDoneAndComplete();
+            } else {
+              sendSubmitAck(buildOkResponse());
+              // If for some reason expected was 0 (empty plan), close the 
stream now.
+              if (expected == 0) {
+                cleanupListener();
+                sendDoneAndComplete();
+              }
+            }
+          });
+    }
+
+    /**
+     * Fires once per opchain on this server completing. Encodes the stats 
into a {@link Worker.MultiStageStatsTree},
+     * emits an {@link Worker.OpChainComplete}, and emits {@link 
Worker.ServerDone} once all expected opchains have
+     * reported.
+     */
+    private void onOpChainComplete(OpChainId opChainId, MultiStageOperator 
rootOperator,
+        @Nullable MultiStageQueryStats stats, OpChainExecutionContext context, 
@Nullable Throwable error) {
+      Worker.OpChainComplete.Builder builder = 
Worker.OpChainComplete.newBuilder()
+          .setStageId(opChainId.getStageId())
+          .setWorkerId(opChainId.getVirtualServerId())
+          .setSuccess(error == null);
+      if (error != null) {
+        builder.setErrorMsg(error.getMessage() == null ? 
error.getClass().getSimpleName() : error.getMessage());
+      }
+      if (stats != null) {
+        try {
+          builder.setStats(MultiStageStatsTreeEncoder.encode(rootOperator, 
stats, context));

Review Comment:
   Two concerns here:
   
   1. **Worse error-path coverage than legacy.** When the encoder throws (most 
likely on `treeSize != flatSize` from an opchain that failed before emitting 
EOS), this emits `success=false` with empty `stats`. Stream mode also 
suppresses mailbox stats (see `effectiveSendStats`), so for this opchain the 
broker now has strictly less info than legacy mode would deliver via mailbox 
EOS or the cancel-RPC fallback. Worth either:
      - documenting this as a behavior change in the PR description, or
      - falling back to a best-effort flat encoder that emits whatever entries 
the flat list has, even when the tree shape doesn't align.
   2. **Encode + `onNext` under `_streamLock` on the opchain executor thread, 
with `NettyServerBuilder.directExecutor()` set at class init** (line 217) — 
inbound handlers also run on the same loop. Wide trees stall the loop in both 
directions. Suggest moving encode + send to a small bounded worker pool.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/streaming/StreamingDispatchObserver.java:
##########
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.service.dispatch.streaming;
+
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.proto.Worker;
+import org.apache.pinot.query.routing.QueryServerInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Broker-side gRPC client {@code StreamObserver} for the {@code 
SubmitWithStream} bidi RPC. Routes inbound
+ * {@link Worker.ServerToBroker} messages to either the {@code submit_ack} 
callback (first message) or the
+ * {@link StreamingQuerySession} (subsequent {@code OpChainComplete}s), and 
adapts the inbound side of the stream
+ * (the {@link StreamObserver} we use to send {@code BrokerToServer} messages) 
into a
+ * {@link StreamingServerHandle} so the session can fan out cancel.
+ *
+ * <p>Lifecycle — created when the broker opens a {@code SubmitWithStream} 
call to one server, then registered with
+ * the session via {@link StreamingQuerySession#registerStream}. Receives:
+ * <ol>
+ *   <li>Exactly one {@code submit_ack} (always the first server→broker 
message).</li>
+ *   <li>Zero or more {@code OpChainComplete} messages (one per opchain that 
ran on this server).</li>
+ *   <li>Exactly one {@code ServerDone} after the last opchain has 
reported.</li>
+ * </ol>
+ *
+ * <p>On {@code onError} or unexpected message order, drains the latch by 
{@code remainingExpected} via the session
+ * so {@link StreamingQuerySession#awaitCompletion} can still finalize.
+ */
+public class StreamingDispatchObserver
+    implements StreamObserver<Worker.ServerToBroker>, StreamingServerHandle {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(StreamingDispatchObserver.class);
+
+  private final QueryServerInstance _server;
+  private final StreamingQuerySession _session;
+  /// Receives the submit-ack response or a failure throwable. Called exactly 
once per call (either with response on
+  /// the first ServerToBroker.submit_ack, or with an error if the stream 
breaks before the ack arrives).
+  private final BiConsumer<Worker.QueryResponse, Throwable> _ackCallback;
+  private final int _expectedOpChainsForThisServer;
+  private final AtomicBoolean _ackReceived = new AtomicBoolean(false);
+
+  /**
+   * Counts how many opchains we've already drained from the session for this 
server, so an onError doesn't
+   * double-drain after some opchains already reported successfully.
+   *
+   * <p>Accessed only from gRPC inbound callbacks ({@code onNext}, {@code 
onError}, {@code onCompleted}), which gRPC
+   * serializes per stream — no additional synchronization is needed.
+   */
+  private int _opChainsReportedForThisServer = 0;
+
+  /**
+   * The inbound side of the bidi stream — used to send {@code submit} 
(initial) and {@code cancel} (fan-out) from
+   * the broker. Set once via {@link #attachOutboundStream} after the gRPC 
stub is asked to start the stream; lives
+   * for the duration of the call.
+   */
+  private volatile StreamObserver<Worker.BrokerToServer> _outbound;
+
+  public StreamingDispatchObserver(QueryServerInstance server, 
StreamingQuerySession session,
+      int expectedOpChainsForThisServer, BiConsumer<Worker.QueryResponse, 
Throwable> ackCallback) {
+    _server = server;
+    _session = session;
+    _expectedOpChainsForThisServer = expectedOpChainsForThisServer;
+    _ackCallback = ackCallback;
+  }
+
+  public QueryServerInstance getServer() {
+    return _server;
+  }
+
+  /** Wires the outbound side of the bidi stream once the gRPC stub returns 
it. */
+  public void attachOutboundStream(StreamObserver<Worker.BrokerToServer> 
outbound) {
+    _outbound = outbound;
+  }
+
+  /** Sends the initial {@code BrokerToServer.submit} message on the outbound 
side. */
+  public void sendSubmit(Worker.QueryRequest request) {
+    StreamObserver<Worker.BrokerToServer> outbound = _outbound;
+    if (outbound == null) {
+      throw new IllegalStateException("attachOutboundStream must be called 
before sendSubmit");
+    }
+    
outbound.onNext(Worker.BrokerToServer.newBuilder().setSubmit(request).build());
+  }
+
+  @Override
+  public void onNext(Worker.ServerToBroker message) {
+    switch (message.getPayloadCase()) {
+      case SUBMIT_ACK:
+        if (_ackReceived.compareAndSet(false, true)) {
+          _ackCallback.accept(message.getSubmitAck(), null);
+        } else {
+          LOGGER.warn("Ignoring duplicate submit_ack from {}", _server);
+        }
+        break;
+      case OPCHAIN:
+        _session.recordOpChainComplete(message.getOpchain());

Review Comment:
   This runs decode + merge synchronously on the gRPC client event loop via 
`_session.recordOpChainComplete`. See the companion concern on 
`StreamingQuerySession#recordOpChainComplete` — at high QPS, expensive 
per-event work on the shared client event loop causes cross-query HOL blocking.
   
   Aside: `_opChainsReportedForThisServer` is a plain `int`. The class Javadoc 
correctly notes gRPC serializes inbound callbacks per stream, but `cancel()` on 
this same instance is called from a *different* I/O thread (via 
`fanOutCancel`'s iteration of `_openStreams`). Today that's safe because 
`cancel()` only touches `_outbound` (which is `volatile`), but the threading 
boundary is subtle — a short note on what runs on which thread would help 
future maintainers.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java:
##########
@@ -189,10 +191,189 @@ public QueryResult submitAndReduce(RequestContext 
context, DispatchableSubPlan d
     }
   }
 
-  /// Tries to recover from an exception thrown during query dispatching.
+  /// Streaming variant of {@link #submitAndReduce}: opens one {@code 
SubmitWithStream} bidi RPC per server, runs the
+  /// broker's stage 0 reducer, and once the receiving mailbox finishes awaits 
the per-stage stats with early
+  /// completion (returns as soon as every expected opchain has reported, or 
when the wait window fires — whichever
+  /// happens first). Stats from the session accumulator are then merged into 
the broker's local stage 0 stats to
+  /// build the final {@link QueryResult}.
+  ///
+  /// The wait window is bounded by the query's remaining timeout: if {@code 
submitWithStream + runReducer} consumed
+  /// most of the budget, the per-stage stats may end up partial (visible via 
the per-stage {@code mergeFailed} /
+  /// {@code missing} counts the session exposes).
+  ///
+  /// Cancel is handled via {@link StreamingQuerySession#fanOutCancel()} — no 
unary Cancel RPCs are issued for this
+  /// query path. On any error, fan-out cancel is broadcast over the open 
streams, then the broker waits for remaining
+  /// stats before building the final result.
+  ///
+  /// <b>Mixed-version policy.</b> No automatic fallback to the unary {@link 
#submit} path. Enabling
+  /// {@link CommonConstants.Broker.Request.QueryOptionKey#STREAM_STATS} 
requires every server in the
+  /// cluster to implement {@code SubmitWithStream}; if any server returns 
{@code UNIMPLEMENTED} or any other
+  /// transport error during dispatch, {@link #submitWithStream} surfaces the 
throwable through the ack queue,
+  /// {@link #processResults} throws, and this method fans out cancel via the 
session before propagating the failure.
+  private QueryResult submitAndReduceWithStream(RequestContext context, 
DispatchableSubPlan dispatchableSubPlan,
+      long timeoutMs, Map<String, String> queryOptions)
+      throws Exception {
+    long requestId = context.getRequestId();
+    long deadlineMs = System.currentTimeMillis() + timeoutMs;
+    Set<QueryServerInstance> servers = new HashSet<>();
+
+    // The session's expected-opchain count must equal the total number of 
opchains across every (server, non-root
+    // stage) pair — that's how many OpChainComplete messages we expect to 
receive.
+    Set<DispatchablePlanFragment> stagePlansWithoutRoot = 
dispatchableSubPlan.getQueryStagesWithoutRoot();
+    int totalExpected = 0;
+    Map<Integer, Integer> expectedByStage = new HashMap<>();
+    for (DispatchablePlanFragment stagePlan : stagePlansWithoutRoot) {
+      int stageId = stagePlan.getPlanFragment().getFragmentId();
+      int stageCount = 0;
+      for (List<Integer> workerIds : 
stagePlan.getServerInstanceToWorkerIdMap().values()) {
+        stageCount += workerIds.size();
+      }
+      totalExpected += stageCount;
+      expectedByStage.put(stageId, stageCount);
+    }
+    StreamingQuerySession session = new StreamingQuerySession(requestId, 
totalExpected);
+
+    try {
+      submitWithStream(requestId, dispatchableSubPlan, timeoutMs, servers, 
queryOptions, session);
+      QueryResult brokerResult = runReducer(dispatchableSubPlan, queryOptions, 
_mailboxService);
+
+      // Receiving mailbox finished. Wait for stats: returns true as soon as 
every opchain has reported, or false
+      // when the timeout fires.
+      long remainingMs = Math.max(0, deadlineMs - System.currentTimeMillis());
+      boolean fullCoverage = session.awaitCompletion(remainingMs, 
TimeUnit.MILLISECONDS);

Review Comment:
   **No wait-window cap (latency regression).** This uses the full remaining 
query timeout, but the design doc on #18375 specified `min(50ms, 
remainingTimeout)` as a hard cap. As written, a single stuck opchain on any 
server holds the broker response back until the entire query budget elapses — 
e.g. a 10s-timeout query that has data in hand at t=100ms but is missing one 
OpChainComplete will return to the client at t=10000ms instead of ~t=150ms.
   
   For p99-sensitive workloads this is a material regression vs legacy mode. 
Suggest a configurable cap (`min(STATS_WAIT_CAP_MS, remainingMs)`) with a 
50–100ms default. Same issue applies in `tryRecoverWithStream` below.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java:
##########
@@ -655,10 +661,18 @@ private BrokerResponse 
query(QueryEnvironment.CompiledQuery query, long requestI
       _stagesStartedMeter.mark(stageCount);
       _opchainsStartedMeter.mark(opChainCount);
 
+      // Inject the cluster-default stream-stats mode unless the query already 
overrides it.
+      Map<String, String> effectiveOptions = query.getOptions();
+      if (_streamStats && !effectiveOptions.containsKey(
+          CommonConstants.Broker.Request.QueryOptionKey.STREAM_STATS)) {
+        effectiveOptions = new HashMap<>(effectiveOptions);

Review Comment:
   Allocates a fresh `HashMap` on every query whenever cluster default is on. 
At high QPS that's one extra alloc + `put` per query for what is effectively a 
constant. Cleaner: thread the cluster default through `compileQuery` / 
`getQueryEnvConf` so `queryOptions` carries it once, and drop this branch.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/MultiStageStatsTreeDecoder.java:
##########
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan;
+
+import com.google.protobuf.ByteString;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.proto.Worker;
+import org.apache.pinot.query.runtime.operator.OperatorTypeDescriptor;
+import org.apache.pinot.query.runtime.operator.OperatorTypeRegistry;
+
+
+/**
+ * Broker-side decoder turning {@link Worker.MultiStageStatsTree} payloads 
into {@link StageStatsTreeNode} instances
+ * the broker accumulates by stage id (and merges across worker reports for 
the same stage).
+ *
+ * <p>Pairs with {@link MultiStageStatsTreeEncoder} on the server side.
+ */
+public final class MultiStageStatsTreeDecoder {
+  private MultiStageStatsTreeDecoder() {
+  }
+
+  /**
+   * Thrown when a payload cannot be decoded — usually because the operator 
type id is unknown to this broker
+   * (newer-server / older-broker case). The broker logs and marks the stage 
{@code mergeFailed}; the query continues.
+   */
+  public static class DecodeFailedException extends Exception {
+    public DecodeFailedException(String message) {
+      super(message);
+    }
+
+    public DecodeFailedException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /**
+   * Decodes a single {@link Worker.StageStatsNode} (recursive). Used directly 
when the caller already has a node and
+   * a known stage id.
+   */
+  public static StageStatsTreeNode decodeNode(Worker.StageStatsNode node)
+      throws DecodeFailedException {
+    OperatorTypeDescriptor type = 
OperatorTypeRegistry.fromId(node.getOperatorTypeId());
+    if (type == null) {
+      throw new DecodeFailedException("Unknown operator type id: " + 
node.getOperatorTypeId());
+    }
+    StatMap<?> statMap;
+    try {
+      statMap = deserializeStatMap(node.getStatMap(), type);
+    } catch (IOException e) {
+      throw new DecodeFailedException("Failed to deserialize StatMap for 
operator type " + type.name(), e);
+    }
+    List<StageStatsTreeNode> children = new 
ArrayList<>(node.getChildrenCount());

Review Comment:
   Two concerns about this recursive walk:
   
   1. **Unbounded stack depth.** Recursion depth is driven by the nesting of 
the proto's `children` lists. A pathological / malformed payload (or a loop 
introduced by a future bug) can overflow the stack of the Netty event-loop 
thread. Add an explicit depth cap (e.g. `MAX_OPERATOR_TREE_DEPTH = 64`) and 
fail fast with `DecodeFailedException` when it is exceeded.
   2. **Per-query broker memory.** Each opchain report keeps a 
fully-deserialized `StageStatsTreeNode` graph alive on the broker for the 
query's wait window. Aggregate broker footprint becomes ~`O(numQueries × 
numServers × operatorsPerStage)` — strictly larger than legacy's compact 
byte-merge buffer. Worth a heap-snapshot comparison vs legacy under 
representative load before defaulting on.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/streaming/StreamingQuerySession.java:
##########
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.service.dispatch.streaming;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.proto.Worker;
+import org.apache.pinot.query.runtime.plan.MultiStageStatsTreeDecoder;
+import org.apache.pinot.query.runtime.plan.StageStatsTreeNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Broker-side per-query state for the {@code SubmitWithStream} dispatch path. 
Owns the per-stage tree accumulator,
+ * the outstanding-opchain count, the per-stage coverage counters, and the set 
of open server streams.
+ *
+ * <p>Concurrency model — all mutating methods acquire the per-session lock, 
so the accumulator and counters need no
+ * additional internal synchronization. gRPC client {@code onNext} callbacks 
land on I/O threads and call into this
+ * session directly; the work per call is short (decode + merge + decrement) 
so doing it on the I/O thread is fine.
+ *
+ * <p>Completion semantics — {@link #awaitCompletion(long, TimeUnit)} returns 
{@code true} as soon as every expected
+ * opchain has reported (early completion), and {@code false} if the timeout 
fires first. The dispatcher should call
+ * it <strong>only after</strong> the broker receiving mailbox has finished, 
so that a successful return means both
+ * "data done" and "stats fully accounted for". When it returns {@code false} 
the per-stage coverage exposes which
+ * stages are missing.
+ */
+public class StreamingQuerySession {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(StreamingQuerySession.class);
+
+  private final long _requestId;
+  private final int _expectedOpChains;
+  private final CountDownLatch _completionLatch;
+  private final ReentrantLock _lock = new ReentrantLock();
+
+  /** Per-stage merged accumulator. Mutated under {@link #_lock}. */
+  private final Map<Integer, StageStatsTreeNode> _stageAccumulator = new 
HashMap<>();
+  /** Per-stage count of opchains that have responded successfully and merged 
cleanly. */
+  private final Map<Integer, Integer> _respondedByStage = new HashMap<>();
+  /** Per-stage count of opchains that responded but the broker couldn't merge 
their payload. */
+  private final Map<Integer, Integer> _mergeFailedByStage = new HashMap<>();
+
+  /** Set of open server streams. Iteration order is insertion order so cancel 
fan-out is deterministic. */
+  private final Set<StreamingServerHandle> _openStreams = new 
LinkedHashSet<>();
+
+  /** True after the first peer error (success=false OpChainComplete or stream 
onError). Used to trigger fan-out
+   * cancel idempotently. */
+  private boolean _peerErrorObserved = false;
+
+  public StreamingQuerySession(long requestId, int expectedOpChains) {
+    _requestId = requestId;
+    _expectedOpChains = expectedOpChains;
+    _completionLatch = new CountDownLatch(expectedOpChains);
+  }
+
+  public long getRequestId() {
+    return _requestId;
+  }
+
+  public int getExpectedOpChains() {
+    return _expectedOpChains;
+  }
+
+  /**
+   * Registers an open server stream so the session can iterate them later for 
fan-out cancel. Must be called by the
+   * dispatcher when the {@code SubmitWithStream} call is opened.
+   */
+  public void registerStream(StreamingServerHandle stream) {
+    _lock.lock();
+    try {
+      _openStreams.add(stream);
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Removes a stream from the open-streams set. Called when the server emits 
{@code ServerDone} (clean close) or the
+   * stream errors. Idempotent.
+   */
+  public void unregisterStream(StreamingServerHandle stream) {
+    _lock.lock();
+    try {
+      _openStreams.remove(stream);
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Records an {@link Worker.OpChainComplete} message decoded from a server 
stream. Decrements the outstanding count
+   * and merges the contained tree into the per-stage accumulator (or marks 
the stage {@code mergeFailed} on a shape
+   * mismatch / decode failure). Also records {@code success=false} reports as 
peer errors so fan-out cancel can fire.
+   */
+  public void recordOpChainComplete(Worker.OpChainComplete message) {
+    int stageId = message.getStageId();
+    boolean isSuccess = message.getSuccess();
+    Worker.MultiStageStatsTree statsTree = message.getStats();
+
+    boolean shouldFanOutCancel = false;
+    _lock.lock();
+    try {
+      if (!isSuccess) {
+        if (!_peerErrorObserved) {
+          _peerErrorObserved = true;
+          shouldFanOutCancel = true;

Review Comment:
   **Cancel fan-out is undebounced.** First `success=false` or transport-level 
error immediately broadcasts cancel to all N-1 peer servers. At scale this 
amplifies in two ways:
   - A flaky server returning errors on a small fraction of queries now 
triggers N-1 cancel sends per affected query — at high QPS during a partial 
degradation this is a meaningful self-inflicted cancel storm.
   - `recordStreamError` also triggers fan-out on *any* `onError` (broker GC 
pause beyond keep-alive, idle timeout, network reset). Premature-cancel 
probability grows with `N × queryDuration`.
   
   Consider:
   - Debounce (cancel only after K errors / K ms).
   - Distinguish "server returned error" from "stream transport closed" — 
transport blips shouldn't kill peers.
   - Emit a `cancelFanoutsBroadcast{reason}` metric so this is observable in 
prod.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java:
##########
@@ -63,6 +67,25 @@ public class OpChainExecutionContext {
   private ServerPlanRequestContext _leafStageContext;
   private final boolean _sendStats;
   private final boolean _keepPipelineBreakerStats;
+  /**
+   * Map of MultiStageOperator -> PlanNodes that compile down to that 
operator. Populated by
+   * {@link org.apache.pinot.query.runtime.plan.PlanNodeToOpChain} during 
opchain construction. Cardinality is
+   * one-to-many: an intermediate operator maps to a single PlanNode, but the 
leaf operator maps to the whole sub-tree
+   * of v1 plan nodes below the leaf-stage boundary. Used by the stream-mode 
stats reporting path
+   * ({@code MultiStageStatsTreeEncoder}) to attach plan-node identifiers to 
each operator's stats.
+   * <p>
+   * Identity-based (IdentityHashMap) because PlanNode equality is structural 
and two distinct nodes can compare equal.
+   */
+  private final Map<MultiStageOperator, List<PlanNode>> _operatorToPlanNodes = 
new IdentityHashMap<>();

Review Comment:
   `_operatorToPlanNodes` and `_planNodeIds` are allocated and populated for 
*every* opchain via `PlanNodeToOpChain.assignPlanNodeIds` + `record`, 
regardless of whether stats reporting will use them. In legacy mode (which 
remains the default) these maps are never read — pure GC dead weight on the hot 
path.
   
   At high QPS × deep stages this adds up. Suggest gating the puts in 
`PlanNodeToOpChain` behind `context.isSendStats() == false` (i.e. stream mode 
active per `KEY_OF_STATS_REPORTING_MODE`), or making these maps lazy.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java:
##########
@@ -207,19 +388,48 @@ private QueryResult tryRecover(long requestId, 
Set<QueryServerInstance> servers,
     } else if (ex instanceof QueryException) {
       errorCode = ((QueryException) ex).getErrorCode();
     } else {
-      // in case of unknown exceptions, the exception will be rethrown, so we 
don't need stats
       cancel(requestId, servers);
       throw ex;
     }
-    // in case of known exceptions (timeout or query exception), we need can 
build here the erroneous QueryResult
-    // that include the stats.
-    LOGGER.warn("Query failed with a known exception. Trying to cancel the 
other opchains");
-    MultiStageQueryStats stats = cancelWithStats(requestId, servers);
-    if (stats == null) {
+    LOGGER.warn("Query failed with a known exception. Cancelling remaining 
opchains.");
+    cancel(requestId, servers);
+    QueryProcessingException processingException = new 
QueryProcessingException(errorCode, ex.getMessage());
+    return new QueryResult(processingException, 
MultiStageQueryStats.emptyStats(0), 0L);
+  }
+
+  /// Tries to recover from an exception thrown during stream-mode ({@code 
SubmitWithStream}) query dispatching.
+  ///
+  /// Fans out cancel over the open streams, waits briefly for any remaining 
{@code OpChainComplete} messages (up to
+  /// the query deadline), and builds a {@link QueryResult} that includes 
whatever stats arrived before the deadline.
+  /// Stats from before the error are available because servers push {@code 
OpChainComplete} even on failure.
+  ///
+  /// Unknown exceptions (not {@link TimeoutException} or {@link 
QueryException}) are re-thrown after cancel fan-out.
+  private QueryResult tryRecoverWithStream(StreamingQuerySession session, 
Map<Integer, Integer> expectedByStage,
+      long deadlineMs, Exception ex)
+      throws Exception {
+    if (ex instanceof ExecutionException && ex.getCause() instanceof 
Exception) {
+      ex = (Exception) ex.getCause();
+    }
+    QueryErrorCode errorCode;
+    if (ex instanceof TimeoutException) {
+      errorCode = QueryErrorCode.EXECUTION_TIMEOUT;
+    } else if (ex instanceof QueryException) {
+      errorCode = ((QueryException) ex).getErrorCode();
+    } else {
+      session.fanOutCancel();
       throw ex;
     }
+    LOGGER.warn("Stream-mode query failed with a known exception. Fanning out 
cancel and waiting for stats.");
+    session.fanOutCancel();
+    long remainingMs = Math.max(0, deadlineMs - System.currentTimeMillis());
+    try {
+      session.awaitCompletion(remainingMs, TimeUnit.MILLISECONDS);

Review Comment:
   **Error path also waits the full remaining timeout** — same root issue as 
the success-path comment earlier in this file. A query that hits an error at 
t=100ms with a 10s timeout can be held back from the client until t=10000ms if 
any `OpChainComplete` never arrives, whereas legacy mode returns immediately 
on cancel.
   
   For the error path specifically, blocking the client just to collect more 
stats is the wrong trade — you've already given up on the query. Use a much 
smaller cap here (e.g. 100–200ms) and represent the rest via 
`streamStatsCoverage[].missing > 0`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to