gortiz commented on code in PR #18519: URL: https://github.com/apache/pinot/pull/18519#discussion_r3274550657
########## pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcSenderBackpressureTest.java: ########## @@ -0,0 +1,244 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.mailbox; + +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.pinot.common.datatable.StatMap; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.query.planner.physical.MailboxIdUtils; +import org.apache.pinot.query.runtime.blocks.RowHeapDataBlock; +import org.apache.pinot.query.runtime.operator.MailboxSendOperator; +import org.apache.pinot.query.runtime.operator.OperatorTestUtil; +import org.apache.pinot.query.testutils.QueryTestUtils; +import org.apache.pinot.spi.config.instance.InstanceType; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.query.QueryThreadContext; +import org.testng.annotations.AfterClass; +import 
org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertTrue; + + +/// Validates that sender-side gRPC back-pressure keeps a fast sender roughly in step with a slow receiver. +/// +/// A "fast sender" pushes the same small data block repeatedly on the test thread while a "slow reader" +/// thread polls the receiving mailbox at roughly 50 blocks per second. With back-pressure in place, the +/// sender thread blocks inside [GrpcSendingMailbox.awaitReady] whenever the gRPC outbound queue fills, +/// so the send rate tracks the polling rate plus a bounded in-flight pipeline. +/// +/// With the new transport defaults (64 MB HTTP/2 flow-control window and 64 MB Netty write-buffer +/// high-water mark), both the gRPC stream window and the Netty WriteQueue can buffer far more data +/// before signalling back-pressure. As a result, [GrpcSendingMailbox.awaitReady] rarely fires the +/// application-level gate during a 3-second run; the back-pressure that *does* apply is transport-level +/// (the kernel's TCP send buffer and the receiver's gRPC server read loop). This means the ratio of +/// `sendCount` to `polledCount` is much larger than with the old narrow-window defaults, and the test Review Comment: Added in commit `2d8fe69320` as `GrpcSenderBackpressureTightGateTest`, with exactly the config matrix you proposed: ```java Map.of( KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES, 65535, KEY_OF_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES, 262144, KEY_OF_GRPC_WRITE_BUFFER_LOW_WATER_MARK_BYTES, 131072, // Y3 fail-fast validation requires window >= maxInboundMessageSize, so shrink that too: KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES, 65535) ``` The tight assertion is `sendCount <= polledCount * 50 + 10_000` (the original bound from before the wide-defaults relaxation). With narrow transport caps, deleting the application-level gate would make this assertion fail loudly.
The class-level Javadoc references the three-test matrix (`GrpcSenderBackpressureTest`: wide-defaults regime; `GrpcSenderBackpressureDisabledTest`: kill-switch off; this one: narrow transport caps, with the gate as the dominant back-pressure mechanism). The `KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES` override was added in commit `1d29438dc0` ("Fail fast on invalid gRPC mailbox transport configuration"). The Y3 startup-time check I added (`flowControlWindow >= maxInboundMessageSize`) flagged this test setup as misconfigured on its very first run; shrinking both values together to 65535 satisfies it. ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java: ########## @@ -47,21 +48,26 @@ public class MailboxContentObserver implements StreamObserver<MailboxContent> { private static final Logger LOGGER = LoggerFactory.getLogger(MailboxContentObserver.class); private final MailboxService _mailboxService; - private final StreamObserver<MailboxStatus> _responseObserver; + private final ServerCallStreamObserver<MailboxStatus> _responseObserver; private final List<ByteBuffer> _mailboxBuffers = Collections.synchronizedList(new ArrayList<>()); private boolean _closedStream = false; private volatile ReceivingMailbox _mailbox; public MailboxContentObserver(MailboxService mailboxService, String mailboxId, - StreamObserver<MailboxStatus> responseObserver) { + ServerCallStreamObserver<MailboxStatus> responseObserver) { _mailboxService = mailboxService; _responseObserver = responseObserver; _mailbox = StringUtils.isNotBlank(mailboxId) ? _mailboxService.getReceivingMailbox(mailboxId) : null; } @Override public void onNext(MailboxContent mailboxContent) { + // Replenish one inbound-message credit immediately, before any work that might block (e.g., the + // offerData lock acquisition inside _mailbox.offerRaw).
This decouples the sender's HTTP/2 window + // replenishment from the receiver's per-message processing time — gRPC will issue the WINDOW_UPDATE + // for this message as soon as this request(1) call sets the credit, not waiting for onNext to return. + _responseObserver.request(1); Review Comment: Done in commit `b5d19714ae`. Above the `_responseObserver.request(1)` call there's now a blunt warning: ```java // Do not move this below the blocking _mailbox.offerRaw call — it must replenish credit // BEFORE the offer so the receiver doesn't gate the sender on per-message application drain time. _responseObserver.request(1); ``` In the `CommonConstants.KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES` Javadoc, I also spelled out that this value is the **per-stalled-stream receiver-side direct-memory exposure** (not just a throughput knob): when an inbound stream's application queue stalls, the wire can still buffer up to `flowControlWindow` bytes on that stream before the HTTP/2 peer stops sending. Operators sizing the window will therefore see the OOM-bound implication called out explicitly.
+ private final boolean _backpressureEnabled; /// Indicates whether the sending side has attempted to close the mailbox (either via complete() or cancel()). private volatile boolean _senderSideClosed; - private StreamObserver<MailboxContent> _contentObserver; + /// Guards [#_readyCond]. [#_contentObserver] is normally written once by the sending thread on its first call to + /// [#sendInternal]. The field is declared `volatile` because [#cancel] and [#close] can read it from a different + /// thread (e.g. an external cancel from an OpChain on-failure callback or a watchdog in tests), and we need a + /// happens-before edge for the sender's lazy initialization. + private final ReentrantLock _readyLock = new ReentrantLock(); + /// Signalled whenever any of the predicates `awaitReady()` waits on may have changed: the gRPC outbound becomes + /// ready, the receiver acknowledges a chunk, the receiver-side stream closes (success or error), or the sender + /// itself is cancelled. Multiple producers fire the signal; the waiter always re-checks the predicates after + /// waking up. + private final Condition _readyCond = _readyLock.newCondition(); + + private volatile ClientCallStreamObserver<MailboxContent> _contentObserver; public GrpcSendingMailbox(String id, ChannelManager channelManager, String hostname, int port, long deadlineMs, StatMap<MailboxSendOperator.StatKey> statMap, int maxInboundMessageSize) { + this(id, channelManager, hostname, port, deadlineMs, statMap, maxInboundMessageSize, true); + } + + public GrpcSendingMailbox(String id, ChannelManager channelManager, String hostname, int port, long deadlineMs, Review Comment: Done in commit `44e27187a8`. The delegating 7-arg constructor is gone; the only constructor is the 8-arg one with `boolean backpressureEnabled`. One test caller (`GrpcSendingMailboxTest`) was updated to pass `true`; the production caller in `MailboxService.getSendingMailbox(...)` was already wired to pass `_grpcSenderBackpressureEnabled`.
All 75 mailbox-related tests still pass. ########## pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java: ########## @@ -2167,6 +2167,73 @@ public static class MultiStageQueryRunner { public static final String KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES = "pinot.query.runner.max.msg.size.bytes"; public static final int DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES = 16 * 1024 * 1024; + /** Review Comment: Done in commit `f488e2f2dc`. All five new keys in this block — `KEY_OF_GRPC_SENDER_BACKPRESSURE_ENABLED`, `KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES`, `KEY_OF_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES`, `KEY_OF_GRPC_WRITE_BUFFER_LOW_WATER_MARK_BYTES`, `KEY_OF_GRPC_INBOUND_MESSAGE_CREDIT` — were converted from `/** */` with `<p>` tags to `///` markdown. Cross-module references use the FQN form (`[io.grpc.stub.ClientCallStreamObserver#isReady]`, `[org.apache.pinot.query.mailbox.channel.MailboxContentObserver#onNext]`) to match the existing convention used elsewhere in this file. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
