gortiz opened a new issue, #18541:
URL: https://github.com/apache/pinot/issues/18541

   ## Problem
   
   The multi-stage engine's mailbox cancel propagation runs **in-band**: 
`GrpcSendingMailbox.cancel(Throwable)` pushes an error EOS message through the 
same gRPC stream as data, using the `bypassReady=true` path. On the receiver, 
that EOS is delivered through `MailboxContentObserver.onNext`, which calls into 
`ReceivingMailbox.offerRaw`. The application queue has a default capacity of 5 
(`ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS`), and the dispatch thread parks 
on `_notFull.await` when full.
   
   When that dispatch thread is parked (e.g. the consumer is slow or has gone 
away entirely), the in-band EOS sits behind every other inbound message that 
already made it past gRPC's flow-control window. In the worst case, 
`min(credit messages, flowControlWindow bytes / msg_size)` buffered inbound 
messages have to drain before the EOS reaches the application, and cancel 
latency is the time that drain takes.
   
   With the manual flow control introduced in #18519 at default credit=128, up 
to `min(128, flowControlWindow / msg_size)` messages can be buffered ahead of 
the EOS, widening worst-case cancel latency relative to gRPC's auto-inbound 
default of 1 in-flight message.
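
   To make the bound concrete, here is a minimal sketch of the arithmetic. 
`CancelLatencyBound` and `maxBufferedAhead` are hypothetical names, not Pinot 
code, and the 1 MiB window used in `main` is only an illustrative assumption 
about `flowControlWindow`.

```java
/** Hypothetical helper (not part of Pinot): messages that can sit ahead of an in-band EOS. */
final class CancelLatencyBound {
  private CancelLatencyBound() {
  }

  /** Computes min(credit messages, flowControlWindow bytes / msg_size) from the description above. */
  static long maxBufferedAhead(int creditMessages, long flowControlWindowBytes, long msgSizeBytes) {
    if (msgSizeBytes <= 0) {
      throw new IllegalArgumentException("msgSizeBytes must be positive");
    }
    long fitsInWindow = flowControlWindowBytes / msgSizeBytes;
    return Math.min(creditMessages, fitsInWindow);
  }

  public static void main(String[] args) {
    long window = 1L << 20; // assumed 1 MiB window, for illustration only
    // Small messages: the credit is the limiting factor.
    System.out.println(maxBufferedAhead(128, window, 4 * 1024));  // prints 128
    // Large messages: the byte window is the limiting factor.
    System.out.println(maxBufferedAhead(128, window, 64 * 1024)); // prints 16
  }
}
```

   With auto-inbound the same position holds at most 1 message, so under 
these assumed numbers the backlog ahead of the EOS grows by 16x to 128x.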
   
   ## This hang is not new
   
   Pre-#18519, with auto-inbound flow control, at most 1 message was buffered 
ahead of the EOS, so the worst-case latency was smaller. The *underlying* 
mechanism (in-band EOS gated by a parked dispatch thread on a full application 
queue) nevertheless existed before #18519 and continues to exist on the 
rollback path 
(`pinot.query.runner.grpc.manual.inbound.flow.control.enabled=false`).
   
   The wider credit window **magnifies an existing surface**; it does not 
create a new failure mode. The proper fix is an out-of-band cancel channel.
   
   ## Two design options
   
   ### Option 1 — `setOnCancelHandler` + sender `stream.cancel()`
   
   Register `ServerCallStreamObserver.setOnCancelHandler(...)` on the receiver 
to call `ReceivingMailbox.cancel(...)`. Sender's 
`GrpcSendingMailbox.cancel(...)` calls `_contentObserver.cancel(message, 
cause)` after the in-band EOS attempt.
   
   * **Pros.** Tiny change. The handler fires immediately, independent of 
buffered inbound. ~50 lines + tests + an idempotency check on 
`ReceivingMailbox.cancel()`.
   * **Cons.** gRPC's stream `cancel` can abort delivery of in-flight messages, 
so the in-band error EOS — which carries the specific Pinot `QueryErrorCode` 
(today `QUERY_CANCELLATION` or whatever the upstream cause was) — may **not** 
arrive at the receiver. The receiver mailbox sees a generic "client cancelled" 
via the cancel handler instead of the specific code. Acceptable as a quick fix; 
not ideal if precise error-code propagation matters.
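
   The idempotency check mentioned under Pros could look roughly like the 
sketch below. `ToyReceivingMailbox` is a toy stand-in, not the real 
`ReceivingMailbox`, and the `CLIENT_CANCELLED` string is a hypothetical label 
for the generic cancel seen by the handler. The idea is first-cancel-wins via 
a compare-and-set, so a later cancel cannot overwrite the error already 
recorded.

```java
import java.util.concurrent.atomic.AtomicReference;

final class IdempotentCancelSketch {
  /** Toy stand-in for a receiving mailbox; only models the terminal error state. */
  static final class ToyReceivingMailbox {
    private final AtomicReference<String> _terminalError = new AtomicReference<>();

    /** Returns true only for the call that actually moved the mailbox to cancelled. */
    boolean cancel(String errorCode) {
      return _terminalError.compareAndSet(null, errorCode);
    }

    boolean isCancelled() {
      return _terminalError.get() != null;
    }

    String errorCode() {
      return _terminalError.get();
    }
  }

  public static void main(String[] args) {
    ToyReceivingMailbox mailbox = new ToyReceivingMailbox();
    // The in-band error EOS happens to arrive first and records the specific code.
    System.out.println(mailbox.cancel("QUERY_CANCELLATION")); // prints true
    // The cancel handler fires afterwards and is a no-op.
    System.out.println(mailbox.cancel("CLIENT_CANCELLED"));   // prints false
    System.out.println(mailbox.errorCode());                  // prints QUERY_CANCELLATION
  }
}
```

   If the handler fires first instead, the generic code wins, which is 
exactly the fidelity loss described under Cons.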
   
   ### Option 2 (preferred) — dedicated cancel RPC
   
   Add a new RPC method to `Mailbox.proto`:
   
   ```proto
   service PinotMailbox {
     rpc Open(stream MailboxContent) returns (stream MailboxStatus);
     rpc Cancel(MailboxCancel) returns (CancelAck);  // <-- new
   }
   
   message MailboxCancel {
     string mailbox_id = 1;
     // Serialised ErrorMseBlock with the original QueryErrorCode.
     bytes error_payload = 2;
   }
   
   message CancelAck {}
   ```
   
   Sender's `GrpcSendingMailbox.cancel(...)` invokes this unary RPC (instead 
of, or in addition to, the in-band EOS) carrying the mailbox id and the 
serialised error block. Receiver's `GrpcMailboxServer` looks up the mailbox by 
id (the existing `MailboxService.getReceivingMailbox` lookup) and calls 
`mailbox.cancel(deserialiseErrorBlock(payload))` directly.
   
   * **Pros.** Out-of-band, no stream involvement, no risk of dropped in-flight 
messages, full error-code fidelity. Independent of the inbound credit value 
entirely.
   * **Cons.** Bigger change: proto change + new RPC handler + sender-side 
wiring + mixed-version handling (an old receiver doesn't expose `Cancel`, so 
the sender has to fall back to the in-band EOS path). Wire compatibility 
needs a feature flag for at least one release cycle.
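
   A rough sketch of the sender-side decision for mixed versions follows. 
Everything in it is hypothetical: `CancelTransport`, `tryCancelRpc`, 
`sendInBandErrorEos` and the `cancelRpcEnabled` flag are illustration-only 
names, and a real implementation would wrap the generated gRPC stubs. An old 
receiver that does not implement `Cancel` would surface in gRPC as an 
`UNIMPLEMENTED` status; here that case is modelled as `tryCancelRpc` returning 
`false`.

```java
import java.util.ArrayList;
import java.util.List;

final class CancelFallbackSketch {
  enum Path { OUT_OF_BAND, IN_BAND }

  /** Hypothetical abstraction over the two ways a sender can deliver a cancel. */
  interface CancelTransport {
    /** Returns false when the peer does not expose the Cancel RPC. */
    boolean tryCancelRpc(String mailboxId, byte[] errorPayload);

    void sendInBandErrorEos(byte[] errorPayload);
  }

  /** Prefers the out-of-band RPC; falls back to the in-band EOS if disabled or unsupported. */
  static Path cancel(CancelTransport transport, boolean cancelRpcEnabled, String mailboxId,
      byte[] errorPayload) {
    if (cancelRpcEnabled && transport.tryCancelRpc(mailboxId, errorPayload)) {
      return Path.OUT_OF_BAND;
    }
    transport.sendInBandErrorEos(errorPayload);
    return Path.IN_BAND;
  }

  /** Test double that records which calls were made. */
  static final class RecordingTransport implements CancelTransport {
    final List<String> calls = new ArrayList<>();
    private final boolean _peerSupportsCancel;

    RecordingTransport(boolean peerSupportsCancel) {
      _peerSupportsCancel = peerSupportsCancel;
    }

    @Override
    public boolean tryCancelRpc(String mailboxId, byte[] errorPayload) {
      calls.add("cancelRpc");
      return _peerSupportsCancel;
    }

    @Override
    public void sendInBandErrorEos(byte[] errorPayload) {
      calls.add("inBandEos");
    }
  }

  public static void main(String[] args) {
    byte[] payload = new byte[0];
    System.out.println(cancel(new RecordingTransport(true), true, "mb-1", payload));  // OUT_OF_BAND
    System.out.println(cancel(new RecordingTransport(false), true, "mb-1", payload)); // IN_BAND
    System.out.println(cancel(new RecordingTransport(true), false, "mb-1", payload)); // IN_BAND
  }
}
```

   Whether the in-band EOS should also be sent when the RPC succeeds 
("in addition to") is left open here; the sketch only covers the fallback.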
   
   ## Suggested test
   
   A targeted unit/integration test alongside whichever option is implemented:
   
   1. Stand up a sender + receiver pair (similar to 
`GrpcSenderBackpressureTest`).
   2. Register a "do nothing" reader on the receiver so the application queue 
fills and the dispatch thread parks on `_notFull.await`.
   3. Send blocks until the sender is back-pressured.
   4. Call `sender.cancel(new RuntimeException("test cancel"))`.
   5. Assert that the receiving mailbox observes the cancel (e.g. 
`receiver.poll()` returns an error EOS, or `isCancelled()` flips) within a 
tight bound — **~100 ms**.
   
   This test would fail today (cancel would hang behind the buffered inbound) 
and pass after either Option 1 or Option 2.
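
   The shape of steps 2 to 5 can be sketched without gRPC, using a toy 
mailbox. `ToyMailbox` is not Pinot's `ReceivingMailbox`; it only models a 
bounded queue of capacity 5, a producer that parks on `_notFull.await`, and an 
out-of-band `cancel` that bypasses the queue. The real test would use an 
actual sender + receiver pair as in step 1.

```java
import java.util.ArrayDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

final class CancelLatencyTestSketch {
  /** Toy bounded mailbox: offer() parks when full, cancel() wakes parked producers. */
  static final class ToyMailbox {
    private final ReentrantLock _lock = new ReentrantLock();
    private final Condition _notFull = _lock.newCondition();
    private final ArrayDeque<String> _queue = new ArrayDeque<>();
    private final int _capacity;
    private String _error; // guarded by _lock

    ToyMailbox(int capacity) {
      _capacity = capacity;
    }

    /** Blocks while the queue is full; returns false once the mailbox is cancelled. */
    boolean offer(String block) throws InterruptedException {
      _lock.lock();
      try {
        while (_queue.size() >= _capacity && _error == null) {
          _notFull.await();
        }
        if (_error != null) {
          return false;
        }
        _queue.add(block);
        return true;
      } finally {
        _lock.unlock();
      }
    }

    /** Out-of-band cancel: sets the error directly instead of enqueueing it. */
    void cancel(String error) {
      _lock.lock();
      try {
        if (_error == null) {
          _error = error;
          _queue.clear();
          _notFull.signalAll();
        }
      } finally {
        _lock.unlock();
      }
    }
  }

  /** Returns millis from cancel() until the parked thread observes it, or -1 if over 100 ms. */
  static long measureCancelLatencyMillis() {
    ToyMailbox mailbox = new ToyMailbox(5);
    CountDownLatch observed = new CountDownLatch(1);
    Thread dispatch = new Thread(() -> {
      try {
        // No reader drains the queue, so the 6th offer parks until cancel.
        while (mailbox.offer("block")) {
        }
        observed.countDown();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    dispatch.setDaemon(true);
    dispatch.start();
    try {
      // Best-effort wait (at most ~1 s) for the dispatch thread to park.
      for (int i = 0; i < 1000 && dispatch.getState() != Thread.State.WAITING; i++) {
        Thread.sleep(1);
      }
      long start = System.nanoTime();
      mailbox.cancel("test cancel");
      boolean seen = observed.await(100, TimeUnit.MILLISECONDS);
      return seen ? TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start) : -1;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return -1;
    }
  }

  public static void main(String[] args) {
    System.out.println("cancel observed after ms: " + measureCancelLatencyMillis());
  }
}
```

   An in-band variant of `cancel` that went through `offer` would park 
behind the full queue instead, which is the failure the real test should 
reproduce before the fix.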
   
   ## References
   
   * PR #18519 — introduced manual inbound flow control with default credit=128.
   * Review comment id `3269229796` (Yash) — flagged this regression.
   * Rollback knob: 
`pinot.query.runner.grpc.manual.inbound.flow.control.enabled=false` reverts to 
auto-inbound (1 in-flight message). It bounds worst-case cancel latency more 
tightly but does **not** fix the underlying mechanism.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

