Copilot commented on code in PR #17667:
URL: https://github.com/apache/pinot/pull/17667#discussion_r2782389263
##########
pinot-core/src/main/java/org/apache/pinot/core/transport/DirectOOMHandler.java:
##########
@@ -70,6 +70,10 @@ public void channelInactive(ChannelHandlerContext ctx) {
if (_silentShutDown) {
return;
}
+ // Remove from _allChannels on server side so we don't leak channel references when brokers disconnect
+ if (_allChannels != null) {
+ _allChannels.remove(ctx.channel());
+ }
Review Comment:
The early return on `_silentShutDown` prevents removing the channel from
`_allChannels`, which can still leak channel references in the silent shutdown
path. Move the `_allChannels.remove(ctx.channel())` block before the
`_silentShutDown` check (or ensure silent shutdown performs equivalent cleanup).
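
A minimal sketch of the suggested reordering, assuming `_allChannels` is a Netty `ChannelGroup` (the actual field type in `DirectOOMHandler` may differ) and with the unrelated parts of the handler omitted:

```java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;

// Sketch only: field types and the rest of the handler are assumptions, not the PR's code.
public class DirectOOMHandler extends ChannelInboundHandlerAdapter {
  private final ChannelGroup _allChannels;
  private volatile boolean _silentShutDown;

  public DirectOOMHandler(ChannelGroup allChannels) {
    _allChannels = allChannels;
  }

  @Override
  public void channelInactive(ChannelHandlerContext ctx) {
    // Drop the channel reference first so the silent-shutdown path cannot leak it
    if (_allChannels != null) {
      _allChannels.remove(ctx.channel());
    }
    if (_silentShutDown) {
      return;
    }
    ctx.fireChannelInactive();
  }
}
```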
##########
pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java:
##########
@@ -120,6 +120,11 @@ public void start() {
PooledByteBufAllocator bufAllocator = PooledByteBufAllocator.DEFAULT;
PooledByteBufAllocatorMetric metric = bufAllocator.metric();
ServerMetrics metrics = ServerMetrics.get();
+ // Notice here we assume there is a single QueryServer per JVM. If that is not true (ie quickstarts with multiple
+ // servers), we:
+ // 1. Will have one allocator per server, which may cause higher memory usage, and may cause OOM.
+ // 2. Will have multiple sets of gauges for each allocator, which means the last one will win and override the
Review Comment:
This new comment warns about a problematic multi-server-in-one-JVM behavior, but the code still
creates a new allocator and registers gauges on every `start()` call. Either enforce the intended
singleton behavior (shared allocator + one-time gauge registration; a holder sketch follows the
suggestion below) or reword the comment to avoid implying the method already mitigates the issue.
```suggestion
// Note: this method does not enforce a single QueryServer per JVM. If multiple servers are started in the same
// JVM (e.g., quickstarts with multiple servers), each call to start() will:
// 1. Create a separate allocator per server, which may cause higher memory usage and may lead to OOM.
// 2. Register a separate set of gauges for each allocator, where the last registration will win and override the
```
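
If the singleton route is taken instead, one possible shape is an initialization-on-demand holder so
every `start()` call shares one limited allocator. This is a sketch only: the holder class name is
hypothetical, and `PooledByteBufAllocatorWithLimits` is the factory already used in this PR (its
import is omitted here):

```java
import io.netty.buffer.PooledByteBufAllocator;

// Hypothetical JVM-wide holder: class loading guarantees the limited allocator is built once per JVM,
// so QueryServer.start() (and other callers) could share it instead of creating their own.
final class LimitedNettyAllocatorHolder {
  private LimitedNettyAllocatorHolder() {
  }

  private static final class Holder {
    // Factory method taken from this PR's diff; its import is omitted in this sketch
    static final PooledByteBufAllocator INSTANCE =
        PooledByteBufAllocatorWithLimits.getBufferAllocatorWithLimits(PooledByteBufAllocator.DEFAULT.metric());
  }

  static PooledByteBufAllocator get() {
    return Holder.INSTANCE;
  }
}
```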
##########
pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java:
##########
@@ -85,6 +85,7 @@ public class ServerChannels {
private final BrokerMetrics _brokerMetrics = BrokerMetrics.get();
private final ConcurrentHashMap<ServerRoutingInstance, ServerChannel> _serverToChannelMap = new ConcurrentHashMap<>();
+ private final PooledByteBufAllocator _bufAllocatorWithLimits;
Review Comment:
The comment explicitly calls out multi-broker-in-one-JVM scenarios, but the
implementation still constructs one allocator per `ServerChannels` instance and
re-registers global gauges (last one wins). If multi-broker JVMs are expected
(e.g., quickstarts), consider making the allocator (and gauge registration)
truly JVM-singleton (e.g., static lazy holder + one-time registration guard),
or adjust the comment to reflect the actual scope (per-ServerChannels, not
per-JVM).
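
For the gauge side, a one-time registration guard could look roughly like the sketch below. The
guard class is hypothetical; the gauge constant and the `setOrUpdateGlobalGauge` call are the ones
already used in this diff:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocatorMetric;
import org.apache.pinot.common.metrics.BrokerGauge;
import org.apache.pinot.common.metrics.BrokerMetrics;

// Hypothetical guard: binds the global gauges to a single allocator even if several brokers share the JVM
final class BrokerNettyAllocatorGauges {
  private static final AtomicBoolean REGISTERED = new AtomicBoolean();

  private BrokerNettyAllocatorGauges() {
  }

  static void registerOnce(BrokerMetrics brokerMetrics, PooledByteBufAllocator allocator) {
    // Only the first caller in the JVM registers the gauges; later callers are no-ops
    if (!REGISTERED.compareAndSet(false, true)) {
      return;
    }
    PooledByteBufAllocatorMetric metric = allocator.metric();
    // Same gauge registration as in the diff; other NETTY_POOLED_* gauges would follow the same pattern
    brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_USED_DIRECT_MEMORY, metric::usedDirectMemory);
  }
}
```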
##########
pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java:
##########
@@ -125,6 +126,28 @@ public ServerChannels(QueryRouter queryRouter, @Nullable NettyConfig nettyConfig
_queryRouter = queryRouter;
_tlsConfig = tlsConfig;
_threadAccountant = threadAccountant;
+
+
+ // Notice here we assume there is a single ServerChannels per JVM. If that is not true (ie quickstarts with multiple
+ // brokers), we:
+ // 1. Will have one allocator per broker, which may cause higher memory usage, and may cause OOM.
+ // 2. Will have multiple sets of gauges for each allocator, which means the last one will win and override the
+ // previous ones.
+
+ // Create a single shared allocator with limits for all channels
+ PooledByteBufAllocator defaultAllocator = PooledByteBufAllocator.DEFAULT;
+ PooledByteBufAllocatorMetric metric = defaultAllocator.metric();
+ _bufAllocatorWithLimits = PooledByteBufAllocatorWithLimits.getBufferAllocatorWithLimits(metric);
+ PooledByteBufAllocatorMetric bufAllocatorMetric = _bufAllocatorWithLimits.metric();
+ // Register metrics for the shared allocator
+ _brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_USED_DIRECT_MEMORY, bufAllocatorMetric::usedDirectMemory);
Review Comment:
The comment explicitly calls out multi-broker-in-one-JVM scenarios, but the
implementation still constructs one allocator per `ServerChannels` instance and
re-registers global gauges (last one wins). If multi-broker JVMs are expected
(e.g., quickstarts), consider making the allocator (and gauge registration)
truly JVM-singleton (e.g., static lazy holder + one-time registration guard),
or adjust the comment to reflect the actual scope (per-ServerChannels, not
per-JVM).
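
With the two hypothetical helpers sketched above, the constructor block quoted here would reduce to
something like the following fragment (a sketch, not the PR's code):

```java
// Sketch of the ServerChannels constructor body: reuse the JVM-wide allocator and register gauges at most once
_bufAllocatorWithLimits = LimitedNettyAllocatorHolder.get();
BrokerNettyAllocatorGauges.registerOnce(_brokerMetrics, _bufAllocatorWithLimits);
```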
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]