yashmayya commented on code in PR #18519:
URL: https://github.com/apache/pinot/pull/18519#discussion_r3269229763


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java:
##########
@@ -160,13 +210,18 @@ public InstanceType getInstanceType() {
    * not open the underlying channel or acquire any additional resources. Instead, it will initialize lazily when the
    * data is sent for the first time.
    */
+  // TODO: Consider adding an application-level global byte budget for sender outbound. The
+  //  transport-layer WriteBufferWaterMark already caps sender direct memory per (host, port)
+  //  channel, so the OOM the original PR fixed is bounded by watermark.high × #peers. A global
+  //  byte budget would tighten the bound to a single configurable cap across all peers, but is
+  //  unnecessary unless fan-outs hit hundreds of peers per query.

Review Comment:
   The TODO downplays the risk this PR was built to address. Pinot deployments 
with 50–100 servers are common, and the bound here is `writeBufferHighWaterMark 
× #peers` on the sender plus `flowControlWindow × #incoming_streams` on the 
receiver. With the 64 MB defaults that's ~3.2 GB per sender at 50 peers, ~6.4 
GB per sender at 100 peers, and another 64 MB per concurrent inbound stream 
on the receiver; multiple concurrent queries multiply this. That's well within 
reach of the original failure mode on a moderate fan-out, not "hundreds of 
peers per query".
   
   Two concrete asks:
   1. Convert this to a tracked issue with a quantitative trigger condition, so 
it doesn't rot as a comment.
   2. Add an operator-facing scaling formula to 
`CommonConstants.MultiStageQueryRunner` Javadoc: `peak_sender_direct_memory ≈ 
writeBufferHighWaterMark × #peers`, `peak_receiver_direct_memory ≈ 
flowControlWindow × #incoming_streams`. Without that, operators have no way to 
size the watermark against `-XX:MaxDirectMemorySize` for their topology, and the 
defaults will mislead them.
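
A minimal sketch of the sizing arithmetic behind ask 2. It assumes the two formulas above are the whole story and ignores allocator fragmentation and gRPC/Netty bookkeeping overhead. `DirectMemorySizing` and its methods are hypothetical names for illustration, not existing Pinot APIs.

```java
/**
 * Hypothetical sizing helper (not a Pinot API) applying the review's formulas:
 *   peak_sender_direct_memory   ~= writeBufferHighWaterMark * #peers
 *   peak_receiver_direct_memory ~= flowControlWindow * #incoming_streams
 * Overheads such as allocator fragmentation and gRPC framing are ignored.
 */
final class DirectMemorySizing {
  static final long MIB = 1024L * 1024L;

  private DirectMemorySizing() {
  }

  /** Worst case: every peer channel fills its Netty write buffer up to the high watermark. */
  static long peakSenderBytes(long writeBufferHighWaterMarkBytes, int peers) {
    return writeBufferHighWaterMarkBytes * peers;
  }

  /** Worst case: every stalled inbound stream holds a full HTTP/2 flow-control window. */
  static long peakReceiverBytes(long flowControlWindowBytes, int incomingStreams) {
    return flowControlWindowBytes * incomingStreams;
  }

  /** Sender plus receiver exposure for a JVM that plays both roles at once. */
  static long peakTotalBytes(long writeBufferHighWaterMarkBytes, int peers,
      long flowControlWindowBytes, int incomingStreams) {
    return peakSenderBytes(writeBufferHighWaterMarkBytes, peers)
        + peakReceiverBytes(flowControlWindowBytes, incomingStreams);
  }
}
```

With a 64 MB high watermark this gives 3,200 MB per sender at 50 peers and 6,400 MB at 100 peers. An operator would compare that figure, plus the receiver-side term for the same JVM, against `-XX:MaxDirectMemorySize`.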



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java:
##########
@@ -47,21 +48,26 @@ public class MailboxContentObserver implements StreamObserver<MailboxContent> {
   private static final Logger LOGGER = LoggerFactory.getLogger(MailboxContentObserver.class);
 
   private final MailboxService _mailboxService;
-  private final StreamObserver<MailboxStatus> _responseObserver;
+  private final ServerCallStreamObserver<MailboxStatus> _responseObserver;
   private final List<ByteBuffer> _mailboxBuffers = Collections.synchronizedList(new ArrayList<>());
   private boolean _closedStream = false;
 
   private volatile ReceivingMailbox _mailbox;
 
   public MailboxContentObserver(MailboxService mailboxService, String mailboxId,
-      StreamObserver<MailboxStatus> responseObserver) {
+      ServerCallStreamObserver<MailboxStatus> responseObserver) {
     _mailboxService = mailboxService;
     _responseObserver = responseObserver;
     _mailbox = StringUtils.isNotBlank(mailboxId) ? _mailboxService.getReceivingMailbox(mailboxId) : null;
   }
 
   @Override
   public void onNext(MailboxContent mailboxContent) {
+    // Replenish one inbound-message credit immediately, before any work that might block (e.g., the
+    // offerData lock acquisition inside _mailbox.offerRaw). This decouples the sender's HTTP/2 window
+    // replenishment from the receiver's per-message processing time — gRPC will issue the WINDOW_UPDATE
+    // for this message as soon as this request(1) call sets the credit, not waiting for onNext to return.
+    _responseObserver.request(1);

Review Comment:
   Subtle but worth surfacing: this credit replenishment runs *before* the 
blocking `_mailbox.offerRaw` at line 85. The comment explains the intent 
(decouple HTTP/2 window from per-message processing time) and the throughput 
rationale is reasonable, but architecturally this means the receiver 
replenishes credit independently of application drain rate. If the downstream 
`ReceivingMailbox` queue (cap 5) fills and the dispatch thread parks in 
`_notFull.await`, the sender keeps shipping bytes into the receiver's Netty 
inbound buffer up to the 64 MB stream window — i.e. receiver-side direct memory 
pinned per stalled stream is bounded by `flowControlWindow`, not by application 
progress.
   
   That is the intentional design and it works as long as `flowControlWindow × 
concurrent_streams` stays well below `-XX:MaxDirectMemorySize`. Two suggestions:
   
   1. Add `// Do not move this below the blocking call` so a future maintainer 
doesn't "fix" the apparent ordering oddity and re-couple credits to drain rate.
   2. Spell out in the `KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES` Javadoc that 
this value is the *per-stalled-stream* receiver-side direct-memory exposure, 
not just a throughput knob — so operators sizing it understand the OOM-bound 
implication.
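
The sketch below is a gRPC-free model of the ordering described above. It assumes an `AtomicLong` counter can stand in for `ServerCallStreamObserver.request(1)` and an `ArrayBlockingQueue` of capacity 5 can stand in for the `ReceivingMailbox` queue. `CreditFirstReceiver` is hypothetical. The offer timeout exists only so the sketch terminates; the real receiver parks in `_notFull.await` instead.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical, gRPC-free model of MailboxContentObserver.onNext's ordering. The AtomicLong stands in
 * for ServerCallStreamObserver.request(1); the bounded queue stands in for ReceivingMailbox.
 */
final class CreditFirstReceiver {
  private final AtomicLong _creditsRequested = new AtomicLong();
  private final BlockingQueue<byte[]> _appQueue;
  private final long _offerTimeoutMs;

  CreditFirstReceiver(int appQueueCapacity, long offerTimeoutMs) {
    _appQueue = new ArrayBlockingQueue<>(appQueueCapacity);
    _offerTimeoutMs = offerTimeoutMs;
  }

  /** Returns true if the message reached the application queue before the timeout. */
  boolean onNext(byte[] message) {
    // Do not move this below the blocking call: the credit goes back to the sender before we know
    // whether the application can absorb the message, keeping credit flow independent of drain rate.
    _creditsRequested.incrementAndGet();
    try {
      // The real receiver parks in _notFull.await; the timeout only keeps this sketch finite.
      return _appQueue.offer(message, _offerTimeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  long creditsRequested() {
    return _creditsRequested.get();
  }

  int queuedMessages() {
    return _appQueue.size();
  }
}
```

With nothing draining the queue, eight deliveries grant eight credits even though only five messages fit. Credit flow is therefore decoupled from application drain, which is the behavior the suggested `// Do not move this below the blocking call` comment would protect.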



##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcSenderBackpressureTest.java:
##########
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.query.planner.physical.MailboxIdUtils;
+import org.apache.pinot.query.runtime.blocks.RowHeapDataBlock;
+import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.query.runtime.operator.OperatorTestUtil;
+import org.apache.pinot.query.testutils.QueryTestUtils;
+import org.apache.pinot.spi.config.instance.InstanceType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertTrue;
+
+
+/// Validates that sender-side gRPC back-pressure keeps a fast sender roughly in step with a slow receiver.
+///
+/// A "fast sender" pushes the same small data block repeatedly on the test thread while a "slow reader"
+/// thread polls the receiving mailbox at roughly 50 blocks per second. With back-pressure in place, the
+/// sender thread blocks inside [GrpcSendingMailbox.awaitReady] whenever the gRPC outbound queue fills,
+/// so the send rate tracks the polling rate plus a bounded in-flight pipeline.
+///
+/// With the new transport defaults (64 MB HTTP/2 flow-control window and 64 MB Netty write-buffer
+/// high-water mark), both the gRPC stream window and the Netty WriteQueue can buffer far more data
+/// before signalling back-pressure.  As a result, [GrpcSendingMailbox.awaitReady] rarely fires the
+/// application-level gate during a 3-second run; the back-pressure that *does* apply is transport-level
+/// (the kernel's TCP send buffer and the receiver's gRPC server read loop).  This means the ratio of
+/// `sendCount` to `polledCount` is much larger than with the old narrow-window defaults, and the test
Review Comment:
   The honest admission here ("`awaitReady` rarely fires the application-level 
gate") is correct — and it's also why this test no longer guards what it was 
built to guard. With the new transport defaults a 128-byte payload at 50 
polls/sec for 3 seconds simply cannot fill a 64 MB sender write-buffer or a 64 
MB HTTP/2 stream window, so the application gate never engages. The 140× 
relaxation (`polledCount * 50 + 10_000` → `polledCount * 7000` at line 210) 
means a future refactor that deletes the entire `setOnReadyHandler` + 
`ClientResponseObserver.beforeStart` + `awaitReady` machinery would still pass 
this test, because the kernel TCP send buffer alone bounds the ratio well under 
7000× at this payload size.
   
   Suggest a companion `GrpcSenderBackpressureTightGateTest` that overrides the 
new transport keys to small values via `PinotConfiguration`:
   
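   ```java
   // Small transport limits so the application-level awaitReady() gate dominates.
   Map<String, Object> transportOverrides = Map.of(
       "pinot.query.runner.grpc.flow.control.window.bytes", "65535",
       "pinot.query.runner.grpc.write.buffer.high.water.mark.bytes", "262144",
       "pinot.query.runner.grpc.write.buffer.low.water.mark.bytes", "131072");
   ```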
   
   with the original tight assertion (`polledCount * 50 + 10_000`). That makes 
the application gate the dominant back-pressure mechanism, so the test fails if 
`awaitReady` regresses. `GrpcSenderBackpressureDisabledTest` already exercises 
the kill-switch-off case; this would complete the matrix.
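
Here is the rough arithmetic behind the 140× point. It assumes about 150 polls in the 3-second run (50 polls/sec) and ignores protobuf/gRPC framing overhead. `BackpressureBounds` is a hypothetical helper, not part of the test.

```java
/**
 * Hypothetical helper comparing the original and relaxed sendCount bounds from
 * GrpcSenderBackpressureTest. Payload bytes ignore protobuf and gRPC framing overhead.
 */
final class BackpressureBounds {
  private BackpressureBounds() {
  }

  /** Original tight assertion: sendCount <= polledCount * 50 + 10_000. */
  static long tightBound(long polledCount) {
    return polledCount * 50 + 10_000;
  }

  /** Relaxed assertion: sendCount <= polledCount * 7000. */
  static long relaxedBound(long polledCount) {
    return polledCount * 7_000;
  }

  /** Raw payload bytes a bound admits for fixed-size messages. */
  static long payloadBytes(long messages, int payloadSizeBytes) {
    return messages * payloadSizeBytes;
  }
}
```

At 150 polls, the original bound admits 17,500 sends, or about 2.2 MB of 128-byte payloads. The relaxed bound admits 1,050,000 sends, or about 134 MB, which is on the order of the 64 MB write buffer plus the 64 MB stream window combined.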



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -208,17 +266,84 @@ public boolean isTerminated() {
     return _senderSideClosed || _statusObserver.isFinished();
   }
 
-  private StreamObserver<MailboxContent> getContentObserver() {
+  private ClientCallStreamObserver<MailboxContent> getContentObserver() {
     Metadata metadata = new Metadata();
     metadata.put(ChannelUtils.MAILBOX_ID_METADATA_KEY, _id);
 
-    return PinotMailboxGrpc.newStub(_channelManager.getChannel(_hostname, _port))
+    // We wrap `_statusObserver` in a ClientResponseObserver so we can register the on-ready handler through
+    // `beforeStart` — gRPC rejects setOnReadyHandler() if it is called after open() returns. Wrapping (rather than
+    // making MailboxStatusObserver itself a ClientResponseObserver) keeps the back-pressure plumbing local to this
+    // class. The wrapper delegates the data callbacks unchanged, and signals our `_readyCond` on stream close so a
+    // blocked sender wakes up to observe `_statusObserver.isFinished()` becoming true.
+    ClientResponseObserver<MailboxContent, MailboxStatus> responseObserver =
+        new ClientResponseObserver<MailboxContent, MailboxStatus>() {
+          @Override
+          public void beforeStart(ClientCallStreamObserver<MailboxContent> requestStream) {
+            // Fires on a gRPC channel/Netty thread whenever isReady() transitions false -> true. Just signal; the
+            // sender re-checks the predicate after waking.
+            requestStream.setOnReadyHandler(GrpcSendingMailbox.this::wakeWaiters);
+          }
+
+          @Override
+          public void onNext(MailboxStatus value) {
+            _statusObserver.onNext(value);
+            // Only wake on receiver early-terminate. Transport-level isReady() transitions reach a parked
+            // sender through setOnReadyHandler (registered in beforeStart above); normal buffer-size ACKs
+            // do not change any predicate awaitReady() actually waits on, so signalling them would force a
+            // spurious park/unpark cycle on every receiver ACK. Early-terminate is the one status-only
+            // change (the stream stays open) that awaitReady() must observe promptly, so we still signal
+            // here when its metadata is set.
+            if (Boolean.parseBoolean(
+                value.getMetadataMap().get(ChannelUtils.MAILBOX_METADATA_REQUEST_EARLY_TERMINATE))) {
+              wakeWaiters();
+            }
+          }
+
+          @Override
+          public void onError(Throwable t) {
+            try {
+              _statusObserver.onError(t);
+            } finally {
+              wakeWaiters();
+            }
+          }
+
+          @Override
+          public void onCompleted() {
+            try {
+              _statusObserver.onCompleted();
+            } finally {
+              wakeWaiters();
+            }
+          }
+        };
+
+    return (ClientCallStreamObserver<MailboxContent>) PinotMailboxGrpc.newStub(
+            _channelManager.getChannel(_hostname, _port))
         .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata))
         .withDeadlineAfter(_deadlineMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS)
-        .open(_statusObserver);
+        .open(responseObserver);
   }
 
   protected void sendContent(ByteString byteString, boolean waitForMore) {
+    sendContent(byteString, waitForMore, false);
+  }
+
+  protected void sendContent(ByteString byteString, boolean waitForMore, boolean bypassReady) {
+    if (!awaitReady(bypassReady)) {
+      // Either the mailbox was cancelled while we were waiting (normal path) or the gRPC stream is already dead
+      // (bypass path). Either way, skip the send.
+      return;
+    }
+    // Narrow-window race mitigation: a concurrent cancel() may have run between awaitReady() returning true and
+    // here, setting _senderSideClosed and pushing its own error EOS. If we proceed, both threads would call
+    // onNext() on the same non-thread-safe ClientCallStreamObserver. Re-checking after the gate reduces (but
+    // does not fully eliminate) that window; fully eliminating it would require serializing all onNext() calls
+    // under _readyLock, which is more invasive. The bypass path (cancel/close) must push through regardless,
+    // so this guard only applies when bypassReady == false.
+    if (!bypassReady && isTerminated()) {

Review Comment:
   Round-1 review flagged this race; the EOS path was correctly closed in 
commit `573356ee74` by reordering `_senderSideClosed = true` before 
`onCompleted()`. This data-path mitigation is narrower — the `isTerminated()` 
re-check reduces but doesn't eliminate the window where two threads can call 
`onNext` on the same non-thread-safe `ClientCallStreamObserver`. Under heavy 
cancel-while-sending workloads the race surfaces as either an 
`IllegalStateException` from gRPC ("call already closed") or two concurrent 
`onNext` calls on a stream gRPC does not contractually promise to be 
thread-safe.
   
   The clean fix is to hold `_readyLock` across `awaitReady()` + `onNext()`. 
The fast path of `awaitReady` is already lock-free (returns on `isReady()` 
outside the lock), so the extra acquire is paid only when the gate would have 
parked anyway — negligible. Alternative: keep the current approach but elevate 
this paragraph to class-level Javadoc so it's not lost in the middle of 
`sendContent`.
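
The sketch below illustrates the "clean fix". It assumes a `List` can stand in for the non-thread-safe `ClientCallStreamObserver` and a `BooleanSupplier` can stand in for `isReady()`. `SerializedSender` is hypothetical, not `GrpcSendingMailbox`, and shows only the locking shape: the closed re-check and the `onNext` stand-in run under the same lock that `cancel()` holds, so the two writes can never interleave.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;

/** Hypothetical sender that serializes every write to the stream under _readyLock. */
final class SerializedSender {
  private final ReentrantLock _readyLock = new ReentrantLock();
  private final Condition _readyCond = _readyLock.newCondition();
  private final BooleanSupplier _isReady;              // stand-in for ClientCallStreamObserver.isReady()
  private final List<String> _stream = new ArrayList<>(); // stand-in for the non-thread-safe observer
  private boolean _senderSideClosed;                   // guarded by _readyLock

  SerializedSender(BooleanSupplier isReady) {
    _isReady = isReady;
  }

  /** Data path: returns false if the sender was closed first or the wait was interrupted. */
  boolean send(String message) {
    _readyLock.lock();
    try {
      while (!_senderSideClosed && !_isReady.getAsBoolean()) {
        _readyCond.await(); // releases _readyLock while parked, so cancel() can run
      }
      if (_senderSideClosed) {
        return false; // checked under the lock cancel() also holds: no gap before the write
      }
      _stream.add(message); // stand-in for onNext(); never concurrent with cancel()'s write
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      _readyLock.unlock();
    }
  }

  /** Bypass path: marks the sender closed, pushes an error EOS, and wakes any parked sender. */
  void cancel() {
    _readyLock.lock();
    try {
      if (_senderSideClosed) {
        return;
      }
      _senderSideClosed = true;
      _stream.add("ERROR_EOS");
      _readyCond.signalAll();
    } finally {
      _readyLock.unlock();
    }
  }

  /** Intended for the on-ready handler when isReady() flips to true. */
  void wakeWaiters() {
    _readyLock.lock();
    try {
      _readyCond.signalAll();
    } finally {
      _readyLock.unlock();
    }
  }

  List<String> sentSnapshot() {
    _readyLock.lock();
    try {
      return new ArrayList<>(_stream);
    } finally {
      _readyLock.unlock();
    }
  }
}
```

For simplicity, this sketch takes the lock on every send.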



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java:
##########
@@ -62,18 +63,31 @@ public class ChannelManager {
   private final PooledByteBufAllocator _bufAllocator;
   @Nullable
   private final SslContext _clientSslContext;
+  private final int _writeBufferHighWaterMarkBytes;
+  private final int _writeBufferLowWaterMarkBytes;
 
   /**
    * Constructs a {@code ChannelManager}.
    *
    * @param clientSslContext optional cached client {@link SslContext} to reuse across channels
    * @param maxInboundMessageSize maximum inbound message size for gRPC channels
    * @param idleTimeout idle timeout for gRPC channels; channels close after this period of inactivity
+   * @param writeBufferHighWaterMarkBytes Netty per-channel {@link WriteBufferWaterMark} high watermark. This limit is
+   *                                     per {@code (host, port)} peer and is shared across all streams multiplexed on
+   *                                     that channel. The low watermark must be ≤ the high watermark; misordered values
+   *                                     will be rejected by Netty at channel construction.
+   * @param writeBufferLowWaterMarkBytes Netty per-channel {@link WriteBufferWaterMark} low mark. Once the channel's
+   *                                     pending write queue grows above the high watermark, the channel is marked
+   *                                     unwritable; it becomes writable again only when the queue drains below this
+   *                                     low watermark. Must be ≤ {@code writeBufferHighWaterMarkBytes}.
    */
-  public ChannelManager(@Nullable SslContext clientSslContext, int maxInboundMessageSize, Duration idleTimeout) {
+  public ChannelManager(@Nullable SslContext clientSslContext, int maxInboundMessageSize, Duration idleTimeout,
+      int writeBufferHighWaterMarkBytes, int writeBufferLowWaterMarkBytes) {
     _clientSslContext = clientSslContext;
     _maxInboundMessageSize = maxInboundMessageSize;
     _idleTimeout = idleTimeout;
+    _writeBufferHighWaterMarkBytes = writeBufferHighWaterMarkBytes;
+    _writeBufferLowWaterMarkBytes = writeBufferLowWaterMarkBytes;

Review Comment:
   Two validation gaps that fail lazily instead of at startup:
   
   1. `WriteBufferWaterMark(low, high)` throws `IllegalArgumentException` if 
`low > high`, but only inside `getChannel` on the first send to a 
previously-unseen peer (lines 103-104, 116-117). A misconfigured cluster starts 
up fine, exposes metrics, and then explodes mid-query.
   2. No positivity check. Netty itself only rejects a negative low mark or a 
high mark below the low mark (and, as above, only lazily); a 0 watermark is 
accepted and yields degenerate writability behavior, which is not what operators expect.
   
   Worth adding `Preconditions.checkArgument(0 < writeBufferLowWaterMarkBytes 
&& writeBufferLowWaterMarkBytes <= writeBufferHighWaterMarkBytes, ...)` here, 
matching the existing pattern at `GrpcMailboxServer.java:149-152` for 
`_inboundMessageCredit`. The same shape applies to `flowControlWindow >= 
maxInboundMessageSize` in `GrpcMailboxServer`: with a window smaller than the max 
message size, a single maximum-size message cannot fit in the stream window and can 
only arrive across multiple WINDOW_UPDATE round-trips, and there's currently no check.
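
The sketch below shows the startup checks. It assumes a plain `IllegalArgumentException` in place of Guava's `Preconditions.checkArgument` so it has no dependencies. `TransportConfigValidation` and its methods are hypothetical. The flow-control check encodes the proposed `flowControlWindow >= maxInboundMessageSize` rule as stated above.

```java
/**
 * Hypothetical startup-time validation (not existing Pinot code) mirroring the suggested
 * Preconditions.checkArgument(...) calls. Plain IllegalArgumentException keeps it dependency-free.
 */
final class TransportConfigValidation {
  private TransportConfigValidation() {
  }

  /** Rejects zero or negative marks and low > high before any channel is built. */
  static void validateWriteBufferWaterMarks(int lowBytes, int highBytes) {
    if (!(0 < lowBytes && lowBytes <= highBytes)) {
      throw new IllegalArgumentException(String.format(
          "Invalid write-buffer watermarks: low=%d, high=%d (require 0 < low <= high)", lowBytes, highBytes));
    }
  }

  /** The proposed companion check for the server-side HTTP/2 stream window. */
  static void validateFlowControlWindow(int flowControlWindowBytes, int maxInboundMessageSize) {
    if (flowControlWindowBytes < maxInboundMessageSize) {
      throw new IllegalArgumentException(String.format(
          "flowControlWindow (%d) must be >= maxInboundMessageSize (%d)",
          flowControlWindowBytes, maxInboundMessageSize));
    }
  }
}
```

If `ChannelManager` and `GrpcMailboxServer` called equivalent checks at the top of their constructors, a misconfiguration would fail at startup rather than on the first send to a previously unseen peer.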



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java:
##########
@@ -166,9 +187,26 @@ public void shutdown() {
     }
   }
 
+  /// Bytes of direct (off-heap) memory currently pinned by the gRPC server
+  /// allocator backing this mailbox server. This is the same allocator whose
+  /// values are exported as `MAILBOX_SERVER_USED_DIRECT_MEMORY` gauges.
+  public long usedDirectMemoryBytes() {
+    return _bufAllocatorMetric.usedDirectMemory();
+  }
+
+  /// Bytes of heap memory currently pinned by the gRPC server allocator backing
+  /// this mailbox server. Exported as `MAILBOX_SERVER_USED_HEAP_MEMORY` gauges.
+  public long usedHeapMemoryBytes() {
+    return _bufAllocatorMetric.usedHeapMemory();
+  }
+
   @Override
   public StreamObserver<Mailbox.MailboxContent> open(StreamObserver<Mailbox.MailboxStatus> responseObserver) {
     String mailboxId = ChannelUtils.MAILBOX_ID_CTX_KEY.get();
-    return new MailboxContentObserver(_mailboxService, mailboxId, responseObserver);
+    ServerCallStreamObserver<Mailbox.MailboxStatus> serverCallObserver =
+        (ServerCallStreamObserver<Mailbox.MailboxStatus>) responseObserver;
+    serverCallObserver.disableAutoInboundFlowControl();
+    serverCallObserver.request(_inboundMessageCredit);

Review Comment:
   Behavioral regression worth documenting (and ideally testing): with auto 
inbound flow control the receiver saw one in-flight message at a time. With 
this change the in-flight window per stream is bounded by whichever limit is hit 
first, `_inboundMessageCredit` messages or `flowControlWindow` bytes (128 messages 
or 64 MB at defaults). That 
widens cancel-propagation latency proportionally: when the sender calls 
`cancel()` and pushes an error EOS (`bypassReady=true`), the EOS is appended to 
the same HTTP/2 stream behind any messages already in the inbound buffer. The 
receiver dispatches each through `MailboxContentObserver.onNext` before 
observing the cancel, and if the application queue is full each dispatch parks 
in `_notFull.await`. A cancelled query can take noticeably longer to actually 
stop while up to 64 MB of buffered data is drained.
   
   Two recommendations:
   
   1. Register `setOnCancelHandler` on the `ServerCallStreamObserver` here. 
That gives the server an out-of-band signal to call `_mailbox.cancel()` and 
wake the parked dispatch thread immediately, independent of the stream-ordered 
EOS.
   2. Add a test that issues a cancel while the receiver is parked on a full 
queue and asserts the receiver mailbox terminates within a tight bound (e.g. < 
100 ms), independent of how many buffered messages remain on the stream.
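
The sketch below shows why an out-of-band cancel helps. It assumes a `setOnCancelHandler` callback would invoke something like `cancel()` below. `CancellableBoundedMailbox` is a hypothetical stand-in for `ReceivingMailbox`. Because `cancel()` signals `_notFull` directly, a dispatch thread parked in `offer()` returns immediately instead of waiting for buffered stream data ahead of the EOS to drain.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Hypothetical bounded mailbox (not Pinot's ReceivingMailbox). cancel() is meant to be called
 * out of band, e.g. from a ServerCallStreamObserver#setOnCancelHandler callback.
 */
final class CancellableBoundedMailbox {
  private final ReentrantLock _lock = new ReentrantLock();
  private final Condition _notFull = _lock.newCondition();
  private final Deque<byte[]> _queue = new ArrayDeque<>();
  private final int _capacity;
  private boolean _cancelled; // guarded by _lock

  CancellableBoundedMailbox(int capacity) {
    _capacity = capacity;
  }

  /** Parks while full; returns false on cancel (before or during the wait), timeout, or interrupt. */
  boolean offer(byte[] block, long timeoutMs) {
    long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    _lock.lock();
    try {
      while (!_cancelled && _queue.size() >= _capacity) {
        if (remainingNanos <= 0) {
          return false;
        }
        remainingNanos = _notFull.awaitNanos(remainingNanos);
      }
      if (_cancelled) {
        return false;
      }
      _queue.addLast(block);
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      _lock.unlock();
    }
  }

  /** Consumer side: removes the oldest block (or returns null) and wakes one parked producer. */
  byte[] poll() {
    _lock.lock();
    try {
      byte[] block = _queue.pollFirst();
      if (block != null) {
        _notFull.signal();
      }
      return block;
    } finally {
      _lock.unlock();
    }
  }

  /** Out-of-band cancel: drops buffered blocks and wakes every parked offer() immediately. */
  void cancel() {
    _lock.lock();
    try {
      _cancelled = true;
      _queue.clear();
      _notFull.signalAll();
    } finally {
      _lock.unlock();
    }
  }
}
```

A test along the lines of recommendation 2 would park a producer on a full mailbox, call `cancel()` from another thread, and assert that the producer returns within a tight bound.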



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

