gortiz opened a new pull request, #18458: URL: https://github.com/apache/pinot/pull/18458
## Summary

Implements the proposal from #18375: a new opt-in mode where servers push per-opchain stats directly to the broker over a long-lived bidi gRPC stream, instead of piggy-backing them onto the mailbox just before EOS.

Resolves #18375.

### What the issue proposed vs. what is implemented

The implementation follows the issue's proposal closely, with a few clarifications:

| Area | Issue proposal | This PR |
|---|---|---|
| New RPC | `SubmitWithStream(stream BrokerToServer) returns (stream ServerToBroker)` | ✅ Implemented as specified |
| `BrokerToServer` | `submit` + `cancel` payloads | ✅ Both implemented; `cancel` routes to `QueryRunner.cancel()` and also fires on stream-close |
| `ServerToBroker` | `submit_ack` + `OpChainComplete` + `ServerDone` | ✅ Implemented as specified |
| `MultiStageStatsTree` / `StageStatsNode` | Structured tree-shaped payload with operator type, plan-node ids, stat bytes, children | ✅ Implemented; encoder walks the live operator tree + flat `MultiStageQueryStats` lists; decoder produces a `StageStatsTreeNode` accumulator on the broker |
| Op→PlanNode mapping | Captured via existing `BiConsumer` tracker in `PlanNodeToOpChain`; leaf sub-tree walked | ✅ Implemented on `OpChainExecutionContext` |
| Stats in mailbox | Suppressed when stream mode is active | ✅ Server sends EOS without stats when `SubmitWithStream` is in use |
| Broker accumulator | Per-stage `Map<Integer, StageStatsNode>` with `StatMap.merge` per node; tree-shape mismatch → `mergeFailed` | ✅ `StreamingQuerySession` owns the accumulator and per-stage coverage counters |
| Wait window | After data-mailbox EOS, broker waits up to `min(50 ms, remaining timeout)` for outstanding `OpChainComplete`s | ✅ Implemented in `submitAndReduceWithStream` |
| Fan-out cancel | On first peer error, broker sends `BrokerToServer.cancel` on all open streams | ✅ `StreamingQuerySession.fanOutCancel()` |
| Per-stage coverage | `responded` / `mergeFailed` / `missing` exposed in broker response as `streamStatsCoverage[]` | ✅ Exposed on `QueryResult` and broker JSON response |
| Config | Cluster-level `pinot.broker.multistage.use.stream.stats.reporting` + per-query `useStreamStatsReporting` option | ✅ Implemented; default is `false` (legacy mode unchanged) |
| Fallback | Broker detects `UNIMPLEMENTED` and falls back to legacy for that query | ✅ Implemented in `DispatchClient` |

**Notable divergences / implementation decisions not in the issue:**

- **`ServerDone` is implicit, not a separate message.** The server half-closes the outbound stream (calls `onCompleted()`) after the last `OpChainComplete` rather than sending a dedicated `ServerDone` message. This is equivalent and avoids an extra round-trip.
- **Cancel via stream replaces the unary `Cancel` RPC for stream-mode queries (Phase B).** The separate `Cancel` RPC is kept alive for one release for backward compatibility but its response no longer carries stats.
- **O(1) cancel in `OpChainSchedulerService` (all modes).** The previous `cancel()` scanned `_opChainCache` O(n). A new `_executionContextByRequest` map + `_activeOpChainsByRequest` reference-counter enable direct `QueryExecutionContext.terminate()` without a scan. The write lock is acquired *before* the `compute()` eviction to close the race window between context removal and cancelled-query cache write.
- **`tryRecoverWithStream`** fans out cancel and then waits for remaining `OpChainComplete` stats up to the query deadline, so error-path stats are as complete as possible.
- **N-ary set-op tree reconstruction is now correct.** The legacy `InStageStatsTreeBuilder` heuristic (implicit arity) was lossy for set ops with more than two inputs. The new `MultiStageStatsTree` format carries the tree shape explicitly, so the broker reconstructs it exactly regardless of arity.
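
For readers who have not opened the diff, the O(1) cancel idea from the divergences above can be illustrated with a rough, simplified sketch. This is not the code in `OpChainSchedulerService`: `CancelRegistrySketch`, `SketchExecutionContext`, and every method name here are hypothetical stand-ins, and the write lock and cancelled-query cache mentioned above are deliberately left out. It only shows how a per-request context map plus a reference count lets `cancel` avoid scanning every opchain.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for QueryExecutionContext; only terminate() is modeled. */
final class SketchExecutionContext {
  private final AtomicBoolean _terminated = new AtomicBoolean();

  void terminate() {
    _terminated.set(true);
  }

  boolean isTerminated() {
    return _terminated.get();
  }
}

/** Hypothetical registry: one context per request, plus a count of live opchains. */
final class CancelRegistrySketch {
  private final Map<Long, SketchExecutionContext> _executionContextByRequest = new ConcurrentHashMap<>();
  private final Map<Long, Integer> _activeOpChainsByRequest = new ConcurrentHashMap<>();

  /** Called once per opchain when it is scheduled. */
  void register(long requestId, SketchExecutionContext context) {
    // compute() is atomic per key, so the context and the count are published together.
    _activeOpChainsByRequest.compute(requestId, (id, count) -> {
      _executionContextByRequest.putIfAbsent(id, context);
      return count == null ? 1 : count + 1;
    });
  }

  /** Called once per opchain when it finishes; the last one evicts the context. */
  void onOpChainFinished(long requestId) {
    _activeOpChainsByRequest.compute(requestId, (id, count) -> {
      if (count == null || count <= 1) {
        _executionContextByRequest.remove(id);
        return null; // returning null removes the counter entry
      }
      return count - 1;
    });
  }

  /** Single map lookup instead of a scan over all cached opchains. */
  boolean cancel(long requestId) {
    SketchExecutionContext context = _executionContextByRequest.get(requestId);
    if (context == null) {
      return false; // nothing running for this request
    }
    context.terminate();
    return true;
  }

  int activeOpChains(long requestId) {
    return _activeOpChainsByRequest.getOrDefault(requestId, 0);
  }
}
```

In this sketch, registering two opchains for the same request and finishing one still leaves the request cancellable; once the second finishes, the context is evicted and `cancel` returns `false`. The real change additionally has to order the eviction against the cancelled-query cache write, which is what the write lock described above is for.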
### Key files

| File | Role |
|---|---|
| `pinot-common/src/main/proto/worker.proto` | New RPC + `BrokerToServer` / `ServerToBroker` / `OpChainComplete` / `MultiStageStatsTree` / `StageStatsNode` messages |
| `QueryServer.java` | `SubmitWithStream` handler: plan submission, opchain completion callbacks, cancel-via-stream, stream-close-as-cancel |
| `MultiStageStatsTreeEncoder.java` | Server-side: walks live operator tree + `MultiStageQueryStats` flat lists → `MultiStageStatsTree` proto |
| `MultiStageStatsTreeDecoder.java` | Broker-side: decodes proto → `StageStatsTreeNode` accumulator; detects shape mismatches |
| `StreamingQuerySession.java` | Per-query accumulator: per-stage tree merge, coverage counters, completion latch, fan-out cancel |
| `QueryDispatcher.java` | Opens `SubmitWithStream` streams; wait window; error recovery; stats coverage in response |
| `OpChainSchedulerService.java` | O(1) cancel; completion listener registration for stream-mode stats push |
| `OpChainExecutionContext.java` | Op→PlanNode map captured at construction time |
| `StreamStatsReportingIntegrationTest.java` | Integration tests: simple aggregation, join (≥3 stages), three-way UNION (N-ary set op regression), cluster-level config activation |

### Test plan

- [x] `StreamStatsReportingIntegrationTest`: 4 tests covering simple aggregation, join, three-way UNION (N-ary set-op regression), cluster-level config; all pass
- [x] `MultiStageStatsTreeEncoderTest` / `MultiStageStatsTreeDecoderTest`: round-trip, merge, shape-mismatch handling
- [x] `StreamingQuerySessionTest`: coverage counters, fan-out cancel, latch semantics
- [x] `OpChainSchedulerServiceTest`: O(1) cancel, multi-opchain context cleanup, completion listener lifecycle
- [x] `DispatchClientTest` / `StreamingDispatchObserverTest`: `SubmitWithStream` client-side observer, `UNIMPLEMENTED` fallback
- [x] Legacy mode unchanged: all existing MSE integration tests unaffected (stream mode is opt-in, default off)

🤖 Generated with
[Claude Code](https://claude.com/claude-code)

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
