yashmayya commented on code in PR #18519:
URL: https://github.com/apache/pinot/pull/18519#discussion_r3269229763
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java:
##########
@@ -160,13 +210,18 @@ public InstanceType getInstanceType() {
* not open the underlying channel or acquire any additional resources. Instead, it will initialize lazily when the
* data is sent for the first time.
*/
+ // TODO: Consider adding an application-level global byte budget for sender outbound. The
+ // transport-layer WriteBufferWaterMark already caps sender direct memory per (host, port)
+ // channel, so the OOM the original PR fixed is bounded by watermark.high × #peers. A global
+ // byte budget would tighten the bound to a single configurable cap across all peers, but is
+ // unnecessary unless fan-outs hit hundreds of peers per query.
Review Comment:
The TODO downplays the risk this PR was built to address. Pinot deployments
with 50–100 servers are common, and the bound here is `writeBufferHighWaterMark
× #peers` on the sender plus `flowControlWindow × #incoming_streams` on the
receiver. With the 64 MB defaults that's ~3.2 GB/sender at 50 peers and ~6.4
GB/sender at 100 peers, plus another 64 MB on the receiver for every concurrent
inbound stream; multiple concurrent queries multiply both figures. That's well
within reach of the original failure mode on a moderate fan-out, not "hundreds
of peers per query".
Two concrete asks:
1. Convert this to a tracked issue with a quantitative trigger condition, so
it doesn't rot as a comment.
2. Add an operator-facing scaling formula to the
`CommonConstants.MultiStageQueryRunner` Javadoc: `peak_sender_direct_memory ≈
writeBufferHighWaterMark × #peers`, `peak_receiver_direct_memory ≈
flowControlWindow × #incoming_streams`. Without that, operators have no way to
size the watermark against `-XX:MaxDirectMemorySize` for their topology, and
the defaults will look safer than they are.
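As a rough illustration of the two bounds, here is a minimal sketch that evaluates them for a given topology. The class and method names are hypothetical (nothing like this exists in Pinot), and it assumes the "64 MB" defaults mean 64 MiB.

```java
/** Hypothetical helper, not part of Pinot: evaluates the two bounds from the formula above. */
final class MailboxDirectMemoryEstimate {
  private MailboxDirectMemoryEstimate() {
  }

  /** peak_sender_direct_memory ~ writeBufferHighWaterMark x #peers */
  static long peakSenderBytes(long writeBufferHighWaterMarkBytes, int peers) {
    return Math.multiplyExact(writeBufferHighWaterMarkBytes, (long) peers);
  }

  /** peak_receiver_direct_memory ~ flowControlWindow x #incoming_streams */
  static long peakReceiverBytes(long flowControlWindowBytes, int incomingStreams) {
    return Math.multiplyExact(flowControlWindowBytes, (long) incomingStreams);
  }

  static double toGiB(long bytes) {
    return bytes / (double) (1L << 30);
  }

  public static void main(String[] args) {
    long mark = 64L * 1024 * 1024; // assumption: the 64 MB default interpreted as 64 MiB
    for (int peers : new int[] {50, 100}) {
      System.out.println(peers + " peers -> " + toGiB(peakSenderBytes(mark, peers)) + " GiB per sender");
    }
  }
}
```

An operator would compare these figures, multiplied by the expected number of concurrent queries, against the configured `-XX:MaxDirectMemorySize`.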
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java:
##########
@@ -47,21 +48,26 @@ public class MailboxContentObserver implements StreamObserver<MailboxContent> {
private static final Logger LOGGER = LoggerFactory.getLogger(MailboxContentObserver.class);
private final MailboxService _mailboxService;
- private final StreamObserver<MailboxStatus> _responseObserver;
+ private final ServerCallStreamObserver<MailboxStatus> _responseObserver;
private final List<ByteBuffer> _mailboxBuffers = Collections.synchronizedList(new ArrayList<>());
private boolean _closedStream = false;
private volatile ReceivingMailbox _mailbox;
public MailboxContentObserver(MailboxService mailboxService, String mailboxId,
- StreamObserver<MailboxStatus> responseObserver) {
+ ServerCallStreamObserver<MailboxStatus> responseObserver) {
_mailboxService = mailboxService;
_responseObserver = responseObserver;
_mailbox = StringUtils.isNotBlank(mailboxId) ? _mailboxService.getReceivingMailbox(mailboxId) : null;
}
@Override
public void onNext(MailboxContent mailboxContent) {
+ // Replenish one inbound-message credit immediately, before any work that
might block (e.g., the
+ // offerData lock acquisition inside _mailbox.offerRaw). This decouples
the sender's HTTP/2 window
+ // replenishment from the receiver's per-message processing time — gRPC
will issue the WINDOW_UPDATE
+ // for this message as soon as this request(1) call sets the credit, not
waiting for onNext to return.
+ _responseObserver.request(1);
Review Comment:
Subtle but worth surfacing: this credit replenishment runs *before* the
blocking `_mailbox.offerRaw` at line 85. The comment explains the intent
(decouple HTTP/2 window from per-message processing time) and the throughput
rationale is reasonable, but architecturally this means the receiver
replenishes credit independently of application drain rate. If the downstream
`ReceivingMailbox` queue (cap 5) fills and the dispatch thread parks in
`_notFull.await`, the sender keeps shipping bytes into the receiver's Netty
inbound buffer up to the 64 MB stream window — i.e. receiver-side direct memory
pinned per stalled stream is bounded by `flowControlWindow`, not by application
progress.
That is the intentional design and it works as long as `flowControlWindow ×
concurrent_streams` stays well below `-XX:MaxDirectMemorySize`. Two suggestions:
1. Add `// Do not move this below the blocking call` so a future maintainer
doesn't "fix" the apparent ordering oddity and re-couple credits to drain rate.
2. Spell out in the `KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES` Javadoc that
this value is the *per-stalled-stream* receiver-side direct-memory exposure,
not just a throughput knob — so operators sizing it understand the OOM-bound
implication.
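The ordering described above can be modeled without gRPC. In this sketch `CreditSink` is a hypothetical stand-in for the `request(int)` call on the server-side observer, and an `ArrayBlockingQueue` stands in for the bounded `ReceivingMailbox` queue; neither is the real Pinot or gRPC type.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical model of the onNext ordering; the types here are stand-ins, not gRPC or Pinot classes. */
final class EagerCreditObserver {
  /** Stand-in for the request(int) call on the server-side stream observer. */
  interface CreditSink {
    void request(int count);
  }

  private final CreditSink _credits;
  private final BlockingQueue<String> _queue; // stand-in for the bounded receiving queue

  EagerCreditObserver(CreditSink credits, int queueCapacity) {
    _credits = credits;
    _queue = new ArrayBlockingQueue<>(queueCapacity);
  }

  void onNext(String message) {
    // Do not move this below the blocking call: credit must not depend on drain rate.
    _credits.request(1);
    try {
      _queue.put(message); // parks while the queue is full
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  String poll() {
    return _queue.poll();
  }
}
```

With a full queue, a second `onNext` still grants its credit before parking, which is exactly why the memory exposure per stalled stream is set by the window rather than by application progress.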
##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcSenderBackpressureTest.java:
##########
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.query.planner.physical.MailboxIdUtils;
+import org.apache.pinot.query.runtime.blocks.RowHeapDataBlock;
+import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.query.runtime.operator.OperatorTestUtil;
+import org.apache.pinot.query.testutils.QueryTestUtils;
+import org.apache.pinot.spi.config.instance.InstanceType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertTrue;
+
+
+/// Validates that sender-side gRPC back-pressure keeps a fast sender roughly in step with a slow receiver.
+///
+/// A "fast sender" pushes the same small data block repeatedly on the test thread while a "slow reader"
+/// thread polls the receiving mailbox at roughly 50 blocks per second. With back-pressure in place, the
+/// sender thread blocks inside [GrpcSendingMailbox.awaitReady] whenever the gRPC outbound queue fills,
+/// so the send rate tracks the polling rate plus a bounded in-flight pipeline.
+///
+/// With the new transport defaults (64 MB HTTP/2 flow-control window and 64 MB Netty write-buffer
+/// high-water mark), both the gRPC stream window and the Netty WriteQueue can buffer far more data
+/// before signalling back-pressure. As a result, [GrpcSendingMailbox.awaitReady] rarely fires the
+/// application-level gate during a 3-second run; the back-pressure that *does* apply is transport-level
+/// (the kernel's TCP send buffer and the receiver's gRPC server read loop). This means the ratio of
+/// `sendCount` to `polledCount` is much larger than with the old narrow-window defaults, and the test
Review Comment:
The honest admission here ("`awaitReady` rarely fires the application-level
gate") is correct — and it's also why this test no longer guards what it was
built to guard. With the new transport defaults a 128-byte payload at 50
polls/sec for 3 seconds simply cannot fill a 64 MB sender write-buffer or a 64
MB HTTP/2 stream window, so the application gate never engages. The 140×
relaxation (`polledCount * 50 + 10_000` → `polledCount * 7000` at line 210)
means a future refactor that deletes the entire `setOnReadyHandler` +
`ClientResponseObserver.beforeStart` + `awaitReady` machinery would still pass
this test, because the kernel TCP send buffer alone bounds the ratio well under
7000× at this payload size.
Suggest a companion `GrpcSenderBackpressureTightGateTest` that overrides the
new transport keys to small values via `PinotConfiguration`:
```
Map.of(
"pinot.query.runner.grpc.flow.control.window.bytes", "65535",
"pinot.query.runner.grpc.write.buffer.high.water.mark.bytes", "262144",
"pinot.query.runner.grpc.write.buffer.low.water.mark.bytes", "131072"
)
```
with the original tight assertion (`polledCount * 50 + 10_000`). That makes
the application gate the dominant back-pressure mechanism, so the test fails if
`awaitReady` regresses. `GrpcSenderBackpressureDisabledTest` already exercises
the kill-switch-off case; this would complete the matrix.
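To make the payload arithmetic concrete, here is a small sketch that counts how many 128-byte messages it takes to fill each buffer. The class name is hypothetical, the figures ignore gRPC/HTTP/2 framing and protobuf envelope overhead, and "64 MB" is assumed to mean 64 MiB.

```java
/** Hypothetical helper: estimates how many fixed-size payloads fit before a buffer limit is reached. */
final class GateFillEstimate {
  private GateFillEstimate() {
  }

  /** Ceiling of bufferBytes / payloadBytes; framing overhead is deliberately ignored. */
  static long messagesToFill(long bufferBytes, int payloadBytes) {
    if (bufferBytes < 0 || payloadBytes <= 0) {
      throw new IllegalArgumentException("bufferBytes must be >= 0 and payloadBytes must be > 0");
    }
    return (bufferBytes + payloadBytes - 1) / payloadBytes;
  }

  public static void main(String[] args) {
    int payload = 128; // payload size quoted in the review comment
    System.out.println("default 64 MiB buffer: " + messagesToFill(64L * 1024 * 1024, payload));
    System.out.println("tight high watermark:  " + messagesToFill(262_144, payload));
    System.out.println("tight stream window:   " + messagesToFill(65_535, payload));
  }
}
```

Under these assumptions the suggested overrides bring the fill point down from roughly half a million messages to a few thousand or fewer, which is what lets the application gate become the dominant mechanism.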
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -208,17 +266,84 @@ public boolean isTerminated() {
return _senderSideClosed || _statusObserver.isFinished();
}
- private StreamObserver<MailboxContent> getContentObserver() {
+ private ClientCallStreamObserver<MailboxContent> getContentObserver() {
Metadata metadata = new Metadata();
metadata.put(ChannelUtils.MAILBOX_ID_METADATA_KEY, _id);
- return PinotMailboxGrpc.newStub(_channelManager.getChannel(_hostname, _port))
+ // We wrap `_statusObserver` in a ClientResponseObserver so we can
register the on-ready handler through
+ // `beforeStart` — gRPC rejects setOnReadyHandler() if it is called after
open() returns. Wrapping (rather than
+ // making MailboxStatusObserver itself a ClientResponseObserver) keeps the
back-pressure plumbing local to this
+ // class. The wrapper delegates the data callbacks unchanged, and signals
our `_readyCond` on stream close so a
+ // blocked sender wakes up to observe `_statusObserver.isFinished()`
becoming true.
+ ClientResponseObserver<MailboxContent, MailboxStatus> responseObserver =
+ new ClientResponseObserver<MailboxContent, MailboxStatus>() {
+ @Override
+ public void beforeStart(ClientCallStreamObserver<MailboxContent> requestStream) {
+ // Fires on a gRPC channel/Netty thread whenever isReady() transitions false -> true. Just signal; the
+ // sender re-checks the predicate after waking.
+ requestStream.setOnReadyHandler(GrpcSendingMailbox.this::wakeWaiters);
+ }
+
+ @Override
+ public void onNext(MailboxStatus value) {
+ _statusObserver.onNext(value);
+ // Only wake on receiver early-terminate. Transport-level isReady() transitions reach a parked
+ // sender through setOnReadyHandler (registered in beforeStart above); normal buffer-size ACKs
+ // do not change any predicate awaitReady() actually waits on, so signalling them would force a
+ // spurious park/unpark cycle on every receiver ACK. Early-terminate is the one status-only
+ // change (the stream stays open) that awaitReady() must observe promptly, so we still signal
+ // here when its metadata is set.
+ if (Boolean.parseBoolean(
+ value.getMetadataMap().get(ChannelUtils.MAILBOX_METADATA_REQUEST_EARLY_TERMINATE))) {
+ wakeWaiters();
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ try {
+ _statusObserver.onError(t);
+ } finally {
+ wakeWaiters();
+ }
+ }
+
+ @Override
+ public void onCompleted() {
+ try {
+ _statusObserver.onCompleted();
+ } finally {
+ wakeWaiters();
+ }
+ }
+ };
+
+ return (ClientCallStreamObserver<MailboxContent>) PinotMailboxGrpc.newStub(
+ _channelManager.getChannel(_hostname, _port))
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata))
.withDeadlineAfter(_deadlineMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS)
- .open(_statusObserver);
+ .open(responseObserver);
}
protected void sendContent(ByteString byteString, boolean waitForMore) {
+ sendContent(byteString, waitForMore, false);
+ }
+
+ protected void sendContent(ByteString byteString, boolean waitForMore, boolean bypassReady) {
+ if (!awaitReady(bypassReady)) {
+ // Either the mailbox was cancelled while we were waiting (normal path) or the gRPC stream is already dead
+ // (bypass path). Either way, skip the send.
+ return;
+ }
+ // Narrow-window race mitigation: a concurrent cancel() may have run between awaitReady() returning true and
+ // here, setting _senderSideClosed and pushing its own error EOS. If we proceed, both threads would call
+ // onNext() on the same non-thread-safe ClientCallStreamObserver. Re-checking after the gate reduces (but
+ // does not fully eliminate) that window; fully eliminating it would require serializing all onNext() calls
+ // under _readyLock, which is more invasive. The bypass path (cancel/close) must push through regardless,
+ // so this guard only applies when bypassReady == false.
+ if (!bypassReady && isTerminated()) {
Review Comment:
Round-1 review flagged this race; the EOS path was correctly closed in
commit `573356ee74` by reordering `_senderSideClosed = true` before
`onCompleted()`. This data-path mitigation is narrower: the `isTerminated()`
re-check reduces but doesn't eliminate the window where two threads can call
`onNext` on the same non-thread-safe `ClientCallStreamObserver`. Under heavy
cancel-while-sending workloads the race surfaces either as an
`IllegalStateException` from gRPC ("call already closed") or as two concurrent
`onNext` calls on an observer that gRPC does not guarantee to be thread-safe.
The clean fix is to hold `_readyLock` across `awaitReady()` + `onNext()`.
The fast path of `awaitReady` is already lock-free (returns on `isReady()`
outside the lock), so the extra acquire is paid only when the gate would have
parked anyway — negligible. Alternative: keep the current approach but elevate
this paragraph to class-level Javadoc so it's not lost in the middle of
`sendContent`.
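A minimal model of the lock-across-gate-and-write approach, using only `java.util.concurrent`. `SerializedSender` and its members are hypothetical stand-ins (a list replaces the real `onNext`, a `BooleanSupplier` replaces `isReady()`), and for simplicity the sketch takes the lock on every send rather than preserving a lock-free fast path.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;

/** Hypothetical model of a sender that serializes the gate check and the write; not Pinot code. */
final class SerializedSender {
  private final ReentrantLock _readyLock = new ReentrantLock();
  private final Condition _readyCond = _readyLock.newCondition();
  private final BooleanSupplier _transportReady; // stand-in for isReady()
  private final List<String> _wire = new ArrayList<>(); // stand-in for onNext(); guarded by _readyLock
  private boolean _closed; // guarded by _readyLock

  SerializedSender(BooleanSupplier transportReady) {
    _transportReady = transportReady;
  }

  /** Returns true if the payload was written, false if the sender was closed or interrupted first. */
  boolean send(String payload) {
    _readyLock.lock();
    try {
      while (!_closed && !_transportReady.getAsBoolean()) {
        _readyCond.await();
      }
      if (_closed) {
        return false;
      }
      _wire.add(payload); // still under the lock, so cancel() cannot interleave
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      _readyLock.unlock();
    }
  }

  /** Writes the terminal marker exactly once and wakes any parked sender. */
  void cancel() {
    _readyLock.lock();
    try {
      if (!_closed) {
        _closed = true;
        _wire.add("EOS");
        _readyCond.signalAll();
      }
    } finally {
      _readyLock.unlock();
    }
  }

  /** Called when the transport becomes ready again. */
  void wakeWaiters() {
    _readyLock.lock();
    try {
      _readyCond.signalAll();
    } finally {
      _readyLock.unlock();
    }
  }

  /** Snapshot of everything written so far. */
  List<String> wire() {
    _readyLock.lock();
    try {
      return new ArrayList<>(_wire);
    } finally {
      _readyLock.unlock();
    }
  }
}
```

Because the closed check and the write share one critical section, nothing can be written after the terminal marker, which is the property the `isTerminated()` re-check only approximates.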
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java:
##########
@@ -62,18 +63,31 @@ public class ChannelManager {
private final PooledByteBufAllocator _bufAllocator;
@Nullable
private final SslContext _clientSslContext;
+ private final int _writeBufferHighWaterMarkBytes;
+ private final int _writeBufferLowWaterMarkBytes;
/**
* Constructs a {@code ChannelManager}.
*
* @param clientSslContext optional cached client {@link SslContext} to reuse across channels
* @param maxInboundMessageSize maximum inbound message size for gRPC channels
* @param idleTimeout idle timeout for gRPC channels; channels close after this period of inactivity
+ * @param writeBufferHighWaterMarkBytes Netty per-channel {@link WriteBufferWaterMark} high watermark. This limit is
+ * per {@code (host, port)} peer and is shared across all streams multiplexed on
+ * that channel. The low watermark must be ≤ the high watermark; misordered values
+ * will be rejected by Netty at channel construction.
+ * @param writeBufferLowWaterMarkBytes Netty per-channel {@link WriteBufferWaterMark} low mark. Once the channel's
+ * pending write queue grows above the high watermark, the channel is marked
+ * unwritable; it becomes writable again only when the queue drains below this
+ * low watermark. Must be ≤ {@code writeBufferHighWaterMarkBytes}.
*/
- public ChannelManager(@Nullable SslContext clientSslContext, int maxInboundMessageSize, Duration idleTimeout) {
+ public ChannelManager(@Nullable SslContext clientSslContext, int maxInboundMessageSize, Duration idleTimeout,
+ int writeBufferHighWaterMarkBytes, int writeBufferLowWaterMarkBytes) {
_clientSslContext = clientSslContext;
_maxInboundMessageSize = maxInboundMessageSize;
_idleTimeout = idleTimeout;
+ _writeBufferHighWaterMarkBytes = writeBufferHighWaterMarkBytes;
+ _writeBufferLowWaterMarkBytes = writeBufferLowWaterMarkBytes;
Review Comment:
Two validation gaps that fail lazily instead of at startup:
1. `WriteBufferWaterMark(low, high)` throws `IllegalArgumentException` if
`low > high`, but only inside `getChannel` on the first send to a
previously-unseen peer (lines 103-104, 116-117). A misconfigured cluster starts
up fine, exposes metrics, and then explodes mid-query.
2. No positivity check. Netty accepts a watermark of 0, and it rejects a
negative low mark only at that same lazy construction point, so neither case is
caught at startup.
Worth adding `Preconditions.checkArgument(0 < writeBufferLowWaterMarkBytes
&& writeBufferLowWaterMarkBytes <= writeBufferHighWaterMarkBytes, ...)` here,
matching the existing pattern at `GrpcMailboxServer.java:149-152` for
`_inboundMessageCredit`. Same shape applies to `flowControlWindow >=
maxInboundMessageSize` in `GrpcMailboxServer` — a window smaller than the max
message size makes the stream unusable, and there's currently no check.
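A sketch of the eager checks, written with plain exceptions so it stands alone; in Pinot the same conditions would more likely go through Guava's `Preconditions.checkArgument`. The class and method names are hypothetical, and the window check simply encodes the condition proposed above.

```java
/** Hypothetical startup-time validation for the transport settings discussed above; not Pinot code. */
final class TransportConfigValidation {
  private TransportConfigValidation() {
  }

  /** Enforces 0 < low <= high so that misconfiguration fails at construction, not on the first send. */
  static void checkWatermarks(int lowBytes, int highBytes) {
    if (lowBytes <= 0 || lowBytes > highBytes) {
      throw new IllegalArgumentException(
          "Expected 0 < low <= high, got low=" + lowBytes + ", high=" + highBytes);
    }
  }

  /** Enforces flowControlWindow >= maxInboundMessageSize, the condition proposed in the review. */
  static void checkFlowControlWindow(int flowControlWindowBytes, int maxInboundMessageSizeBytes) {
    if (flowControlWindowBytes < maxInboundMessageSizeBytes) {
      throw new IllegalArgumentException(
          "Expected flowControlWindow >= maxInboundMessageSize, got window=" + flowControlWindowBytes
              + ", maxInboundMessageSize=" + maxInboundMessageSizeBytes);
    }
  }
}
```

Calling both checks from the constructors would turn a mid-query failure into a startup failure with a message that names the offending values.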
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java:
##########
@@ -166,9 +187,26 @@ public void shutdown() {
}
}
+ /// Bytes of direct (off-heap) memory currently pinned by the gRPC server
+ /// allocator backing this mailbox server. This is the same allocator whose
+ /// values are exported as `MAILBOX_SERVER_USED_DIRECT_MEMORY` gauges.
+ public long usedDirectMemoryBytes() {
+ return _bufAllocatorMetric.usedDirectMemory();
+ }
+
+ /// Bytes of heap memory currently pinned by the gRPC server allocator backing
+ /// this mailbox server. Exported as `MAILBOX_SERVER_USED_HEAP_MEMORY` gauges.
+ public long usedHeapMemoryBytes() {
+ return _bufAllocatorMetric.usedHeapMemory();
+ }
+
@Override
public StreamObserver<Mailbox.MailboxContent> open(StreamObserver<Mailbox.MailboxStatus> responseObserver) {
String mailboxId = ChannelUtils.MAILBOX_ID_CTX_KEY.get();
- return new MailboxContentObserver(_mailboxService, mailboxId, responseObserver);
+ ServerCallStreamObserver<Mailbox.MailboxStatus> serverCallObserver =
+ (ServerCallStreamObserver<Mailbox.MailboxStatus>) responseObserver;
+ serverCallObserver.disableAutoInboundFlowControl();
+ serverCallObserver.request(_inboundMessageCredit);
Review Comment:
Behavioral regression worth documenting (and ideally testing): with auto
inbound flow control the receiver saw one in-flight message at a time. With
this change the in-flight window per stream becomes `min(_inboundMessageCredit
messages, flowControlWindow bytes)` — `min(128, 64 MB)` at defaults. That
widens cancel-propagation latency proportionally: when the sender calls
`cancel()` and pushes an error EOS (`bypassReady=true`), the EOS is appended to
the same HTTP/2 stream behind any messages already in the inbound buffer. The
receiver dispatches each through `MailboxContentObserver.onNext` before
observing the cancel, and if the application queue is full each dispatch parks
in `_notFull.await`. A cancelled query can take noticeably longer to actually
stop while up to 64 MB of buffered data is drained.
Two recommendations:
1. Register `setOnCancelHandler` on the `ServerCallStreamObserver` here.
That gives the server an out-of-band signal to call `_mailbox.cancel()` and
wake the parked dispatch thread immediately, independent of the stream-ordered
EOS.
2. Add a test that issues a cancel while the receiver is parked on a full
queue and asserts the receiver mailbox terminates within a tight bound (e.g. <
100 ms), independent of how many buffered messages remain on the stream.
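The out-of-band wake-up in recommendation 1 can be modeled without gRPC. In the real server, `cancel()` would be invoked from the handler registered through `ServerCallStreamObserver.setOnCancelHandler`; the `CancellableMailbox` class below is a hypothetical stand-in for the receiving mailbox, not the actual `ReceivingMailbox`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical bounded mailbox whose cancel() releases a dispatcher parked on a full queue. */
final class CancellableMailbox {
  private final ReentrantLock _lock = new ReentrantLock();
  private final Condition _notFull = _lock.newCondition();
  private final Deque<String> _queue = new ArrayDeque<>();
  private final int _capacity;
  private boolean _cancelled; // guarded by _lock

  CancellableMailbox(int capacity) {
    _capacity = capacity;
  }

  /** Parks while the queue is full; returns false if the mailbox was cancelled before the block fit. */
  boolean offer(String block) {
    _lock.lock();
    try {
      while (!_cancelled && _queue.size() >= _capacity) {
        _notFull.awaitUninterruptibly();
      }
      if (_cancelled) {
        return false;
      }
      _queue.addLast(block);
      return true;
    } finally {
      _lock.unlock();
    }
  }

  /** Removes the oldest block, or returns null if the queue is empty. */
  String poll() {
    _lock.lock();
    try {
      String head = _queue.pollFirst();
      if (head != null) {
        _notFull.signal();
      }
      return head;
    } finally {
      _lock.unlock();
    }
  }

  /** Out-of-band cancel: drops buffered blocks and wakes every parked offer() immediately. */
  void cancel() {
    _lock.lock();
    try {
      _cancelled = true;
      _queue.clear();
      _notFull.signalAll();
    } finally {
      _lock.unlock();
    }
  }
}
```

A test in the spirit of recommendation 2 would park a thread in `offer` on a full queue, call `cancel()`, and assert that the thread returns promptly regardless of how much data is still buffered upstream.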