Jackie-Jiang commented on code in PR #18519:
URL: https://github.com/apache/pinot/pull/18519#discussion_r3269698334
##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -2167,6 +2167,73 @@ public static class MultiStageQueryRunner {
public static final String KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES
= "pinot.query.runner.max.msg.size.bytes";
public static final int DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES =
16 * 1024 * 1024;
+ /**
Review Comment:
(minor) Switch this Javadoc block to a Markdown (`///`) doc comment
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -70,19 +74,41 @@ public class GrpcSendingMailbox implements SendingMailbox {
private final StatMap<MailboxSendOperator.StatKey> _statMap;
private final MailboxStatusObserver _statusObserver = new
MailboxStatusObserver();
private final int _maxByteStringSize;
+ /// Kill-switch for the sender-side `isReady()` gate. When `false`,
`awaitReady` short-circuits like the bypass
+ /// path and the sender pushes unconditionally — restoring the pre-1.6
behaviour. Plumbed from
+ /// `pinot.query.runner.grpc.sender.backpressure.enabled` so it can be
flipped without code changes if the gate
+ /// causes a regression in production, and also used by
`BenchmarkGrpcMailboxSend` for A/B measurements.
+ private final boolean _backpressureEnabled;
/// Indicates whether the sending side has attempted to close the mailbox
(either via complete() or cancel()).
private volatile boolean _senderSideClosed;
- private StreamObserver<MailboxContent> _contentObserver;
+ /// Guards [#_readyCond]. [#_contentObserver] is normally written once by
the sending thread on its first call to
+ /// [#sendInternal]. The field is declared `volatile` because [#cancel] and
[#close] can read it from a different
+ /// thread (e.g. an external cancel from an OpChain on-failure callback or a
watchdog in tests), and we need a
+ /// happens-before edge for the sender's lazy initialization.
+ private final ReentrantLock _readyLock = new ReentrantLock();
+ /// Signalled whenever any of the predicates `awaitReady()` waits on may
have changed: the gRPC outbound becomes
+ /// ready, the receiver acknowledges a chunk, the receiver-side stream
closes (success or error), or the sender
+ /// itself is cancelled. Multiple producers fire the signal; the waiter
always re-checks the predicates after
+ /// waking up.
+ private final Condition _readyCond = _readyLock.newCondition();
+
+ private volatile ClientCallStreamObserver<MailboxContent> _contentObserver;
public GrpcSendingMailbox(String id, ChannelManager channelManager, String
hostname, int port, long deadlineMs,
StatMap<MailboxSendOperator.StatKey> statMap, int maxInboundMessageSize)
{
+ this(id, channelManager, hostname, port, deadlineMs, statMap,
maxInboundMessageSize, true);
+ }
+
+ public GrpcSendingMailbox(String id, ChannelManager channelManager, String
hostname, int port, long deadlineMs,
Review Comment:
(nit) This is not a public-facing class. Directly change the constructor.
##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -2167,6 +2167,73 @@ public static class MultiStageQueryRunner {
public static final String KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES
= "pinot.query.runner.max.msg.size.bytes";
public static final int DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES =
16 * 1024 * 1024;
+ /**
+ * Whether the sender side of every {@code GrpcSendingMailbox} respects
gRPC client-side flow control by waiting
+ * on {@code ClientCallStreamObserver.isReady()} before pushing each chunk.
+ *
+ * <p>Default {@code true}. Set to {@code false} to restore the pre-1.6
behaviour where the sender pushes
+ * unconditionally; useful as a production kill-switch if the gate causes
an unexpected regression, and as an
+ * A/B knob for benchmarks (see {@code BenchmarkGrpcMailboxSend}).
+ *
+ * <p>Disabling this flag is what re-introduces the {@code
OutOfDirectMemoryError} failure mode the gate exists
+ * to prevent. It is here as a safety valve, not as a recommended setting.
+ */
+ public static final String KEY_OF_GRPC_SENDER_BACKPRESSURE_ENABLED =
+ "pinot.query.runner.grpc.sender.backpressure.enabled";
+ public static final boolean DEFAULT_GRPC_SENDER_BACKPRESSURE_ENABLED =
true;
+
+ /**
+ * Per-stream HTTP/2 flow control window, in bytes. The receiver
advertises this value to the sender as
+ * the number of bytes it will accept before requiring a `WINDOW_UPDATE`
frame. Wider windows let the
+ * sender push a whole `MseBlock` without {@link
io.grpc.stub.ClientCallStreamObserver#isReady} flipping
+ * mid-block. Applied via `NettyServerBuilder.flowControlWindow` in
`GrpcMailboxServer`.
+ *
+ * <p>This is per HTTP/2 stream, so total inbound buffering at the
receiver scales as
+ * {@code value × #concurrent streams to this server}.
+ */
+ public static final String KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES =
+ "pinot.query.runner.grpc.flow.control.window.bytes";
+ public static final int DEFAULT_GRPC_FLOW_CONTROL_WINDOW_BYTES = 64 * 1024
* 1024;
Review Comment:
Should we align this with `pinot.query.runner.max.msg.size.bytes` (16MB by
default)?
##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -2167,6 +2167,73 @@ public static class MultiStageQueryRunner {
public static final String KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES
= "pinot.query.runner.max.msg.size.bytes";
public static final int DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES =
16 * 1024 * 1024;
+ /**
+ * Whether the sender side of every {@code GrpcSendingMailbox} respects
gRPC client-side flow control by waiting
+ * on {@code ClientCallStreamObserver.isReady()} before pushing each chunk.
+ *
+ * <p>Default {@code true}. Set to {@code false} to restore the pre-1.6
behaviour where the sender pushes
+ * unconditionally; useful as a production kill-switch if the gate causes
an unexpected regression, and as an
+ * A/B knob for benchmarks (see {@code BenchmarkGrpcMailboxSend}).
+ *
+ * <p>Disabling this flag is what re-introduces the {@code
OutOfDirectMemoryError} failure mode the gate exists
+ * to prevent. It is here as a safety valve, not as a recommended setting.
+ */
+ public static final String KEY_OF_GRPC_SENDER_BACKPRESSURE_ENABLED =
+ "pinot.query.runner.grpc.sender.backpressure.enabled";
+ public static final boolean DEFAULT_GRPC_SENDER_BACKPRESSURE_ENABLED =
true;
+
+ /**
+ * Per-stream HTTP/2 flow control window, in bytes. The receiver
advertises this value to the sender as
+ * the number of bytes it will accept before requiring a `WINDOW_UPDATE`
frame. Wider windows let the
+ * sender push a whole `MseBlock` without {@link
io.grpc.stub.ClientCallStreamObserver#isReady} flipping
+ * mid-block. Applied via `NettyServerBuilder.flowControlWindow` in
`GrpcMailboxServer`.
+ *
+ * <p>This is per HTTP/2 stream, so total inbound buffering at the
receiver scales as
+ * {@code value × #concurrent streams to this server}.
+ */
+ public static final String KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES =
+ "pinot.query.runner.grpc.flow.control.window.bytes";
+ public static final int DEFAULT_GRPC_FLOW_CONTROL_WINDOW_BYTES = 64 * 1024
* 1024;
+
+ /**
+ * Netty per-channel WriteQueue high watermark, in bytes. Applied via
+ * `ChannelOption.WRITE_BUFFER_WATER_MARK` on the sender's
`NettyChannelBuilder`. When the channel's
+ * outbound queue exceeds this value, `Channel.isWritable()` flips to
`false` and gRPC's
+ * `ClientCallStreamObserver.isReady()` returns `false` until the queue
drops below the low watermark.
+ *
+ * <p>This is a per-channel (per `host:port`) setting, shared across all
streams to that peer. The
+ * sender's direct-memory footprint is therefore bounded by {@code value ×
#peers}, not by
+ * {@code value × #streams}. Pairs with {@link
#KEY_OF_GRPC_WRITE_BUFFER_LOW_WATER_MARK_BYTES}.
+ */
+ public static final String KEY_OF_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES =
+ "pinot.query.runner.grpc.write.buffer.high.water.mark.bytes";
+ public static final int DEFAULT_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES =
64 * 1024 * 1024;
+
+ /**
+ * Netty per-channel WriteQueue low watermark, in bytes. Once the
WriteQueue has exceeded the high
+ * watermark (see {@link
#KEY_OF_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES}), it must drop below this
+ * value before `Channel.isWritable()` flips back to `true`.
Conventionally set to ~50% of the high
+ * watermark.
+ */
+ public static final String KEY_OF_GRPC_WRITE_BUFFER_LOW_WATER_MARK_BYTES =
+ "pinot.query.runner.grpc.write.buffer.low.water.mark.bytes";
+ public static final int DEFAULT_GRPC_WRITE_BUFFER_LOW_WATER_MARK_BYTES =
32 * 1024 * 1024;
+
+ /**
+ * Number of inbound gRPC messages the receiver will accept in flight per
stream, before requiring the
+ * application to consume one (via {@code MailboxContentObserver.onNext}
returning). Implemented by
+ * disabling gRPC's default auto-inbound-flow-control on the server side
and calling
+ * {@code ServerCallStreamObserver.request(int)} explicitly.
+ *
+ * <p>Larger values let the sender pipeline more messages without waiting
for per-message round trips,
+ * which is the primary throughput knob for small / medium MSE blocks.
Memory exposure on the receiver
+ * is still bounded by the HTTP/2 stream window (see
+ * {@link #KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES}), so this credit count
is effectively a per-stream
+ * message-count limit on top of the byte-count limit. Whichever fires
first applies.
+ */
+ public static final String KEY_OF_GRPC_INBOUND_MESSAGE_CREDIT =
+ "pinot.query.runner.grpc.inbound.message.credit";
+ public static final int DEFAULT_GRPC_INBOUND_MESSAGE_CREDIT = 128;
Review Comment:
Does this mean we allow 128 gRPC messages to queue up on the sender side?
How was the value 128 chosen? It is far higher than the default blocking queue
size, which is 5.
##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -2167,6 +2167,73 @@ public static class MultiStageQueryRunner {
public static final String KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES
= "pinot.query.runner.max.msg.size.bytes";
public static final int DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES =
16 * 1024 * 1024;
+ /**
+ * Whether the sender side of every {@code GrpcSendingMailbox} respects
gRPC client-side flow control by waiting
+ * on {@code ClientCallStreamObserver.isReady()} before pushing each chunk.
+ *
+ * <p>Default {@code true}. Set to {@code false} to restore the pre-1.6
behaviour where the sender pushes
+ * unconditionally; useful as a production kill-switch if the gate causes
an unexpected regression, and as an
+ * A/B knob for benchmarks (see {@code BenchmarkGrpcMailboxSend}).
+ *
+ * <p>Disabling this flag is what re-introduces the {@code
OutOfDirectMemoryError} failure mode the gate exists
+ * to prevent. It is here as a safety valve, not as a recommended setting.
+ */
+ public static final String KEY_OF_GRPC_SENDER_BACKPRESSURE_ENABLED =
+ "pinot.query.runner.grpc.sender.backpressure.enabled";
+ public static final boolean DEFAULT_GRPC_SENDER_BACKPRESSURE_ENABLED =
true;
+
+ /**
+ * Per-stream HTTP/2 flow control window, in bytes. The receiver
advertises this value to the sender as
+ * the number of bytes it will accept before requiring a `WINDOW_UPDATE`
frame. Wider windows let the
+ * sender push a whole `MseBlock` without {@link
io.grpc.stub.ClientCallStreamObserver#isReady} flipping
+ * mid-block. Applied via `NettyServerBuilder.flowControlWindow` in
`GrpcMailboxServer`.
+ *
+ * <p>This is per HTTP/2 stream, so total inbound buffering at the
receiver scales as
+ * {@code value × #concurrent streams to this server}.
+ */
+ public static final String KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES =
+ "pinot.query.runner.grpc.flow.control.window.bytes";
+ public static final int DEFAULT_GRPC_FLOW_CONTROL_WINDOW_BYTES = 64 * 1024
* 1024;
+
+ /**
+ * Netty per-channel WriteQueue high watermark, in bytes. Applied via
+ * `ChannelOption.WRITE_BUFFER_WATER_MARK` on the sender's
`NettyChannelBuilder`. When the channel's
+ * outbound queue exceeds this value, `Channel.isWritable()` flips to
`false` and gRPC's
+ * `ClientCallStreamObserver.isReady()` returns `false` until the queue
drops below the low watermark.
+ *
+ * <p>This is a per-channel (per `host:port`) setting, shared across all
streams to that peer. The
+ * sender's direct-memory footprint is therefore bounded by {@code value ×
#peers}, not by
+ * {@code value × #streams}. Pairs with {@link
#KEY_OF_GRPC_WRITE_BUFFER_LOW_WATER_MARK_BYTES}.
+ */
+ public static final String KEY_OF_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES =
+ "pinot.query.runner.grpc.write.buffer.high.water.mark.bytes";
+ public static final int DEFAULT_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES =
64 * 1024 * 1024;
Review Comment:
Same here. Should we align all these sizes to the same value?
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java:
##########
@@ -121,11 +127,55 @@ public MailboxService(String hostname, int port,
InstanceType instanceType, Pino
CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES
);
- _channelManager = new ChannelManager(_clientSslContext,
_maxInboundMessageSize, getIdleTimeout(config));
+ _grpcSenderBackpressureEnabled = config.getProperty(
+
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_SENDER_BACKPRESSURE_ENABLED,
+
CommonConstants.MultiStageQueryRunner.DEFAULT_GRPC_SENDER_BACKPRESSURE_ENABLED
+ );
+ int writeBufferHighWaterMarkBytes = config.getProperty(
+
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES,
+
CommonConstants.MultiStageQueryRunner.DEFAULT_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES);
+ int writeBufferLowWaterMarkBytes = config.getProperty(
+
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_WRITE_BUFFER_LOW_WATER_MARK_BYTES,
+
CommonConstants.MultiStageQueryRunner.DEFAULT_GRPC_WRITE_BUFFER_LOW_WATER_MARK_BYTES);
+ _channelManager = new ChannelManager(_clientSslContext,
_maxInboundMessageSize, getIdleTimeout(config),
+ writeBufferHighWaterMarkBytes, writeBufferLowWaterMarkBytes);
_accessControlFactory = accessControlFactory;
+ registerMailboxClientGauges();
LOGGER.info("Initialized MailboxService with hostname: {}, port: {}",
hostname, port);
}
+ /// Registers gauges exposing the memory used by the gRPC client allocator
+ /// shared by every [GrpcSendingMailbox] this service creates. The companion
+ /// gauges for the server allocator are registered in [GrpcMailboxServer].
+ ///
+ /// Notice we are wiring the shaded gRPC Netty allocator
+ /// ([io.grpc.netty.shaded.io.netty.buffer.PooledByteBufAllocator]) rather
than
+ /// the non-shaded one.
+ private void registerMailboxClientGauges() {
Review Comment:
Do we also want to emit the server-side memory usage?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]