This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new fb02f04516 Fix Direct Memory OOM on Server (#15335)
fb02f04516 is described below
commit fb02f045169eb34aee0587c040b078fabf803dbb
Author: Chaitanya Deepthi <[email protected]>
AuthorDate: Sat Apr 12 11:35:38 2025 -0400
Fix Direct Memory OOM on Server (#15335)
* Direct Memory on Server OOM issue fix
* Add a method in the factory class
* update license and checkstyle fixes
* change it to lower case
* Remove channels removal from QueryServer using listener
* Review comments fix
* Delete DirectOOMServerHandler
* Add license header
* Fix checkstyle changes
* checkstyle fixes
* spotless fix
* change the log message
* remove stack trace
* fixes in the arguments
* Edit comment
* Make variables final
* checkstyle changes
* checkstyle fixes
* Add Server and Broker metrics and logs to track RESERVED_MEMORY
* Add a common getReservedMemory() method
* Remove unused imports
* Remove unused imports
---
.../apache/pinot/common/metrics/ServerMeter.java | 2 +
.../core/transport/ChannelHandlerFactory.java | 7 +-
.../pinot/core/transport/DirectOOMHandler.java | 87 +++++++++++++++++-----
.../PooledByteBufAllocatorWithLimits.java | 72 ++++++++++++++++++
.../apache/pinot/core/transport/QueryServer.java | 13 +++-
.../pinot/core/transport/ServerChannels.java | 20 ++---
6 files changed, 170 insertions(+), 31 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index e3c94bda96..61270c4060 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -133,6 +133,8 @@ public enum ServerMeter implements AbstractMetrics.Meter {
TOTAL_THREAD_CPU_TIME_MILLIS("millis", false),
LARGE_QUERY_RESPONSE_SIZE_EXCEPTIONS("exceptions", false),
+ DIRECT_MEMORY_OOM("directMemoryOOMCount", true),
+
// Multi-stage
/**
* Number of times the max number of rows in the hash table has been reached.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/ChannelHandlerFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/ChannelHandlerFactory.java
index 00545f2607..9f46cf3fae 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/ChannelHandlerFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/ChannelHandlerFactory.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.transport;
import io.netty.channel.ChannelHandler;
+import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
@@ -103,7 +104,9 @@ public class ChannelHandlerFactory {
}
public static ChannelHandler getDirectOOMHandler(QueryRouter queryRouter,
ServerRoutingInstance serverRoutingInstance,
- ConcurrentHashMap<ServerRoutingInstance, ServerChannels.ServerChannel>
serverToChannelMap) {
- return new DirectOOMHandler(queryRouter, serverRoutingInstance,
serverToChannelMap);
+ ConcurrentHashMap<ServerRoutingInstance, ServerChannels.ServerChannel>
serverToChannelMap,
+ ConcurrentHashMap<SocketChannel, Boolean> allChannels,
ServerSocketChannel serverSocketChannel) {
+ return new DirectOOMHandler(queryRouter, serverRoutingInstance,
serverToChannelMap, allChannels,
+ serverSocketChannel);
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/DirectOOMHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/DirectOOMHandler.java
index 0ba00327f6..13b89edd0b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/DirectOOMHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/DirectOOMHandler.java
@@ -20,20 +20,25 @@ package org.apache.pinot.core.transport;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.socket.ServerSocketChannel;
+import io.netty.channel.socket.SocketChannel;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.spi.exception.QueryCancelledException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
- * Handling netty direct memory OOM. In this case there is a great chance that multiple channels are receiving
- * large data tables from servers concurrently. We want to close all channels to servers to proactively release
- * the direct memory, because the execution of netty threads can all block in allocating direct memory, in which case
- * no one will reach channelRead0.
+ * Handling netty direct memory OOM on broker and server. In this case there is a great chance that multiple channels
+ * are receiving large data tables from servers concurrently. We want to close all channels to servers to
+ * proactively release the direct memory, because the execution of netty threads can all block in allocating direct
+ * memory, in which case no one will reach channelRead0.
*/
public class DirectOOMHandler extends ChannelInboundHandlerAdapter {
private static final Logger LOGGER =
LoggerFactory.getLogger(DirectOOMHandler.class);
@@ -42,12 +47,17 @@ public class DirectOOMHandler extends
ChannelInboundHandlerAdapter {
private final ServerRoutingInstance _serverRoutingInstance;
private final ConcurrentHashMap<ServerRoutingInstance,
ServerChannels.ServerChannel> _serverToChannelMap;
private volatile boolean _silentShutDown = false;
+ private final ConcurrentHashMap<SocketChannel, Boolean> _allChannels;
+ private final ServerSocketChannel _serverSocketChannel;
public DirectOOMHandler(QueryRouter queryRouter, ServerRoutingInstance
serverRoutingInstance,
- ConcurrentHashMap<ServerRoutingInstance, ServerChannels.ServerChannel>
serverToChannelMap) {
+ ConcurrentHashMap<ServerRoutingInstance, ServerChannels.ServerChannel>
serverToChannelMap,
+ ConcurrentHashMap<SocketChannel, Boolean> allChannels,
ServerSocketChannel serverSocketChannel) {
_queryRouter = queryRouter;
_serverRoutingInstance = serverRoutingInstance;
_serverToChannelMap = serverToChannelMap;
+ _allChannels = allChannels;
+ _serverSocketChannel = serverSocketChannel;
}
public void setSilentShutDown() {
@@ -63,25 +73,64 @@ public class DirectOOMHandler extends
ChannelInboundHandlerAdapter {
ctx.fireChannelInactive();
}
+ /**
+ * Closes and removes all active channels from the map to release direct memory.
+ */
+ private void closeAllChannels() {
+ LOGGER.warn("OOM detected: Closing all channels to server to release
direct memory");
+ for (SocketChannel channel : _allChannels.keySet()) {
+ try {
+ if (channel != null) {
+ LOGGER.info("Closing channel: {}", channel);
+ setSilentShutdown(channel);
+ channel.close();
+ }
+ } catch (Exception e) {
+ LOGGER.error("Error while closing channel: {}", channel, e);
+ } finally {
+ if (channel != null) {
+ _allChannels.remove(channel);
+ }
+ }
+ }
+ }
+
+ // silent shutdown for the channels without firing channelInactive
+ private void setSilentShutdown(SocketChannel socketChannel) {
+ if (socketChannel != null) {
+ DirectOOMHandler directOOMHandler =
socketChannel.pipeline().get(DirectOOMHandler.class);
+ if (directOOMHandler != null) {
+ directOOMHandler.setSilentShutDown();
+ }
+ }
+ }
+
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// catch direct memory oom here
- if (cause instanceof OutOfMemoryError
- && StringUtils.containsIgnoreCase(cause.getMessage(), "direct
buffer")) {
- BrokerMetrics.get().addMeteredGlobalValue(BrokerMeter.DIRECT_MEMORY_OOM,
1L);
+ if (cause instanceof OutOfMemoryError &&
StringUtils.containsIgnoreCase(cause.getMessage(), "direct buffer")) {
// only one thread can get here and do the shutdown
if (DIRECT_OOM_SHUTTING_DOWN.compareAndSet(false, true)) {
try {
- LOGGER.error("Closing ALL channels to servers, as we are running out
of direct memory "
- + "while receiving response from {}", _serverRoutingInstance,
cause);
- // close all channels to servers
- _serverToChannelMap.keySet().forEach(serverRoutingInstance -> {
- ServerChannels.ServerChannel removed =
_serverToChannelMap.remove(serverRoutingInstance);
- removed.closeChannel();
- removed.setSilentShutdown();
- });
- _queryRouter.markServerDown(_serverRoutingInstance,
- new QueryCancelledException("Query cancelled as broker is out of
direct memory"));
+ if (_serverToChannelMap != null && !_serverToChannelMap.isEmpty()) {
+ LOGGER.error("Closing ALL channels to servers, as we are running
out of direct memory "
+ + "while receiving response from {}", _serverRoutingInstance,
cause); // broker side direct OOM handler
+
BrokerMetrics.get().addMeteredGlobalValue(BrokerMeter.DIRECT_MEMORY_OOM, 1L);
+
+ // close all channels to servers
+ _serverToChannelMap.keySet().forEach(serverRoutingInstance -> {
+ ServerChannels.ServerChannel removed =
_serverToChannelMap.remove(serverRoutingInstance);
+ removed.closeChannel();
+ removed.setSilentShutdown();
+ });
+ _queryRouter.markServerDown(_serverRoutingInstance,
+ new QueryCancelledException("Query cancelled as broker is out
of direct memory"));
+ } else if (_allChannels != null && !_allChannels.isEmpty()) { //
server side direct OOM handler
+ LOGGER.error("Closing channel from broker, as we are running out
of direct memory "
+ + "while initiating request to server channel {}",
_serverSocketChannel, cause);
+
ServerMetrics.get().addMeteredGlobalValue(ServerMeter.DIRECT_MEMORY_OOM, 1L);
+ closeAllChannels();
+ }
} catch (Exception e) {
LOGGER.error("Caught exception while handling direct memory OOM", e);
} finally {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/PooledByteBufAllocatorWithLimits.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/PooledByteBufAllocatorWithLimits.java
new file mode 100644
index 0000000000..54ab85dbeb
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/PooledByteBufAllocatorWithLimits.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.transport;
+
+import io.grpc.netty.shaded.io.netty.util.internal.SystemPropertyUtil;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocatorMetric;
+import io.netty.util.NettyRuntime;
+import io.netty.util.internal.PlatformDependent;
+import java.lang.reflect.Field;
+import java.util.concurrent.atomic.AtomicLong;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utility class for setting limits in the PooledByteBufAllocator.
+ */
+public class PooledByteBufAllocatorWithLimits {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PooledByteBufAllocatorWithLimits.class);
+
+ private PooledByteBufAllocatorWithLimits() {
+ }
+
+ // Reduce the number of direct arenas when using netty channels on broker and server side to limit the direct
+ // memory usage
+ public static PooledByteBufAllocator
getBufferAllocatorWithLimits(PooledByteBufAllocatorMetric metric) {
+ int defaultPageSize =
SystemPropertyUtil.getInt("io.netty.allocator.pageSize", 8192);
+ final int defaultMinNumArena = NettyRuntime.availableProcessors() * 2;
+ int defaultMaxOrder =
SystemPropertyUtil.getInt("io.netty.allocator.maxOrder", 9);
+ final int defaultChunkSize = defaultPageSize << defaultMaxOrder;
+ long maxDirectMemory = PlatformDependent.maxDirectMemory();
+ long remainingDirectMemory = maxDirectMemory - getReservedMemory();
+
+ int numDirectArenas = Math.max(0,
SystemPropertyUtil.getInt("io.netty.allocator.numDirectArenas",
+ (int) Math.min(defaultMinNumArena, remainingDirectMemory /
defaultChunkSize / 5)));
+ boolean useCacheForAllThreads =
SystemPropertyUtil.getBoolean("io.netty.allocator.useCacheForAllThreads",
false);
+
+ return new PooledByteBufAllocator(true, metric.numHeapArenas(),
numDirectArenas, defaultPageSize, defaultMaxOrder,
+ metric.smallCacheSize(), metric.normalCacheSize(),
useCacheForAllThreads);
+ }
+
+ //Get reserved direct memory allocated so far
+ private static long getReservedMemory() {
+ try {
+ Class<?> bitsClass = Class.forName("java.nio.Bits");
+ Field reservedMemoryField =
bitsClass.getDeclaredField("RESERVED_MEMORY");
+ reservedMemoryField.setAccessible(true);
+ AtomicLong reserved = (AtomicLong) reservedMemoryField.get(null);
+ return reserved.get();
+ } catch (Exception e) {
+ LOGGER.error("Failed to get the direct reserved memory");
+ return 0;
+ }
+ }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
index 0e1c7d1aa0..f7e51c2540 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
@@ -36,6 +36,7 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.config.NettyConfig;
import org.apache.pinot.common.config.TlsConfig;
@@ -60,6 +61,8 @@ public class QueryServer {
private final Class<? extends ServerSocketChannel> _channelClass;
private final ChannelHandler _instanceRequestHandler;
private ServerSocketChannel _channel;
+ private final ConcurrentHashMap<SocketChannel, Boolean> _allChannels = new
ConcurrentHashMap<>();
+
/**
* Create an unsecured server instance
@@ -116,6 +119,9 @@ public class QueryServer {
PooledByteBufAllocator bufAllocator = PooledByteBufAllocator.DEFAULT;
PooledByteBufAllocatorMetric metric = bufAllocator.metric();
ServerMetrics metrics = ServerMetrics.get();
+ PooledByteBufAllocator bufAllocatorWithLimits =
+
PooledByteBufAllocatorWithLimits.getBufferAllocatorWithLimits(metric);
+ metric = bufAllocatorWithLimits.metric();
metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_USED_DIRECT_MEMORY,
metric::usedDirectMemory);
metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_USED_HEAP_MEMORY,
metric::usedHeapMemory);
metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_ARENAS_DIRECT,
metric::numDirectArenas);
@@ -126,9 +132,14 @@ public class QueryServer {
metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_CHUNK_SIZE,
metric::chunkSize);
_channel = (ServerSocketChannel) serverBootstrap.group(_bossGroup,
_workerGroup).channel(_channelClass)
.option(ChannelOption.SO_BACKLOG,
128).childOption(ChannelOption.SO_KEEPALIVE, true)
- .option(ChannelOption.ALLOCATOR, bufAllocator).childHandler(new
ChannelInitializer<SocketChannel>() {
+ .option(ChannelOption.ALLOCATOR, bufAllocatorWithLimits)
+ .childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
+ _allChannels.put(ch, true);
+
+ ch.pipeline()
+ .addLast(ChannelHandlerFactory.getDirectOOMHandler(null,
null, null, _allChannels, _channel));
if (_tlsConfig != null) {
// Add SSL handler first to encrypt and decrypt everything.
ch.pipeline()
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
index b1e0d8c60b..451f4f887b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
@@ -159,6 +159,9 @@ public class ServerChannels {
_serverRoutingInstance = serverRoutingInstance;
PooledByteBufAllocator bufAllocator = PooledByteBufAllocator.DEFAULT;
PooledByteBufAllocatorMetric metric = bufAllocator.metric();
+ PooledByteBufAllocator bufAllocatorWithLimits =
+
PooledByteBufAllocatorWithLimits.getBufferAllocatorWithLimits(metric);
+ metric = bufAllocatorWithLimits.metric();
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_USED_DIRECT_MEMORY,
metric::usedDirectMemory);
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_USED_HEAP_MEMORY,
metric::usedHeapMemory);
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_ARENAS_DIRECT,
metric::numDirectArenas);
@@ -169,26 +172,25 @@ public class ServerChannels {
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_CHUNK_SIZE,
metric::chunkSize);
_bootstrap = new
Bootstrap().remoteAddress(serverRoutingInstance.getHostname(),
serverRoutingInstance.getPort())
- .option(ChannelOption.ALLOCATOR, bufAllocator)
-
.group(_eventLoopGroup).channel(_channelClass).option(ChannelOption.SO_KEEPALIVE,
true)
- .handler(new ChannelInitializer<SocketChannel>() {
+ .option(ChannelOption.ALLOCATOR,
bufAllocatorWithLimits).group(_eventLoopGroup).channel(_channelClass)
+ .option(ChannelOption.SO_KEEPALIVE, true).handler(new
ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
if (_tlsConfig != null) {
// Add SSL handler first to encrypt and decrypt everything.
- ch.pipeline().addLast(
- ChannelHandlerFactory.SSL,
ChannelHandlerFactory.getClientTlsHandler(_tlsConfig, ch));
+ ch.pipeline()
+ .addLast(ChannelHandlerFactory.SSL,
ChannelHandlerFactory.getClientTlsHandler(_tlsConfig, ch));
}
ch.pipeline().addLast(ChannelHandlerFactory.getLengthFieldBasedFrameDecoder());
ch.pipeline().addLast(ChannelHandlerFactory.getLengthFieldPrepender());
ch.pipeline().addLast(
- ChannelHandlerFactory.getDirectOOMHandler(_queryRouter,
_serverRoutingInstance, _serverToChannelMap)
- );
+ ChannelHandlerFactory.getDirectOOMHandler(_queryRouter,
_serverRoutingInstance, _serverToChannelMap,
+ null, null));
// NOTE: data table de-serialization happens inside this handler
// Revisit if this becomes a bottleneck
- ch.pipeline().addLast(ChannelHandlerFactory
- .getDataTableHandler(_queryRouter,
_serverRoutingInstance, _brokerMetrics));
+ ch.pipeline().addLast(
+ ChannelHandlerFactory.getDataTableHandler(_queryRouter,
_serverRoutingInstance, _brokerMetrics));
}
});
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]