This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.17
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 7b319338b0e6327fbc55cac1f9d3940478b0982b
Author: Yan Zhao <horizo...@apache.org>
AuthorDate: Mon Jul 15 14:25:46 2024 +0800

    Allocator support exitOnOutOfMemory config. (#3984)
    
    * Allocator support exitOnOutOfMemory config.
    
    (cherry picked from commit 15b106c7610f11dfc091c8f73c28d1666113c25f)
---
 bookkeeper-common-allocator/pom.xml                |  5 ++
 .../common/allocator/ByteBufAllocatorBuilder.java  |  2 +
 .../impl/ByteBufAllocatorBuilderImpl.java          |  9 ++-
 .../allocator/impl/ByteBufAllocatorImpl.java       | 34 +++++++--
 .../bookkeeper/common/util/ShutdownUtil.java       | 86 ++++++++++++++++++++++
 .../bookkeeper/common/util/package-info.java       | 21 ++++++
 .../impl/ByteBufAllocatorBuilderTest.java          | 28 +++++++
 .../apache/bookkeeper/bookie/BookieResources.java  |  1 +
 .../org/apache/bookkeeper/client/BookKeeper.java   |  1 +
 .../bookkeeper/conf/AbstractConfiguration.java     | 10 +++
 .../bookkeeper/conf/AbstractConfigurationTest.java | 10 +++
 .../tools/perf/journal/JournalWriter.java          |  1 +
 12 files changed, 201 insertions(+), 7 deletions(-)

diff --git a/bookkeeper-common-allocator/pom.xml 
b/bookkeeper-common-allocator/pom.xml
index 9712799f5d..c55200aa0f 100644
--- a/bookkeeper-common-allocator/pom.xml
+++ b/bookkeeper-common-allocator/pom.xml
@@ -29,6 +29,11 @@
       <groupId>io.netty</groupId>
       <artifactId>netty-buffer</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
diff --git 
a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/ByteBufAllocatorBuilder.java
 
b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/ByteBufAllocatorBuilder.java
index 3e36a23d17..cb244140b3 100644
--- 
a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/ByteBufAllocatorBuilder.java
+++ 
b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/ByteBufAllocatorBuilder.java
@@ -92,4 +92,6 @@ public interface ByteBufAllocatorBuilder {
      * <p>Default is {@link LeakDetectionPolicy#Disabled}
      */
     ByteBufAllocatorBuilder leakDetectionPolicy(LeakDetectionPolicy 
leakDetectionPolicy);
+
+    /**
+     * Define whether the allocator should shut down the current process when a memory allocation fails
+     * with an {@link OutOfMemoryError}.
+     *
+     * <p>Default is {@code false}
+     */
+    ByteBufAllocatorBuilder exitOnOutOfMemory(boolean exitOnOutOfMemory);
 }
diff --git 
a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderImpl.java
 
b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderImpl.java
index 69c57232af..4b5469a3f7 100644
--- 
a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderImpl.java
+++ 
b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderImpl.java
@@ -37,11 +37,12 @@ public class ByteBufAllocatorBuilderImpl implements 
ByteBufAllocatorBuilder {
     OutOfMemoryPolicy outOfMemoryPolicy = OutOfMemoryPolicy.FallbackToHeap;
     Consumer<OutOfMemoryError> outOfMemoryListener = null;
     LeakDetectionPolicy leakDetectionPolicy = LeakDetectionPolicy.Disabled;
+    boolean exitOnOutOfMemory = false;
 
     @Override
     public ByteBufAllocatorWithOomHandler build() {
         return new ByteBufAllocatorImpl(pooledAllocator, unpooledAllocator, 
poolingPolicy, poolingConcurrency,
-                outOfMemoryPolicy, outOfMemoryListener, leakDetectionPolicy);
+                outOfMemoryPolicy, outOfMemoryListener, leakDetectionPolicy, 
exitOnOutOfMemory);
     }
 
     @Override
@@ -86,4 +87,10 @@ public class ByteBufAllocatorBuilderImpl implements 
ByteBufAllocatorBuilder {
         return this;
     }
 
+    /**
+     * Records the exit-on-OOM flag; {@link #build()} passes it to the {@link ByteBufAllocatorImpl} constructor.
+     */
+    @Override
+    public ByteBufAllocatorBuilder exitOnOutOfMemory(boolean exitOnOutOfMemory) {
+        this.exitOnOutOfMemory = exitOnOutOfMemory;
+        return this;
+    }
+
 }
diff --git 
a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java
 
b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java
index 87582cca92..3bc06f8e7e 100644
--- 
a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java
+++ 
b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java
@@ -29,6 +29,7 @@ import 
org.apache.bookkeeper.common.allocator.ByteBufAllocatorWithOomHandler;
 import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy;
 import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
 import org.apache.bookkeeper.common.allocator.PoolingPolicy;
+import org.apache.bookkeeper.common.util.ShutdownUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,15 +49,24 @@ public class ByteBufAllocatorImpl extends 
AbstractByteBufAllocator implements By
     private final PoolingPolicy poolingPolicy;
     private final OutOfMemoryPolicy outOfMemoryPolicy;
     private Consumer<OutOfMemoryError> outOfMemoryListener;
+    private final boolean exitOnOutOfMemory;
 
     ByteBufAllocatorImpl(ByteBufAllocator pooledAllocator, ByteBufAllocator 
unpooledAllocator,
             PoolingPolicy poolingPolicy, int poolingConcurrency, 
OutOfMemoryPolicy outOfMemoryPolicy,
             Consumer<OutOfMemoryError> outOfMemoryListener,
             LeakDetectionPolicy leakDetectionPolicy) {
-        super(poolingPolicy == PoolingPolicy.PooledDirect /* preferDirect */);
+        this(pooledAllocator, unpooledAllocator, poolingPolicy, 
poolingConcurrency, outOfMemoryPolicy,
+                outOfMemoryListener, leakDetectionPolicy, false);
+    }
 
+    ByteBufAllocatorImpl(ByteBufAllocator pooledAllocator, ByteBufAllocator 
unpooledAllocator,
+            PoolingPolicy poolingPolicy, int poolingConcurrency, 
OutOfMemoryPolicy outOfMemoryPolicy,
+            Consumer<OutOfMemoryError> outOfMemoryListener,
+            LeakDetectionPolicy leakDetectionPolicy, boolean 
exitOnOutOfMemory) {
+        super(poolingPolicy == PoolingPolicy.PooledDirect /* preferDirect */);
         this.poolingPolicy = poolingPolicy;
         this.outOfMemoryPolicy = outOfMemoryPolicy;
+        this.exitOnOutOfMemory = exitOnOutOfMemory;
         if (outOfMemoryListener == null) {
             this.outOfMemoryListener = (v) -> {
                 log.error("Unable to allocate memory", v);
@@ -146,7 +156,7 @@ public class ByteBufAllocatorImpl extends 
AbstractByteBufAllocator implements By
                     : unpooledAllocator;
             return alloc.heapBuffer(initialCapacity, maxCapacity);
         } catch (OutOfMemoryError e) {
-            outOfMemoryListener.accept(e);
+            consumeOOMError(e);
             throw e;
         }
     }
@@ -166,12 +176,12 @@ public class ByteBufAllocatorImpl extends 
AbstractByteBufAllocator implements By
                     try {
                         return unpooledAllocator.heapBuffer(initialCapacity, 
maxCapacity);
                     } catch (OutOfMemoryError e2) {
-                        outOfMemoryListener.accept(e2);
+                        consumeOOMError(e2);
                         throw e2;
                     }
                 } else {
                     // ThrowException
-                    outOfMemoryListener.accept(e);
+                    consumeOOMError(e);
                     throw e;
                 }
             }
@@ -181,12 +191,24 @@ public class ByteBufAllocatorImpl extends 
AbstractByteBufAllocator implements By
             try {
                 return unpooledAllocator.directBuffer(initialCapacity, 
maxCapacity);
             } catch (OutOfMemoryError e) {
-                outOfMemoryListener.accept(e);
-                throw e;
+                consumeOOMError(e);
+            throw e;
             }
         }
     }
 
+    /**
+     * Hands an allocation failure to the configured out-of-memory listener and, when
+     * {@code exitOnOutOfMemory} is set, calls {@link ShutdownUtil#triggerImmediateForcefulShutdown()}.
+     *
+     * <p>This method does not rethrow the error itself; the callers rethrow it after this method returns.
+     *
+     * @param outOfMemoryError the error raised by the failed allocation
+     */
+    private void consumeOOMError(OutOfMemoryError outOfMemoryError) {
+        try {
+            outOfMemoryListener.accept(outOfMemoryError);
+        } catch (Throwable e) {
+            // A failing listener is only logged, so the exit check below still runs.
+            log.warn("Consume outOfMemory error failed.", e);
+        }
+        if (exitOnOutOfMemory) {
+            log.info("Exiting JVM process for OOM error: {}", outOfMemoryError.getMessage(), outOfMemoryError);
+            ShutdownUtil.triggerImmediateForcefulShutdown();
+        }
+    }
+
     @Override
     public boolean isDirectBufferPooled() {
         return pooledAllocator != null && 
pooledAllocator.isDirectBufferPooled();
diff --git 
a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/util/ShutdownUtil.java
 
b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/util/ShutdownUtil.java
new file mode 100644
index 0000000000..a398b57fe7
--- /dev/null
+++ 
b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/util/ShutdownUtil.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Static helpers that forcefully halt the current JVM process, shutting down Log4j2 logging first
+ * when Log4j2 is present on the classpath.
+ *
+ * <p>Forked from <a href="https://github.com/apache/pulsar">Pulsar</a>.
+ */
+@Slf4j
+public class ShutdownUtil {
+    // Handle to org.apache.logging.log4j.LogManager.shutdown(), or null when it could not be looked up.
+    private static final Method log4j2ShutdownMethod;
+
+    static {
+        // use reflection to find org.apache.logging.log4j.LogManager.shutdown method
+        Method shutdownMethod = null;
+        try {
+            shutdownMethod = Class.forName("org.apache.logging.log4j.LogManager")
+                    .getMethod("shutdown");
+        } catch (ClassNotFoundException | NoSuchMethodException e) {
+            // ignore when Log4j2 isn't found, log at debug level
+            log.debug("Cannot find org.apache.logging.log4j.LogManager.shutdown method", e);
+        }
+        log4j2ShutdownMethod = shutdownMethod;
+    }
+
+    /**
+     * Triggers an immediate forceful shutdown of the current process.
+     *
+     * <p>Delegates to {@link #triggerImmediateForcefulShutdown(int, boolean)} with logging enabled.
+     *
+     * @param status Termination status. By convention, a nonzero status code indicates abnormal termination.
+     * @see Runtime#halt(int)
+     */
+    public static void triggerImmediateForcefulShutdown(int status) {
+        triggerImmediateForcefulShutdown(status, true);
+    }
+
+    /**
+     * Triggers an immediate forceful shutdown of the current process.
+     *
+     * <p>Logging is shut down before the process is halted. The halt is issued from a {@code finally}
+     * block, so it still happens if writing the warning or shutting down logging throws.
+     *
+     * @param status Termination status. By convention, a nonzero status code indicates abnormal termination.
+     * @param logging whether to log a warning carrying a stack trace before halting; the warning is only
+     *                written when {@code status} is nonzero
+     * @see Runtime#halt(int)
+     */
+    public static void triggerImmediateForcefulShutdown(int status, boolean logging) {
+        try {
+            if (status != 0 && logging) {
+                log.warn("Triggering immediate shutdown of current process with status {}", status,
+                        new Exception("Stacktrace for immediate shutdown"));
+            }
+            shutdownLogging();
+        } finally {
+            Runtime.getRuntime().halt(status);
+        }
+    }
+
+    /**
+     * Invokes {@code org.apache.logging.log4j.LogManager.shutdown()} through reflection when the method
+     * was found during class initialization; does nothing otherwise. Reflection failures are logged and
+     * not rethrown.
+     */
+    private static void shutdownLogging() {
+        // flush log buffers and shutdown log4j2 logging to prevent log truncation
+        if (log4j2ShutdownMethod != null) {
+            try {
+                // use reflection to call org.apache.logging.log4j.LogManager.shutdown()
+                log4j2ShutdownMethod.invoke(null);
+            } catch (IllegalAccessException | InvocationTargetException e) {
+                log.error("Unable to call org.apache.logging.log4j.LogManager.shutdown using reflection.", e);
+            }
+        }
+    }
+
+    /**
+     * Triggers an immediate forceful shutdown of the current process using 1 as the status code.
+     *
+     * @see Runtime#halt(int)
+     */
+    public static void triggerImmediateForcefulShutdown() {
+        triggerImmediateForcefulShutdown(1);
+    }
+}
\ No newline at end of file
diff --git 
a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/util/package-info.java
 
b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/util/package-info.java
new file mode 100644
index 0000000000..55031dd8f8
--- /dev/null
+++ 
b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/util/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * defines the utilities for allocator used across the project.
+ */
+package org.apache.bookkeeper.common.util;
\ No newline at end of file
diff --git 
a/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java
 
b/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java
index 6f2538d6c8..40c41fa65b 100644
--- 
a/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java
+++ 
b/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.when;
 
 import io.netty.buffer.ByteBuf;
@@ -35,7 +36,10 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder;
 import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
 import org.apache.bookkeeper.common.allocator.PoolingPolicy;
+import org.apache.bookkeeper.common.util.ShutdownUtil;
 import org.junit.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 
 /**
  * Tests for {@link ByteBufAllocatorBuilderImpl}.
@@ -87,6 +91,30 @@ public class ByteBufAllocatorBuilderTest {
         assertEquals(outOfDirectMemException, receivedException.get());
     }
 
+    /**
+     * Verifies that an allocator built with {@code exitOnOutOfMemory(true)} calls
+     * {@link ShutdownUtil#triggerImmediateForcefulShutdown()} exactly once when the direct allocation
+     * fails under {@link OutOfMemoryPolicy#ThrowException}, and that the original error still reaches
+     * the caller.
+     */
+    @Test
+    public void testOomExit() {
+        ByteBufAllocator baseAlloc = mock(ByteBufAllocator.class);
+        when(baseAlloc.directBuffer(anyInt(), anyInt())).thenThrow(outOfDirectMemException);
+
+        ByteBufAllocator alloc = ByteBufAllocatorBuilder.create()
+                .pooledAllocator(baseAlloc)
+                .outOfMemoryPolicy(OutOfMemoryPolicy.ThrowException)
+                .exitOnOutOfMemory(true)
+                .build();
+
+        // A static mock stays registered on the current thread until it is closed. Scope it with
+        // try-with-resources so ShutdownUtil is not left stubbed for later tests on this thread and a
+        // later mockStatic(ShutdownUtil.class) call does not fail.
+        try (MockedStatic<ShutdownUtil> mockedStatic = mockStatic(ShutdownUtil.class)) {
+            try {
+                alloc.buffer();
+                fail("Should have thrown exception");
+            } catch (OutOfMemoryError e) {
+                // Expected
+                assertEquals(outOfDirectMemException, e);
+            }
+
+            mockedStatic.verify(() -> ShutdownUtil.triggerImmediateForcefulShutdown(), Mockito.times(1));
+        }
+    }
+
     @Test
     public void testOomWithFallback() {
         ByteBufAllocator baseAlloc = mock(ByteBufAllocator.class);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieResources.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieResources.java
index c9b71b9968..755efd5be0 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieResources.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieResources.java
@@ -70,6 +70,7 @@ public class BookieResources {
             .poolingConcurrency(conf.getAllocatorPoolingConcurrency())
             .outOfMemoryPolicy(conf.getAllocatorOutOfMemoryPolicy())
             .leakDetectionPolicy(conf.getAllocatorLeakDetectionPolicy())
+            .exitOnOutOfMemory(conf.exitOnOutOfMemory())
             .build();
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index d7043dc8c9..0362aadcba 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -478,6 +478,7 @@ public class BookKeeper implements 
org.apache.bookkeeper.client.api.BookKeeper {
                     .poolingConcurrency(conf.getAllocatorPoolingConcurrency())
                     .outOfMemoryPolicy(conf.getAllocatorOutOfMemoryPolicy())
                     
.leakDetectionPolicy(conf.getAllocatorLeakDetectionPolicy())
+                    .exitOnOutOfMemory(conf.exitOnOutOfMemory())
                     .build();
         }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
index 438dc40983..6d2a82bb55 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
@@ -183,6 +183,7 @@ public abstract class AbstractConfiguration<T extends 
AbstractConfiguration>
     protected static final String ALLOCATOR_POOLING_CONCURRENCY = 
"allocatorPoolingConcurrency";
     protected static final String ALLOCATOR_OOM_POLICY = 
"allocatorOutOfMemoryPolicy";
     protected static final String ALLOCATOR_LEAK_DETECTION_POLICY = 
"allocatorLeakDetectionPolicy";
+    protected static final String ALLOCATOR_EXIT_ON_OUT_OF_MEMORY = 
"allocatorExitOnOutOfMemory";
 
     // option to limit stats logging
     public static final String LIMIT_STATS_LOGGING = "limitStatsLogging";
@@ -1156,6 +1157,15 @@ public abstract class AbstractConfiguration<T extends 
AbstractConfiguration>
         return getThis();
     }
 
+    /**
+     * Set whether the allocator should exit the process when an out-of-memory error occurs.
+     *
+     * <p>The value is stored under the {@code allocatorExitOnOutOfMemory} setting.
+     *
+     * @param exitOnOutOfMemory
+     *          {@code true} to enable exiting on out-of-memory, {@code false} to disable it
+     * @return the result of {@code getThis()}, so that calls can be chained
+     */
+    public T setExitOnOutOfMemory(boolean exitOnOutOfMemory) {
+        this.setProperty(ALLOCATOR_EXIT_ON_OUT_OF_MEMORY, exitOnOutOfMemory);
+        return getThis();
+    }
+
+    /**
+     * Return whether the allocator is configured to exit the process when an out-of-memory error occurs.
+     *
+     * <p>Reads the {@code allocatorExitOnOutOfMemory} setting.
+     *
+     * @return the configured value, or {@code false} when the setting is absent
+     */
+    public boolean exitOnOutOfMemory() {
+        return getBoolean(ALLOCATOR_EXIT_ON_OUT_OF_MEMORY, false);
+    }
+
     /**
      * Return whether the busy-wait is enabled for BookKeeper and Netty IO 
threads.
      *
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/AbstractConfigurationTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/AbstractConfigurationTest.java
index a6333a47d3..194ab2c68d 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/AbstractConfigurationTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/AbstractConfigurationTest.java
@@ -19,6 +19,8 @@
 package org.apache.bookkeeper.conf;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.CALLS_REAL_METHODS;
 import static org.mockito.Mockito.mock;
 
@@ -179,4 +181,12 @@ public class AbstractConfigurationTest {
             System.getProperties().put(nettyLevelKey, nettyLevelStr);
         }
     }
+
+    /**
+     * Checks that {@code exitOnOutOfMemory()} is {@code false} by default and returns {@code true}
+     * after {@code setExitOnOutOfMemory(true)}.
+     */
+    @Test
+    public void testExitOnOutOfMemory() {
+        assertFalse(conf.exitOnOutOfMemory());
+        conf.setExitOnOutOfMemory(true);
+        assertTrue(conf.exitOnOutOfMemory());
+    }
+
 }
diff --git 
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/journal/JournalWriter.java
 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/journal/JournalWriter.java
index 383ccfb982..e287fbd94a 100644
--- 
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/journal/JournalWriter.java
+++ 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/journal/JournalWriter.java
@@ -495,6 +495,7 @@ public class JournalWriter implements Runnable {
                     log.error("Unable to allocate memory, exiting bookie", ex);
                 })
                 .leakDetectionPolicy(conf.getAllocatorLeakDetectionPolicy())
+                .exitOnOutOfMemory(conf.exitOnOutOfMemory())
                 .build();
     }
 

Reply via email to