This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 545210b8e0e Share a single pooled ByteBuf allocator across Netty query transports instead of one per server channel (#18905)
545210b8e0e is described below

commit 545210b8e0e660bb2619dfe951838ec325771680
Author: Yash Mayya <[email protected]>
AuthorDate: Thu Jul 2 09:12:37 2026 -0700

    Share a single pooled ByteBuf allocator across Netty query transports instead of one per server channel (#18905)
---
 .../PooledByteBufAllocatorWithLimits.java          | 37 ++++++++++++++++++++--
 .../apache/pinot/core/transport/QueryServer.java   | 14 +++++---
 .../pinot/core/transport/ServerChannels.java       | 28 ++++++++--------
 .../pinot/core/transport/QueryServerTest.java      | 11 +++++++
 .../pinot/core/transport/ServerChannelsTest.java   | 31 ++++++++++++++++++
 5 files changed, 98 insertions(+), 23 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/PooledByteBufAllocatorWithLimits.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/PooledByteBufAllocatorWithLimits.java
index 54ab85dbeb8..14254d6932e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/PooledByteBufAllocatorWithLimits.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/PooledByteBufAllocatorWithLimits.java
@@ -30,17 +30,42 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- * Utility class for setting limits in the PooledByteBufAllocator.
+ * Utility class that creates a {@link PooledByteBufAllocator} with a reduced 
number of direct arenas to limit the
+ * direct memory retained by the pool, and owns the process-wide shared 
instance of it. Thread-safe.
  */
 public class PooledByteBufAllocatorWithLimits {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PooledByteBufAllocatorWithLimits.class);
+  private static volatile PooledByteBufAllocator 
_sharedBufferAllocatorWithLimits;
 
   private PooledByteBufAllocatorWithLimits() {
   }
 
+  /**
+   * Returns the shared allocator, creating it on first use. All unshaded 
Netty query transports within the process
+   * (all broker side {@link ServerChannels} and the server side {@link 
QueryServer}) must share this single
+   * allocator: pooled arenas retain chunk memory after the buffers allocated 
from them are released, and free space
+   * in one allocator's pool can never serve another allocator's allocations, 
so per-connection allocators can retain
+   * many times the intended amount of direct memory and exhaust it. Note that 
the reduced arena count limits the
+   * worst case retention but is not a hard cap on direct memory usage. The 
gRPC based transports use shaded Netty
+   * classes and maintain their own allocators.
+   */
+  public static PooledByteBufAllocator getSharedBufferAllocatorWithLimits() {
+    PooledByteBufAllocator sharedAllocator = _sharedBufferAllocatorWithLimits;
+    if (sharedAllocator == null) {
+      synchronized (PooledByteBufAllocatorWithLimits.class) {
+        sharedAllocator = _sharedBufferAllocatorWithLimits;
+        if (sharedAllocator == null) {
+          sharedAllocator = 
getBufferAllocatorWithLimits(PooledByteBufAllocator.DEFAULT.metric());
+          _sharedBufferAllocatorWithLimits = sharedAllocator;
+        }
+      }
+    }
+    return sharedAllocator;
+  }
+
   // Reduce the number of direct arenas when using netty channels on broker 
and server side to limit the direct
   // memory usage
-  public static PooledByteBufAllocator 
getBufferAllocatorWithLimits(PooledByteBufAllocatorMetric metric) {
+  private static PooledByteBufAllocator 
getBufferAllocatorWithLimits(PooledByteBufAllocatorMetric metric) {
     int defaultPageSize = 
SystemPropertyUtil.getInt("io.netty.allocator.pageSize", 8192);
     final int defaultMinNumArena = NettyRuntime.availableProcessors() * 2;
     int defaultMaxOrder = 
SystemPropertyUtil.getInt("io.netty.allocator.maxOrder", 9);
@@ -48,10 +73,16 @@ public class PooledByteBufAllocatorWithLimits {
     long maxDirectMemory = PlatformDependent.maxDirectMemory();
     long remainingDirectMemory = maxDirectMemory - getReservedMemory();
 
+    // Floor the default at 1: this allocator is created once and shared for 
the lifetime of the process, so a
+    // depleted direct memory snapshot at creation time must not permanently 
disable pooling. An explicit
+    // io.netty.allocator.numDirectArenas=0 still disables direct arenas.
     int numDirectArenas = Math.max(0, 
SystemPropertyUtil.getInt("io.netty.allocator.numDirectArenas",
-        (int) Math.min(defaultMinNumArena, remainingDirectMemory / 
defaultChunkSize / 5)));
+        (int) Math.max(1, Math.min(defaultMinNumArena, remainingDirectMemory / 
defaultChunkSize / 5))));
     boolean useCacheForAllThreads = 
SystemPropertyUtil.getBoolean("io.netty.allocator.useCacheForAllThreads", 
false);
 
+    LOGGER.info("Creating PooledByteBufAllocator with numDirectArenas: {}, 
numHeapArenas: {}, chunkSize: {}, "
+            + "remainingDirectMemory: {}", numDirectArenas, 
metric.numHeapArenas(), defaultChunkSize,
+        remainingDirectMemory);
     return new PooledByteBufAllocator(true, metric.numHeapArenas(), 
numDirectArenas, defaultPageSize, defaultMaxOrder,
         metric.smallCacheSize(), metric.normalCacheSize(), 
useCacheForAllThreads);
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
index 2f2ca7e2b7d..25e1fbcf117 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
@@ -37,6 +37,7 @@ import io.netty.channel.socket.ServerSocketChannel;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.util.internal.PlatformDependent;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.config.NettyConfig;
@@ -117,12 +118,10 @@ public class QueryServer {
     try {
       ServerBootstrap serverBootstrap = new ServerBootstrap();
 
-      PooledByteBufAllocator bufAllocator = PooledByteBufAllocator.DEFAULT;
-      PooledByteBufAllocatorMetric metric = bufAllocator.metric();
-      ServerMetrics metrics = ServerMetrics.get();
       PooledByteBufAllocator bufAllocatorWithLimits =
-          
PooledByteBufAllocatorWithLimits.getBufferAllocatorWithLimits(metric);
-      metric = bufAllocatorWithLimits.metric();
+          
PooledByteBufAllocatorWithLimits.getSharedBufferAllocatorWithLimits();
+      PooledByteBufAllocatorMetric metric = bufAllocatorWithLimits.metric();
+      ServerMetrics metrics = ServerMetrics.get();
       
metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_USED_DIRECT_MEMORY, 
metric::usedDirectMemory);
       
metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_USED_HEAP_MEMORY, 
metric::usedHeapMemory);
       metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_ARENAS_DIRECT, 
metric::numDirectArenas);
@@ -186,4 +185,9 @@ public class QueryServer {
   int getConnectedChannelCount() {
     return _allChannels.size();
   }
+
+  @VisibleForTesting
+  Set<SocketChannel> getConnectedChannels() {
+    return _allChannels.keySet();
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
index 7f0b091c32c..4bc3fb39f8a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
@@ -83,6 +83,7 @@ public class ServerChannels {
   private final EventLoopGroup _eventLoopGroup;
   private final Class<? extends SocketChannel> _channelClass;
   private final ThreadAccountant _threadAccountant;
+  private final PooledByteBufAllocator _bufAllocatorWithLimits;
 
   private final BrokerMetrics _brokerMetrics = BrokerMetrics.get();
   private final ConcurrentHashMap<ServerRoutingInstance, ServerChannel> 
_serverToChannelMap = new ConcurrentHashMap<>();
@@ -126,6 +127,17 @@ public class ServerChannels {
     _queryRouter = queryRouter;
     _tlsConfig = tlsConfig;
     _threadAccountant = threadAccountant;
+
+    _bufAllocatorWithLimits = 
PooledByteBufAllocatorWithLimits.getSharedBufferAllocatorWithLimits();
+    PooledByteBufAllocatorMetric metric = _bufAllocatorWithLimits.metric();
+    
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_USED_DIRECT_MEMORY,
 metric::usedDirectMemory);
+    
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_USED_HEAP_MEMORY,
 metric::usedHeapMemory);
+    
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_ARENAS_DIRECT, 
metric::numDirectArenas);
+    
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_ARENAS_HEAP, 
metric::numHeapArenas);
+    
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_CACHE_SIZE_SMALL,
 metric::smallCacheSize);
+    
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_CACHE_SIZE_NORMAL,
 metric::normalCacheSize);
+    
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_THREADLOCALCACHE,
 metric::numThreadLocalCaches);
+    _brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_CHUNK_SIZE, 
metric::chunkSize);
   }
 
   public void sendRequest(String rawTableName, AsyncQueryResponse 
asyncQueryResponse,
@@ -165,22 +177,8 @@ public class ServerChannels {
 
     ServerChannel(ServerRoutingInstance serverRoutingInstance) {
       _serverRoutingInstance = serverRoutingInstance;
-      PooledByteBufAllocator bufAllocator = PooledByteBufAllocator.DEFAULT;
-      PooledByteBufAllocatorMetric metric = bufAllocator.metric();
-      PooledByteBufAllocator bufAllocatorWithLimits =
-          
PooledByteBufAllocatorWithLimits.getBufferAllocatorWithLimits(metric);
-      metric = bufAllocatorWithLimits.metric();
-      
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_USED_DIRECT_MEMORY,
 metric::usedDirectMemory);
-      
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_USED_HEAP_MEMORY,
 metric::usedHeapMemory);
-      
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_ARENAS_DIRECT, 
metric::numDirectArenas);
-      
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_ARENAS_HEAP, 
metric::numHeapArenas);
-      
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_CACHE_SIZE_SMALL,
 metric::smallCacheSize);
-      
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_CACHE_SIZE_NORMAL,
 metric::normalCacheSize);
-      
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_THREADLOCALCACHE,
 metric::numThreadLocalCaches);
-      
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_CHUNK_SIZE, 
metric::chunkSize);
-
       _bootstrap = new 
Bootstrap().remoteAddress(serverRoutingInstance.getHostname(), 
serverRoutingInstance.getPort())
-          .option(ChannelOption.ALLOCATOR, 
bufAllocatorWithLimits).group(_eventLoopGroup).channel(_channelClass)
+          .option(ChannelOption.ALLOCATOR, 
_bufAllocatorWithLimits).group(_eventLoopGroup).channel(_channelClass)
           .option(ChannelOption.SO_KEEPALIVE, true).handler(new 
ChannelInitializer<SocketChannel>() {
             @Override
             protected void initChannel(SocketChannel ch) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryServerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryServerTest.java
index df72863cd0c..d128737ac10 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryServerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryServerTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.transport;
 
 import io.netty.channel.ChannelHandler;
+import io.netty.channel.socket.SocketChannel;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import org.apache.commons.io.IOUtils;
@@ -34,6 +35,7 @@ import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertSame;
 import static org.testng.Assert.assertTrue;
 
 
@@ -58,6 +60,10 @@ public class QueryServerTest {
     QueryServer server = new QueryServer(0, nettyConfig, tlsConfig, 
channelHandler);
     server.start();
 
+    // The server should use the shared process-wide bounded allocator
+    assertSame(server.getChannel().config().getAllocator(),
+        PooledByteBufAllocatorWithLimits.getSharedBufferAllocatorWithLimits());
+
     final InetSocketAddress serverAddress = server.getChannel().localAddress();
 
     assertTrue(connectionOk(serverAddress));
@@ -82,6 +88,11 @@ public class QueryServerTest {
       TestUtils.waitForCondition(aVoid -> server.getConnectedChannelCount() > 
0, 5_000L,
           "Channel was not registered in _allChannels");
 
+      // The accepted child channels (which allocate the request/response 
buffers) must also use the shared allocator
+      SocketChannel connectedChannel = 
server.getConnectedChannels().iterator().next();
+      assertSame(connectedChannel.config().getAllocator(),
+          
PooledByteBufAllocatorWithLimits.getSharedBufferAllocatorWithLimits());
+
       socket.close();
 
       TestUtils.waitForCondition(aVoid -> server.getConnectedChannelCount() == 
0, 5_000L,
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/ServerChannelsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/ServerChannelsTest.java
index 8d5aabbfc15..0dd1bbff383 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/transport/ServerChannelsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/ServerChannelsTest.java
@@ -19,8 +19,10 @@
 package org.apache.pinot.core.transport;
 
 import com.sun.net.httpserver.HttpServer;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOption;
 import io.netty.util.concurrent.GenericFutureListener;
 import java.net.InetSocketAddress;
 import org.apache.pinot.common.config.NettyConfig;
@@ -42,6 +44,8 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertSame;
 
 
 public class ServerChannelsTest {
@@ -91,6 +95,33 @@ public class ServerChannelsTest {
     }
   }
 
+  @Test
+  public void testChannelsShareBufferAllocator() {
+    ServerChannels serverChannels =
+        new ServerChannels(mock(QueryRouter.class), null, null, 
ThreadAccountantUtils.getNoOpAccountant());
+    ServerChannels otherServerChannels =
+        new ServerChannels(mock(QueryRouter.class), null, null, 
ThreadAccountantUtils.getNoOpAccountant());
+    try {
+      ByteBufAllocator allocator = getBootstrapAllocator(
+          serverChannels.getOrCreateServerChannel(new 
ServerRoutingInstance("localhost", 12345, TableType.OFFLINE)));
+      assertNotNull(allocator);
+      assertSame(allocator, 
PooledByteBufAllocatorWithLimits.getSharedBufferAllocatorWithLimits());
+      // All channels created by a ServerChannels use the same allocator
+      assertSame(getBootstrapAllocator(serverChannels.getOrCreateServerChannel(
+          new ServerRoutingInstance("localhost", 12346, TableType.REALTIME))), 
allocator);
+      // Channels created by another ServerChannels instance (e.g. the TLS 
one) share it as well
+      
assertSame(getBootstrapAllocator(otherServerChannels.getOrCreateServerChannel(
+          new ServerRoutingInstance("localhost", 12347, TableType.OFFLINE))), 
allocator);
+    } finally {
+      serverChannels.shutDown();
+      otherServerChannels.shutDown();
+    }
+  }
+
+  private static ByteBufAllocator 
getBootstrapAllocator(ServerChannels.ServerChannel serverChannel) {
+    return (ByteBufAllocator) 
serverChannel._bootstrap.config().options().get(ChannelOption.ALLOCATOR);
+  }
+
   @SuppressWarnings("unchecked")
   @Test
   public void testWriteFailureClosesChannelAndFailsQuery() {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to