This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a3dd067939bec898a013b60cc974a9a8a57d00cc
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Nov 21 10:13:23 2024 +0200

    [improve] Improve logic for enabling Netty leak detection (#23613)
    
    (cherry picked from commit 949750fc080f3bee964dc839d16c9e215465d93a)
---
 .../common/allocator/PulsarByteBufAllocator.java   | 29 ++++++++-
 .../allocator/PulsarByteBufAllocatorTest.java      | 76 ++++++++++++++++++++++
 2 files changed, 103 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java
index ac12bb2df12..4ad0732a62d 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java
@@ -21,9 +21,13 @@ package org.apache.pulsar.common.allocator;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.PooledByteBufAllocator;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import lombok.experimental.UtilityClass;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder;
@@ -44,6 +48,13 @@ public class PulsarByteBufAllocator {
     public static final String PULSAR_ALLOCATOR_LEAK_DETECTION = 
"pulsar.allocator.leak_detection";
     public static final String PULSAR_ALLOCATOR_OUT_OF_MEMORY_POLICY = 
"pulsar.allocator.out_of_memory_policy";
 
+    // the highest level of leak detection policy will be used when it is set 
by any of the following property names
+    private static final String[] LEAK_DETECTION_PROPERTY_NAMES = {
+            PULSAR_ALLOCATOR_LEAK_DETECTION,
+            "io.netty.leakDetection.level", // 
io.netty.util.ResourceLeakDetector.PROP_LEVEL
+            "io.netty.leakDetectionLevel" // 
io.netty.util.ResourceLeakDetector.PROP_LEVEL_OLD
+    };
+
     public static final ByteBufAllocator DEFAULT;
 
     private static final List<Consumer<OutOfMemoryError>> LISTENERS = new 
CopyOnWriteArrayList<>();
@@ -64,8 +75,7 @@ public class PulsarByteBufAllocator {
         final OutOfMemoryPolicy outOfMemoryPolicy = OutOfMemoryPolicy.valueOf(
                 System.getProperty(PULSAR_ALLOCATOR_OUT_OF_MEMORY_POLICY, 
"FallbackToHeap"));
 
-        final LeakDetectionPolicy leakDetectionPolicy = LeakDetectionPolicy
-                .valueOf(System.getProperty(PULSAR_ALLOCATOR_LEAK_DETECTION, 
"Disabled"));
+        final LeakDetectionPolicy leakDetectionPolicy = 
resolveLeakDetectionPolicyWithHighestLevel(System::getProperty);
         if (log.isDebugEnabled()) {
             log.debug("Is Pooled: {} -- Exit on OOM: {}", isPooled, 
isExitOnOutOfMemory);
         }
@@ -98,4 +108,19 @@ public class PulsarByteBufAllocator {
         return builder.build();
 
     }
+
+    /**
+     * Resolve the leak detection policy. The value is resolved from the 
system properties in
+     * the order of LEAK_DETECTION_PROPERTY_NAMES.
+     * @return parsed leak detection policy
+     */
+    @VisibleForTesting
+    static LeakDetectionPolicy 
resolveLeakDetectionPolicyWithHighestLevel(Function<String, String> 
propertyResolver) {
+        return Arrays.stream(LEAK_DETECTION_PROPERTY_NAMES)
+                .map(propertyResolver)
+                .filter(Objects::nonNull)
+                .map(LeakDetectionPolicy::parseLevel)
+                .max(Comparator.comparingInt(Enum::ordinal))
+                .orElse(LeakDetectionPolicy.Disabled);
+    }
 }
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorTest.java
new file mode 100644
index 00000000000..4c69a1938b9
--- /dev/null
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.allocator;
+
+import static org.testng.Assert.assertEquals;
+import java.util.Properties;
+import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy;
+import org.testng.annotations.Test;
+
+public class PulsarByteBufAllocatorTest {
+
+    @Test
+    public void testResolveLeakDetectionPolicyWithHighestLevel() {
+        Properties properties = new Properties();
+        properties.setProperty("io.netty.leakDetectionLevel", "paranoid");
+        properties.setProperty("io.netty.leakDetection.level", "advanced");
+        properties.setProperty("pulsar.allocator.leak_detection", "simple");
+        
assertEquals(PulsarByteBufAllocator.resolveLeakDetectionPolicyWithHighestLevel(properties::getProperty),
+                LeakDetectionPolicy.Paranoid);
+
+        properties.setProperty("io.netty.leakDetectionLevel", "advanced");
+        properties.setProperty("io.netty.leakDetection.level", "simple");
+        properties.setProperty("pulsar.allocator.leak_detection", "paranoid");
+        
assertEquals(PulsarByteBufAllocator.resolveLeakDetectionPolicyWithHighestLevel(properties::getProperty),
+                LeakDetectionPolicy.Paranoid);
+
+        properties.setProperty("io.netty.leakDetectionLevel", "simple");
+        properties.setProperty("io.netty.leakDetection.level", "paranoid");
+        properties.setProperty("pulsar.allocator.leak_detection", "advanced");
+        
assertEquals(PulsarByteBufAllocator.resolveLeakDetectionPolicyWithHighestLevel(properties::getProperty),
+                LeakDetectionPolicy.Paranoid);
+
+        properties.setProperty("io.netty.leakDetectionLevel", "disabled");
+        properties.setProperty("io.netty.leakDetection.level", "simple");
+        properties.setProperty("pulsar.allocator.leak_detection", "disabled");
+        
assertEquals(PulsarByteBufAllocator.resolveLeakDetectionPolicyWithHighestLevel(properties::getProperty),
+                LeakDetectionPolicy.Simple);
+
+        properties.setProperty("io.netty.leakDetectionLevel", "invalid");
+        properties.setProperty("io.netty.leakDetection.level", "invalid");
+        properties.setProperty("pulsar.allocator.leak_detection", "invalid");
+        
assertEquals(PulsarByteBufAllocator.resolveLeakDetectionPolicyWithHighestLevel(properties::getProperty),
+                LeakDetectionPolicy.Disabled);
+
+        properties.clear();
+        properties.setProperty("pulsar.allocator.leak_detection", "Paranoid");
+        
assertEquals(PulsarByteBufAllocator.resolveLeakDetectionPolicyWithHighestLevel(properties::getProperty),
+                LeakDetectionPolicy.Paranoid);
+
+        properties.clear();
+        properties.setProperty("io.netty.leakDetectionLevel", "Advanced");
+        
assertEquals(PulsarByteBufAllocator.resolveLeakDetectionPolicyWithHighestLevel(properties::getProperty),
+                LeakDetectionPolicy.Advanced);
+
+        properties.clear();
+        properties.setProperty("io.netty.leakDetection.level", "Simple");
+        
assertEquals(PulsarByteBufAllocator.resolveLeakDetectionPolicyWithHighestLevel(properties::getProperty),
+                LeakDetectionPolicy.Simple);
+    }
+}
\ No newline at end of file

Reply via email to