gortiz commented on code in PR #18519:
URL: https://github.com/apache/pinot/pull/18519#discussion_r3274550657


##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcSenderBackpressureTest.java:
##########
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.query.planner.physical.MailboxIdUtils;
+import org.apache.pinot.query.runtime.blocks.RowHeapDataBlock;
+import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.query.runtime.operator.OperatorTestUtil;
+import org.apache.pinot.query.testutils.QueryTestUtils;
+import org.apache.pinot.spi.config.instance.InstanceType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertTrue;
+
+
+/// Validates that sender-side gRPC back-pressure keeps a fast sender roughly in step with a slow receiver.
+///
+/// A "fast sender" pushes the same small data block repeatedly on the test thread while a "slow reader"
+/// thread polls the receiving mailbox at roughly 50 blocks per second. With back-pressure in place, the
+/// sender thread blocks inside [GrpcSendingMailbox.awaitReady] whenever the gRPC outbound queue fills,
+/// so the send rate tracks the polling rate plus a bounded in-flight pipeline.
+///
+/// With the new transport defaults (64 MB HTTP/2 flow-control window and 64 MB Netty write-buffer
+/// high-water mark), both the gRPC stream window and the Netty WriteQueue can buffer far more data
+/// before signalling back-pressure.  As a result, [GrpcSendingMailbox.awaitReady] rarely fires the
+/// application-level gate during a 3-second run; the back-pressure that *does* apply is transport-level
+/// (the kernel's TCP send buffer and the receiver's gRPC server read loop).  This means the ratio of
+/// `sendCount` to `polledCount` is much larger than with the old narrow-window defaults, and the test

Review Comment:
   Added in commit `2d8fe69320` as `GrpcSenderBackpressureTightGateTest`, with 
exactly the config matrix you proposed:
   
   ```java
   Map.of(
       KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES, 65535,
       KEY_OF_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES, 262144,
       KEY_OF_GRPC_WRITE_BUFFER_LOW_WATER_MARK_BYTES, 131072,
       // Y3 fail-fast validation requires window >= maxInboundMessageSize, so shrink that too:
       KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES, 65535)
   ```
   
   Tight assertion: `sendCount <= polledCount * 50 + 10_000` (the original bound from before the wide-defaults relaxation). With narrow transport caps, deleting the application gate would fail this assertion loudly. The class-level Javadoc references the three-test matrix: `GrpcSenderBackpressureTest` (wide-defaults regime), `GrpcSenderBackpressureDisabledTest` (kill-switch off), and this one (narrow transport with the gate as the dominant back-pressure mechanism).
   
   The `KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES` override was added in commit `1d29438dc0` ("Fail fast on invalid gRPC mailbox transport configuration"). The Y3 startup-time check I added (`flowControlWindow >= maxInboundMessageSize`) immediately caught this test setup as misconfigured on first run; shrinking both values to 65535 satisfies it.
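   
   For readers skimming the thread, here is a minimal sketch of the shape of that check. The class and method names are hypothetical and this is not the actual Pinot code; it only illustrates the `flowControlWindow >= maxInboundMessageSize` rule and why a 65535-byte window combined with the 16 MB default message size gets rejected:
   
   ```java
   // Hypothetical illustration of a fail-fast transport check; not the Pinot implementation.
   final class TransportConfigCheck {
     private TransportConfigCheck() {
     }

     /** Throws if the flow-control window cannot hold one maximum-size inbound message. */
     static void validate(int flowControlWindowBytes, int maxInboundMessageSizeBytes) {
       if (flowControlWindowBytes < maxInboundMessageSizeBytes) {
         throw new IllegalArgumentException(String.format(
             "flowControlWindow (%d) must be >= maxInboundMessageSize (%d)",
             flowControlWindowBytes, maxInboundMessageSizeBytes));
       }
     }

     public static void main(String[] args) {
       // Both values shrunk together, as in the tight-gate test: accepted.
       validate(65535, 65535);
       try {
         // Narrow window with the 16 MB default message size: rejected.
         validate(65535, 16 * 1024 * 1024);
       } catch (IllegalArgumentException e) {
         System.out.println("rejected: " + e.getMessage());
       }
     }
   }
   ```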



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java:
##########
@@ -47,21 +48,26 @@ public class MailboxContentObserver implements StreamObserver<MailboxContent> {
   private static final Logger LOGGER = LoggerFactory.getLogger(MailboxContentObserver.class);
 
   private final MailboxService _mailboxService;
-  private final StreamObserver<MailboxStatus> _responseObserver;
+  private final ServerCallStreamObserver<MailboxStatus> _responseObserver;
   private final List<ByteBuffer> _mailboxBuffers = Collections.synchronizedList(new ArrayList<>());
   private boolean _closedStream = false;
 
   private volatile ReceivingMailbox _mailbox;
 
   public MailboxContentObserver(MailboxService mailboxService, String mailboxId,
-      StreamObserver<MailboxStatus> responseObserver) {
+      ServerCallStreamObserver<MailboxStatus> responseObserver) {
     _mailboxService = mailboxService;
     _responseObserver = responseObserver;
     _mailbox = StringUtils.isNotBlank(mailboxId) ? _mailboxService.getReceivingMailbox(mailboxId) : null;
   }
 
   @Override
   public void onNext(MailboxContent mailboxContent) {
+    // Replenish one inbound-message credit immediately, before any work that might block (e.g., the
+    // offerData lock acquisition inside _mailbox.offerRaw). This decouples the sender's HTTP/2 window
+    // replenishment from the receiver's per-message processing time — gRPC will issue the WINDOW_UPDATE
+    // for this message as soon as this request(1) call sets the credit, not waiting for onNext to return.
+    _responseObserver.request(1);

Review Comment:
   Done in commit `b5d19714ae`. Above the `_responseObserver.request(1)` call 
there's now a blunt warning:
   
   ```java
   // Do not move this below the blocking _mailbox.offerRaw call — it must 
   // replenish credit BEFORE the offer so the receiver doesn't gate the sender on per-message
   // application drain time.
   _responseObserver.request(1);
   ```
   
   And in the `CommonConstants.KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES` Javadoc I spelled out that this value is the **per-stalled-stream receiver-side direct-memory exposure** (not just a throughput knob): when an inbound stream's application queue stalls, the wire can still buffer up to `flowControlWindow` bytes on that stream before the HTTP/2 peer stops sending. The OOM implication is now called out explicitly for operators sizing the window.
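   
   As a rough illustration of that sizing implication, the sketch below computes a ceiling on buffered bytes as `flowControlWindow * stalledStreams`. The helper name and the stream count are hypothetical, and treating streams as independent is an assumption: it ignores any connection-level window that may cap the total, so read it as an upper bound rather than a prediction.
   
   ```java
   // Hypothetical back-of-the-envelope helper; not part of Pinot.
   final class WindowExposure {
     private WindowExposure() {
     }

     /** Upper bound on bytes buffered if every stalled stream fills its whole window. */
     static long worstCaseBytes(long flowControlWindowBytes, int stalledStreams) {
       return Math.multiplyExact(flowControlWindowBytes, (long) stalledStreams);
     }

     public static void main(String[] args) {
       long windowBytes = 64L * 1024 * 1024;  // the 64 MB wide default mentioned in the test Javadoc
       int stalledStreams = 100;              // assumed number of stalled inbound streams
       long bytes = worstCaseBytes(windowBytes, stalledStreams);
       System.out.println(bytes / (1024 * 1024) + " MB worst case");
     }
   }
   ```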



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -70,19 +74,41 @@ public class GrpcSendingMailbox implements SendingMailbox {
   private final StatMap<MailboxSendOperator.StatKey> _statMap;
   private final MailboxStatusObserver _statusObserver = new MailboxStatusObserver();
   private final int _maxByteStringSize;
+  /// Kill-switch for the sender-side `isReady()` gate. When `false`, `awaitReady` short-circuits like the bypass
+  /// path and the sender pushes unconditionally — restoring the pre-1.6 behaviour. Plumbed from
+  /// `pinot.query.runner.grpc.sender.backpressure.enabled` so it can be flipped without code changes if the gate
+  /// causes a regression in production, and also used by `BenchmarkGrpcMailboxSend` for A/B measurements.
+  private final boolean _backpressureEnabled;
   /// Indicates whether the sending side has attempted to close the mailbox (either via complete() or cancel()).
   private volatile boolean _senderSideClosed;
 
-  private StreamObserver<MailboxContent> _contentObserver;
+  /// Guards [#_readyCond]. [#_contentObserver] is normally written once by the sending thread on its first call to
+  /// [#sendInternal]. The field is declared `volatile` because [#cancel] and [#close] can read it from a different
+  /// thread (e.g. an external cancel from an OpChain on-failure callback or a watchdog in tests), and we need a
+  /// happens-before edge for the sender's lazy initialization.
+  private final ReentrantLock _readyLock = new ReentrantLock();
+  /// Signalled whenever any of the predicates `awaitReady()` waits on may have changed: the gRPC outbound becomes
+  /// ready, the receiver acknowledges a chunk, the receiver-side stream closes (success or error), or the sender
+  /// itself is cancelled. Multiple producers fire the signal; the waiter always re-checks the predicates after
+  /// waking up.
+  private final Condition _readyCond = _readyLock.newCondition();
+
+  private volatile ClientCallStreamObserver<MailboxContent> _contentObserver;
 
   public GrpcSendingMailbox(String id, ChannelManager channelManager, String hostname, int port, long deadlineMs,
       StatMap<MailboxSendOperator.StatKey> statMap, int maxInboundMessageSize) {
+    this(id, channelManager, hostname, port, deadlineMs, statMap, maxInboundMessageSize, true);
+  }
+
+  public GrpcSendingMailbox(String id, ChannelManager channelManager, String hostname, int port, long deadlineMs,

Review Comment:
   Done in commit `44e27187a8`. The delegating 7-arg constructor is gone; the only constructor is the 8-arg one with `boolean backpressureEnabled`. One test caller (`GrpcSendingMailboxTest`) was updated to pass `true`; the production caller in `MailboxService.getSendingMailbox(...)` was already wired to pass `_grpcSenderBackpressureEnabled`. All 75 mailbox-related tests still pass.
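   
   To make the role of that flag concrete, here is a simplified sketch of a lock-and-condition gate with a kill-switch, following the field Javadoc quoted above (short-circuit when disabled, re-check predicates after every wake-up). `ReadyGate` and its members are hypothetical stand-ins, not the real `GrpcSendingMailbox.awaitReady`, and the readiness check is modelled as a plain `BooleanSupplier` instead of a gRPC observer:
   
   ```java
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.locks.Condition;
   import java.util.concurrent.locks.ReentrantLock;
   import java.util.function.BooleanSupplier;

   // Hypothetical sketch of a sender-side readiness gate; not the Pinot implementation.
   final class ReadyGate {
     private final ReentrantLock _readyLock = new ReentrantLock();
     private final Condition _readyCond = _readyLock.newCondition();
     private final boolean _backpressureEnabled;
     private final BooleanSupplier _isReady;
     private volatile boolean _closed;

     ReadyGate(boolean backpressureEnabled, BooleanSupplier isReady) {
       _backpressureEnabled = backpressureEnabled;
       _isReady = isReady;
     }

     /** Called by any producer whose event may have changed a predicate the waiter checks. */
     void signal() {
       _readyLock.lock();
       try {
         _readyCond.signalAll();
       } finally {
         _readyLock.unlock();
       }
     }

     void close() {
       _closed = true;
       signal();
     }

     /** Returns true if the caller may send; false on timeout, close, or interruption. */
     boolean awaitReady(long timeoutMs) {
       if (!_backpressureEnabled) {
         return true;  // kill-switch off: never block the sender
       }
       long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
       _readyLock.lock();
       try {
         // Re-check the predicates after every wake-up, since several producers may signal.
         while (!_closed && !_isReady.getAsBoolean()) {
           if (remainingNanos <= 0) {
             return false;
           }
           remainingNanos = _readyCond.awaitNanos(remainingNanos);
         }
         return !_closed;
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         return false;
       } finally {
         _readyLock.unlock();
       }
     }
   }
   ```
   
   With `backpressureEnabled == false` the gate returns immediately, which is the behaviour the kill-switch is meant to restore.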



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -2167,6 +2167,73 @@ public static class MultiStageQueryRunner {
     public static final String KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES = "pinot.query.runner.max.msg.size.bytes";
     public static final int DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES = 16 * 1024 * 1024;
 
+    /**

Review Comment:
   Done in commit `f488e2f2dc`. All five new keys in this block (`KEY_OF_GRPC_SENDER_BACKPRESSURE_ENABLED`, `KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES`, `KEY_OF_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES`, `KEY_OF_GRPC_WRITE_BUFFER_LOW_WATER_MARK_BYTES`, `KEY_OF_GRPC_INBOUND_MESSAGE_CREDIT`) are converted from `/** */` Javadoc with `<p>` tags to `///` markdown. Cross-module references use the FQN form (`[io.grpc.stub.ClientCallStreamObserver#isReady]`, `[org.apache.pinot.query.mailbox.channel.MailboxContentObserver#onNext]`) to match the existing convention used elsewhere in this file.
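   
   For anyone unfamiliar with the style, a `///` markdown doc comment with an FQN link looks roughly like the sketch below. The enclosing class is hypothetical, the comment wording is illustrative rather than the committed text, and pairing this constant name with this property string is an assumption based on the field Javadoc in `GrpcSendingMailbox`. On older JDKs the `///` lines compile as ordinary line comments.
   
   ```java
   // Hypothetical holder class, used only to show the comment style.
   final class DocStyleExample {
     private DocStyleExample() {
     }

     /// Kill-switch for the sender-side gate built on
     /// [io.grpc.stub.ClientCallStreamObserver#isReady].
     ///
     /// When `false`, the sender pushes without waiting for readiness.
     static final String KEY_OF_GRPC_SENDER_BACKPRESSURE_ENABLED =
         "pinot.query.runner.grpc.sender.backpressure.enabled";
   }
   ```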



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

