This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.17 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 7b319338b0e6327fbc55cac1f9d3940478b0982b Author: Yan Zhao <horizo...@apache.org> AuthorDate: Mon Jul 15 14:25:46 2024 +0800 Allocator support exitOnOutOfMemory config. (#3984) * Allocator support exitOnOutOfMemory config. (cherry picked from commit 15b106c7610f11dfc091c8f73c28d1666113c25f) --- bookkeeper-common-allocator/pom.xml | 5 ++ .../common/allocator/ByteBufAllocatorBuilder.java | 2 + .../impl/ByteBufAllocatorBuilderImpl.java | 9 ++- .../allocator/impl/ByteBufAllocatorImpl.java | 34 +++++++-- .../bookkeeper/common/util/ShutdownUtil.java | 86 ++++++++++++++++++++++ .../bookkeeper/common/util/package-info.java | 21 ++++++ .../impl/ByteBufAllocatorBuilderTest.java | 28 +++++++ .../apache/bookkeeper/bookie/BookieResources.java | 1 + .../org/apache/bookkeeper/client/BookKeeper.java | 1 + .../bookkeeper/conf/AbstractConfiguration.java | 10 +++ .../bookkeeper/conf/AbstractConfigurationTest.java | 10 +++ .../tools/perf/journal/JournalWriter.java | 1 + 12 files changed, 201 insertions(+), 7 deletions(-) diff --git a/bookkeeper-common-allocator/pom.xml b/bookkeeper-common-allocator/pom.xml index 9712799f5d..c55200aa0f 100644 --- a/bookkeeper-common-allocator/pom.xml +++ b/bookkeeper-common-allocator/pom.xml @@ -29,6 +29,11 @@ <groupId>io.netty</groupId> <artifactId>netty-buffer</artifactId> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> <plugins> diff --git a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/ByteBufAllocatorBuilder.java b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/ByteBufAllocatorBuilder.java index 3e36a23d17..cb244140b3 100644 --- a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/ByteBufAllocatorBuilder.java +++ b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/ByteBufAllocatorBuilder.java @@ -92,4 +92,6 @@ public interface ByteBufAllocatorBuilder { * <p>Default is {@link LeakDetectionPolicy#Disabled} */ ByteBufAllocatorBuilder leakDetectionPolicy(LeakDetectionPolicy leakDetectionPolicy); + + ByteBufAllocatorBuilder exitOnOutOfMemory(boolean exitOnOutOfMemory); } diff --git a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderImpl.java b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderImpl.java index 69c57232af..4b5469a3f7 100644 --- a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderImpl.java +++ b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderImpl.java @@ -37,11 +37,12 @@ public class ByteBufAllocatorBuilderImpl implements ByteBufAllocatorBuilder { OutOfMemoryPolicy outOfMemoryPolicy = OutOfMemoryPolicy.FallbackToHeap; Consumer<OutOfMemoryError> outOfMemoryListener = null; LeakDetectionPolicy leakDetectionPolicy = LeakDetectionPolicy.Disabled; + boolean exitOnOutOfMemory = false; @Override public ByteBufAllocatorWithOomHandler build() { return new ByteBufAllocatorImpl(pooledAllocator, unpooledAllocator, poolingPolicy, poolingConcurrency, - outOfMemoryPolicy, outOfMemoryListener, leakDetectionPolicy); + outOfMemoryPolicy, outOfMemoryListener, leakDetectionPolicy, exitOnOutOfMemory); } @Override @@ -86,4 +87,10 @@ public class ByteBufAllocatorBuilderImpl implements ByteBufAllocatorBuilder { return this; } + @Override + public ByteBufAllocatorBuilder exitOnOutOfMemory(boolean exitOnOutOfMemory) { + this.exitOnOutOfMemory = exitOnOutOfMemory; + return this; + } + } diff --git a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java index 87582cca92..3bc06f8e7e 100644 --- a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java +++ b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java @@ -29,6 +29,7 @@ import org.apache.bookkeeper.common.allocator.ByteBufAllocatorWithOomHandler; import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy; import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy; import org.apache.bookkeeper.common.allocator.PoolingPolicy; +import org.apache.bookkeeper.common.util.ShutdownUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,15 +49,24 @@ public class ByteBufAllocatorImpl extends AbstractByteBufAllocator implements By private final PoolingPolicy poolingPolicy; private final OutOfMemoryPolicy outOfMemoryPolicy; private Consumer<OutOfMemoryError> outOfMemoryListener; + private final boolean exitOnOutOfMemory; ByteBufAllocatorImpl(ByteBufAllocator pooledAllocator, ByteBufAllocator unpooledAllocator, PoolingPolicy poolingPolicy, int poolingConcurrency, OutOfMemoryPolicy outOfMemoryPolicy, Consumer<OutOfMemoryError> outOfMemoryListener, LeakDetectionPolicy leakDetectionPolicy) { - super(poolingPolicy == PoolingPolicy.PooledDirect /* preferDirect */); + this(pooledAllocator, unpooledAllocator, poolingPolicy, poolingConcurrency, outOfMemoryPolicy, + outOfMemoryListener, leakDetectionPolicy, false); + } + ByteBufAllocatorImpl(ByteBufAllocator pooledAllocator, ByteBufAllocator unpooledAllocator, + PoolingPolicy poolingPolicy, int poolingConcurrency, OutOfMemoryPolicy outOfMemoryPolicy, + Consumer<OutOfMemoryError> outOfMemoryListener, + LeakDetectionPolicy leakDetectionPolicy, boolean exitOnOutOfMemory) { + super(poolingPolicy == PoolingPolicy.PooledDirect /* preferDirect */); this.poolingPolicy = poolingPolicy; this.outOfMemoryPolicy = outOfMemoryPolicy; + this.exitOnOutOfMemory = exitOnOutOfMemory; if (outOfMemoryListener == null) { this.outOfMemoryListener = (v) -> { log.error("Unable to allocate memory", v); @@ -146,7 +156,7 @@ public class ByteBufAllocatorImpl extends AbstractByteBufAllocator implements By : unpooledAllocator; return alloc.heapBuffer(initialCapacity, maxCapacity); } catch (OutOfMemoryError e) { - outOfMemoryListener.accept(e); + consumeOOMError(e); throw e; } } @@ -166,12 +176,12 @@ public class ByteBufAllocatorImpl extends AbstractByteBufAllocator implements By try { return unpooledAllocator.heapBuffer(initialCapacity, maxCapacity); } catch (OutOfMemoryError e2) { - outOfMemoryListener.accept(e2); + consumeOOMError(e2); throw e2; } } else { // ThrowException - outOfMemoryListener.accept(e); + consumeOOMError(e); throw e; } } @@ -181,12 +191,24 @@ public class ByteBufAllocatorImpl extends AbstractByteBufAllocator implements By try { return unpooledAllocator.directBuffer(initialCapacity, maxCapacity); } catch (OutOfMemoryError e) { - outOfMemoryListener.accept(e); - throw e; + consumeOOMError(e); + throw e; } } } + private void consumeOOMError(OutOfMemoryError outOfMemoryError) { + try { + outOfMemoryListener.accept(outOfMemoryError); + } catch (Throwable e) { + log.warn("Consume outOfMemory error failed.", e); + } + if (exitOnOutOfMemory) { + log.info("Exiting JVM process for OOM error: {}", outOfMemoryError.getMessage(), outOfMemoryError); + ShutdownUtil.triggerImmediateForcefulShutdown(); + } + } + @Override public boolean isDirectBufferPooled() { return pooledAllocator != null && pooledAllocator.isDirectBufferPooled(); diff --git a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/util/ShutdownUtil.java b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/util/ShutdownUtil.java new file mode 100644 index 0000000000..a398b57fe7 --- /dev/null +++ b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/util/ShutdownUtil.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.bookkeeper.common.util; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import lombok.extern.slf4j.Slf4j; + +/** + * Forked from <a href="https://github.com/apache/pulsar">Pulsar</a>. + */ +@Slf4j +public class ShutdownUtil { + private static final Method log4j2ShutdownMethod; + + static { + // use reflection to find org.apache.logging.log4j.LogManager.shutdown method + Method shutdownMethod = null; + try { + shutdownMethod = Class.forName("org.apache.logging.log4j.LogManager") + .getMethod("shutdown"); + } catch (ClassNotFoundException | NoSuchMethodException e) { + // ignore when Log4j2 isn't found, log at debug level + log.debug("Cannot find org.apache.logging.log4j.LogManager.shutdown method", e); + } + log4j2ShutdownMethod = shutdownMethod; + } + + /** + * Triggers an immediate forceful shutdown of the current process. + * + * @param status Termination status. By convention, a nonzero status code indicates abnormal termination. + * @see Runtime#halt(int) + */ + public static void triggerImmediateForcefulShutdown(int status) { + triggerImmediateForcefulShutdown(status, true); + } + public static void triggerImmediateForcefulShutdown(int status, boolean logging) { + try { + if (status != 0 && logging) { + log.warn("Triggering immediate shutdown of current process with status {}", status, + new Exception("Stacktrace for immediate shutdown")); + } + shutdownLogging(); + } finally { + Runtime.getRuntime().halt(status); + } + } + + private static void shutdownLogging() { + // flush log buffers and shutdown log4j2 logging to prevent log truncation + if (log4j2ShutdownMethod != null) { + try { + // use reflection to call org.apache.logging.log4j.LogManager.shutdown() + log4j2ShutdownMethod.invoke(null); + } catch (IllegalAccessException | InvocationTargetException e) { + log.error("Unable to call org.apache.logging.log4j.LogManager.shutdown using reflection.", e); + } + } + } + + /** + * Triggers an immediate forceful shutdown of the current process using 1 as the status code. + * + * @see Runtime#halt(int) + */ + public static void triggerImmediateForcefulShutdown() { + triggerImmediateForcefulShutdown(1); + } +} \ No newline at end of file diff --git a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/util/package-info.java b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/util/package-info.java new file mode 100644 index 0000000000..55031dd8f8 --- /dev/null +++ b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/util/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * defines the utilities for allocator used across the project. + */ +package org.apache.bookkeeper.common.util; \ No newline at end of file diff --git a/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java b/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java index 6f2538d6c8..40c41fa65b 100644 --- a/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java +++ b/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.when; import io.netty.buffer.ByteBuf; @@ -35,7 +36,10 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder; import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy; import org.apache.bookkeeper.common.allocator.PoolingPolicy; +import org.apache.bookkeeper.common.util.ShutdownUtil; import org.junit.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; /** * Tests for {@link ByteBufAllocatorBuilderImpl}. @@ -87,6 +91,30 @@ public class ByteBufAllocatorBuilderTest { assertEquals(outOfDirectMemException, receivedException.get()); } + @Test() + public void testOomExit() { + ByteBufAllocator baseAlloc = mock(ByteBufAllocator.class); + when(baseAlloc.directBuffer(anyInt(), anyInt())).thenThrow(outOfDirectMemException); + + ByteBufAllocator alloc = ByteBufAllocatorBuilder.create() + .pooledAllocator(baseAlloc) + .outOfMemoryPolicy(OutOfMemoryPolicy.ThrowException) + .exitOnOutOfMemory(true) + .build(); + + MockedStatic<ShutdownUtil> mockedStatic = mockStatic(ShutdownUtil.class); + + try { + alloc.buffer(); + fail("Should have thrown exception"); + } catch (OutOfMemoryError e) { + // Expected + assertEquals(outOfDirectMemException, e); + } + + mockedStatic.verify(() -> ShutdownUtil.triggerImmediateForcefulShutdown(), Mockito.times(1)); + } + @Test public void testOomWithFallback() { ByteBufAllocator baseAlloc = mock(ByteBufAllocator.class); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieResources.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieResources.java index c9b71b9968..755efd5be0 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieResources.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieResources.java @@ -70,6 +70,7 @@ public class BookieResources { .poolingConcurrency(conf.getAllocatorPoolingConcurrency()) .outOfMemoryPolicy(conf.getAllocatorOutOfMemoryPolicy()) .leakDetectionPolicy(conf.getAllocatorLeakDetectionPolicy()) + .exitOnOutOfMemory(conf.exitOnOutOfMemory()) .build(); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java index d7043dc8c9..0362aadcba 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java @@ -478,6 +478,7 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper { .poolingConcurrency(conf.getAllocatorPoolingConcurrency()) .outOfMemoryPolicy(conf.getAllocatorOutOfMemoryPolicy()) .leakDetectionPolicy(conf.getAllocatorLeakDetectionPolicy()) + .exitOnOutOfMemory(conf.exitOnOutOfMemory()) .build(); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java index 438dc40983..6d2a82bb55 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java @@ -183,6 +183,7 @@ public abstract class AbstractConfiguration<T extends AbstractConfiguration> protected static final String ALLOCATOR_POOLING_CONCURRENCY = "allocatorPoolingConcurrency"; protected static final String ALLOCATOR_OOM_POLICY = "allocatorOutOfMemoryPolicy"; protected static final String ALLOCATOR_LEAK_DETECTION_POLICY = "allocatorLeakDetectionPolicy"; + protected static final String ALLOCATOR_EXIT_ON_OUT_OF_MEMORY = "allocatorExitOnOutOfMemory"; // option to limit stats logging public static final String LIMIT_STATS_LOGGING = "limitStatsLogging"; @@ -1156,6 +1157,15 @@ public abstract class AbstractConfiguration<T extends AbstractConfiguration> return getThis(); } + public T setExitOnOutOfMemory(boolean exitOnOutOfMemory) { + this.setProperty(ALLOCATOR_EXIT_ON_OUT_OF_MEMORY, exitOnOutOfMemory); + return getThis(); + } + + public boolean exitOnOutOfMemory() { + return getBoolean(ALLOCATOR_EXIT_ON_OUT_OF_MEMORY, false); + } + /** * Return whether the busy-wait is enabled for BookKeeper and Netty IO threads. * diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/AbstractConfigurationTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/AbstractConfigurationTest.java index a6333a47d3..194ab2c68d 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/AbstractConfigurationTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/AbstractConfigurationTest.java @@ -19,6 +19,8 @@ package org.apache.bookkeeper.conf; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.CALLS_REAL_METHODS; import static org.mockito.Mockito.mock; @@ -179,4 +181,12 @@ public class AbstractConfigurationTest { System.getProperties().put(nettyLevelKey, nettyLevelStr); } } + + @Test + public void testExitOnOutOfMemory() { + assertFalse(conf.exitOnOutOfMemory()); + conf.setExitOnOutOfMemory(true); + assertTrue(conf.exitOnOutOfMemory()); + } + } diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/journal/JournalWriter.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/journal/JournalWriter.java index 383ccfb982..e287fbd94a 100644 --- a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/journal/JournalWriter.java +++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/journal/JournalWriter.java @@ -495,6 +495,7 @@ public class JournalWriter implements Runnable { log.error("Unable to allocate memory, exiting bookie", ex); }) .leakDetectionPolicy(conf.getAllocatorLeakDetectionPolicy()) + .exitOnOutOfMemory(conf.exitOnOutOfMemory()) .build(); }