gortiz opened a new pull request, #18519:
URL: https://github.com/apache/pinot/pull/18519
> **Draft.** This PR ships only the observability + reproducer that we need
to discuss the fix. The actual sender-side flow-control change is intentionally
**not** in this commit — see _What's missing_ below. Do not merge in this state.
## Why
Customer hit `OutOfDirectMemoryError` inside `MessageFramer.writeRaw` from
`GrpcSendingMailbox.sendContent`:
```
failed to allocate 4194304 byte(s) of direct memory (used: 25163727127, max: 25165824000)
    at io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent.incrementMemoryCounter(...)
    ...
    at io.grpc.internal.MessageFramer.writeRaw(MessageFramer.java:294)
    ...
    at org.apache.pinot.query.mailbox.GrpcSendingMailbox.sendContent(GrpcSendingMailbox.java:227)
```
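As a quick sanity check on those numbers, the snippet below redoes the arithmetic from the error message. It is only an illustration: the class and method names are made up for this description, and the three values are copied from the message above.

```java
public class DirectMemoryHeadroom {
    /** Bytes still available under the direct-memory cap. */
    static long headroom(long max, long used) {
        return max - used;
    }

    /** True if an allocation of the given size would still fit under the cap. */
    static boolean fits(long requested, long max, long used) {
        return requested <= headroom(max, used);
    }

    public static void main(String[] args) {
        long used = 25_163_727_127L;  // "used" from the error message
        long max = 25_165_824_000L;   // "max" from the error message
        long requested = 4_194_304L;  // failed allocation size (4 MiB)

        System.out.println(max / (1024 * 1024));          // 24000 (the cap in MiB)
        System.out.println(headroom(max, used));          // 2096873 (just under 2 MiB left)
        System.out.println(fits(requested, max, used));   // false
    }
}
```

In other words, the pool was within about 2 MiB of a 24000 MiB cap when a 4 MiB chunk was requested.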
Root cause: `GrpcSendingMailbox` calls `StreamObserver.onNext(...)`
unconditionally on every chunk. The runtime object is a
`ClientCallStreamObserver` exposing `isReady()` / `setOnReadyHandler()`, but we
don't use either. The receiver-side `MailboxStatusObserver` already writes back
buffer-size metadata to the sender, but it is explicitly thrown away:
```java
// pinot-query-runtime/.../channel/MailboxStatusObserver.java:53
// TODO: this feedback info is not used to throttle the send speed. it is currently being discarded.
```
When the receiver drains slower than the sender writes (slow consumer, full
mailbox queue, large fan-out, skewed hash shuffle, etc.) the proto chunks pile
up in gRPC Netty's outbound queue until the JVM-wide direct-memory cap is hit.
## What this PR adds
### New gauges (`MAILBOX_CLIENT_USED_*`)
`BrokerGauge` and `ServerGauge` gain two entries each:
| Gauge | Source |
|---|---|
| `MAILBOX_CLIENT_USED_DIRECT_MEMORY` | `ChannelManager._bufAllocator.metric().usedDirectMemory()` |
| `MAILBOX_CLIENT_USED_HEAP_MEMORY` | `ChannelManager._bufAllocator.metric().usedHeapMemory()` |
These mirror the existing `MAILBOX_SERVER_USED_*` gauges that
`GrpcMailboxServer` already publishes for the inbound (server) allocator.
Today there is **no per-process visibility into the gRPC client outbound
pool**, which is exactly the pool that is exhausted in this failure mode; the
new gauges close that gap.
`MailboxService` registers them in its constructor, picking `BrokerMetrics`
vs `ServerMetrics` based on `InstanceType`, mirroring `GrpcMailboxServer`'s
existing pattern. Both direct and heap are reported so the numbers remain
meaningful regardless of `-Dio.netty.noPreferDirect`.
`MailboxService` also exposes the same values via four accessors
(`getMailbox{Client,Server}Used{Direct,Heap}MemoryBytes()`) so tests and other
internal callers can read the gauge values without going through the metrics
registry.
### `GrpcSenderBackpressureReproTest`
A new TestNG test in `pinot-query-runtime/src/test/.../mailbox/` that:
* Stands up two real `MailboxService` instances on localhost (sender side +
receiver side, full gRPC stack — not mocked).
* Spawns a **slow reader** thread polling the receiving mailbox every 20 ms
(~50 blocks/s).
* Runs a **fast sender** loop on the test thread: `sender.send(block)` in a
tight loop with the same small `RowHeapDataBlock` reused over and over. No
large blocks — just a lot of them.
* After 3 seconds, asserts the sender pushed at least 10x more blocks than
the receiver polled (in practice the ratio is ~1700x on a developer laptop).
* Prints both client and server pool memory through the new gauges so the
failure mode is visible in CI output.
Representative output from a local run:
```
[GrpcSenderBackpressureReproTest] sent=197022 polled=106 ratio=1858.7x
sender MAILBOX_CLIENT_USED_*: direct=54525952B heap=0B (peak growth=54525952B)
receiver MAILBOX_SERVER_USED_*: direct=8388608B heap=0B (peak growth=8388608B)
```
The test's purpose for now is to make the bug **demonstrable** and
**measurable**, not to gate merges on memory thresholds (those are flaky across
hosts).
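For readers without the Pinot test harness at hand, the shape of the reproducer can be sketched with the JDK alone. This is not the test in this PR: the `ConcurrentLinkedQueue` is a stand-in for the unbounded outbound queue, the class and parameter names are hypothetical, and the `maxSent` cap exists only to keep the sketch from eating memory.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class UnboundedSendSketch {
    /** Runs a fast producer against a slow consumer and returns {sent, polled}. */
    static long[] run(long durationMs, long pollIntervalMs, long maxSent) {
        // Stand-in for the outbound queue: add() never blocks and never rejects.
        ConcurrentLinkedQueue<byte[]> outbound = new ConcurrentLinkedQueue<>();
        AtomicLong polled = new AtomicLong();
        AtomicBoolean done = new AtomicBoolean();

        // Slow reader: takes at most one block per poll interval.
        Thread reader = new Thread(() -> {
            while (!done.get()) {
                if (outbound.poll() != null) {
                    polled.incrementAndGet();
                }
                try {
                    Thread.sleep(pollIntervalMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        reader.setDaemon(true);
        reader.start();

        // Fast sender: the same small block is enqueued over and over.
        byte[] block = new byte[64];
        long sent = 0;
        long deadline = System.nanoTime() + durationMs * 1_000_000L;
        while (System.nanoTime() < deadline && sent < maxSent) {
            outbound.add(block);
            sent++;
        }

        done.set(true);
        try {
            reader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new long[] {sent, polled.get()};
    }

    public static void main(String[] args) {
        long[] r = run(300, 20, 200_000);
        double ratio = (double) r[0] / Math.max(1, r[1]);
        System.out.printf("sent=%d polled=%d ratio=%.1fx%n", r[0], r[1], ratio);
    }
}
```

Nothing in the sender loop ever waits on the reader, which is the property the real test demonstrates against the full gRPC stack.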
## What's missing (and intentionally so)
The actual sender-side backpressure mechanism. Once we agree on the approach
— most likely a `ClientCallStreamObserver` cast in
`GrpcSendingMailbox.getContentObserver()` plus an `isReady()`-gated wait in
`sendContent()`, optionally combined with the receiver buffer-size feedback
that's already being sent — this PR will be updated to include:
* The flow-control change in `GrpcSendingMailbox`.
* An inversion of the assertion in `GrpcSenderBackpressureReproTest`: after
the fix the sender's send count should stay close to the receiver's polled
count plus a small in-flight constant. The current "outpaces by ≥10x" assertion
is left as a TODO comment in the test for that reason.
## Design questions to discuss on this PR
1. **Where to apply the wait.** `setOnReadyHandler` + `Condition` on the
sending thread vs. busy-checking `isReady()` per chunk. The sender currently
runs on the query-runner thread; a blocking wait there is acceptable because
the OpChain is already structured around blocking sends.
2. **Whether to honor the receiver's `buffer_size` feedback** in addition to
gRPC `isReady()`. The receiver knows about the bounded `ReceivingMailbox` queue
(default 5 blocks); gRPC's `isReady()` only knows about HTTP/2 + Netty queues.
Either signal can be the bottleneck — we may want both.
3. **Cap on outbound queued bytes per channel** (`NettyChannelBuilder`
write-buffer watermark) as a defense-in-depth limit so a single misbehaving
stream can't blow the global pool.
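To make option 1 concrete, here is a minimal sketch of the `setOnReadyHandler` + `Condition` variant. It is not the proposed implementation: `ReadyAwareStream` is a hypothetical stand-in for the `isReady()` / `setOnReadyHandler()` pair on `ClientCallStreamObserver`, so the example compiles without gRPC on the classpath, and the timeout parameter is an assumption added so that a dead receiver cannot park the sending thread forever.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ReadyGatedSender<T> {
    /** Hypothetical stand-in for the readiness API of ClientCallStreamObserver. */
    public interface ReadyAwareStream<T> {
        boolean isReady();
        void setOnReadyHandler(Runnable handler);
        void onNext(T value);
    }

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private final ReadyAwareStream<T> stream;

    public ReadyGatedSender(ReadyAwareStream<T> stream) {
        this.stream = stream;
        // The transport calls this when the stream goes from not-ready to ready.
        stream.setOnReadyHandler(() -> {
            lock.lock();
            try {
                ready.signalAll();
            } finally {
                lock.unlock();
            }
        });
    }

    /**
     * Sends one value once the stream reports ready.
     * Returns false if the timeout elapses or the thread is interrupted first.
     */
    public boolean send(T value, long timeoutNanos) {
        lock.lock();
        try {
            long remaining = timeoutNanos;
            // Re-check in a loop: wakeups can be spurious or stale.
            while (!stream.isReady()) {
                if (remaining <= 0) {
                    return false;
                }
                remaining = ready.awaitNanos(remaining);
            }
            stream.onNext(value);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }
}
```

Because `isReady()` is checked while holding the same lock the handler needs in order to signal, a readiness change cannot slip in between the check and the wait. One caveat to confirm against the gRPC Javadoc before adopting this shape: on the client side, `setOnReadyHandler` is, as far as I recall, only allowed from `ClientResponseObserver.beforeStart(...)`, so the registration point in `GrpcSendingMailbox` may need to differ from the constructor used here.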
## Backwards compatibility
* New gauges: additive only — no rename or removal of existing gauges.
* New `MailboxService` accessors: additive.
* `ChannelManager.getBufAllocatorMetric()` is new and `public` (consumed by
`MailboxService`); `GrpcMailboxServer.getBufAllocatorMetric()` is new and
`public`.
* No on-wire protocol change. No config-key change. Rolling upgrades
unaffected.
## Test plan
- [x] `GrpcSenderBackpressureReproTest` passes locally.
- [x] Existing `MailboxServiceTest`, `GrpcSendingMailboxTest` still pass.
- [ ] CI green on apache/pinot.
- [ ] Verify `MAILBOX_CLIENT_USED_*` gauges show up under JMX / Prometheus
on a real broker and server (manual check before un-drafting).
🤖 Generated with [Claude Code](https://claude.com/claude-code)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]