This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e40ad98b00bd98c8733cd8ae9ad43344df79b40b Author: Lari Hotari <[email protected]> AuthorDate: Thu Nov 21 10:13:23 2024 +0200 [improve] Improve logic for enabling Netty leak detection (#23613) (cherry picked from commit 949750fc080f3bee964dc839d16c9e215465d93a) --- .../common/allocator/PulsarByteBufAllocator.java | 29 ++++++++- .../allocator/PulsarByteBufAllocatorTest.java | 76 ++++++++++++++++++++++ 2 files changed, 103 insertions(+), 2 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java b/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java index ac12bb2df12..4ad0732a62d 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java @@ -21,9 +21,13 @@ package org.apache.pulsar.common.allocator; import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.PooledByteBufAllocator; +import java.util.Arrays; +import java.util.Comparator; import java.util.List; +import java.util.Objects; import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Consumer; +import java.util.function.Function; import lombok.experimental.UtilityClass; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder; @@ -44,6 +48,13 @@ public class PulsarByteBufAllocator { public static final String PULSAR_ALLOCATOR_LEAK_DETECTION = "pulsar.allocator.leak_detection"; public static final String PULSAR_ALLOCATOR_OUT_OF_MEMORY_POLICY = "pulsar.allocator.out_of_memory_policy"; + // the highest level of leak detection policy will be used when it is set by any of the following property names + private static final String[] LEAK_DETECTION_PROPERTY_NAMES = { + PULSAR_ALLOCATOR_LEAK_DETECTION, + "io.netty.leakDetection.level", // io.netty.util.ResourceLeakDetector.PROP_LEVEL + "io.netty.leakDetectionLevel" // io.netty.util.ResourceLeakDetector.PROP_LEVEL_OLD + }; + public static final ByteBufAllocator DEFAULT; private static final List<Consumer<OutOfMemoryError>> LISTENERS = new CopyOnWriteArrayList<>(); @@ -64,8 +75,7 @@ public class PulsarByteBufAllocator { final OutOfMemoryPolicy outOfMemoryPolicy = OutOfMemoryPolicy.valueOf( System.getProperty(PULSAR_ALLOCATOR_OUT_OF_MEMORY_POLICY, "FallbackToHeap")); - final LeakDetectionPolicy leakDetectionPolicy = LeakDetectionPolicy - .valueOf(System.getProperty(PULSAR_ALLOCATOR_LEAK_DETECTION, "Disabled")); + final LeakDetectionPolicy leakDetectionPolicy = resolveLeakDetectionPolicyWithHighestLevel(System::getProperty); if (log.isDebugEnabled()) { log.debug("Is Pooled: {} -- Exit on OOM: {}", isPooled, isExitOnOutOfMemory); } @@ -98,4 +108,19 @@ public class PulsarByteBufAllocator { return builder.build(); } + + /** + * Resolve the leak detection policy. The value is resolved from the system properties in + * the order of LEAK_DETECTION_PROPERTY_NAMES. + * @return parsed leak detection policy + */ + @VisibleForTesting + static LeakDetectionPolicy resolveLeakDetectionPolicyWithHighestLevel(Function<String, String> propertyResolver) { + return Arrays.stream(LEAK_DETECTION_PROPERTY_NAMES) + .map(propertyResolver) + .filter(Objects::nonNull) + .map(LeakDetectionPolicy::parseLevel) + .max(Comparator.comparingInt(Enum::ordinal)) + .orElse(LeakDetectionPolicy.Disabled); + } } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorTest.java new file mode 100644 index 00000000000..4c69a1938b9 --- /dev/null +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.allocator; + +import static org.testng.Assert.assertEquals; +import java.util.Properties; +import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy; +import org.testng.annotations.Test; + +public class PulsarByteBufAllocatorTest { + + @Test + public void testResolveLeakDetectionPolicyWithHighestLevel() { + Properties properties = new Properties(); + properties.setProperty("io.netty.leakDetectionLevel", "paranoid"); + properties.setProperty("io.netty.leakDetection.level", "advanced"); + properties.setProperty("pulsar.allocator.leak_detection", "simple"); + assertEquals(PulsarByteBufAllocator.resolveLeakDetectionPolicyWithHighestLevel(properties::getProperty), + LeakDetectionPolicy.Paranoid); + + properties.setProperty("io.netty.leakDetectionLevel", "advanced"); + properties.setProperty("io.netty.leakDetection.level", "simple"); + properties.setProperty("pulsar.allocator.leak_detection", "paranoid"); + assertEquals(PulsarByteBufAllocator.resolveLeakDetectionPolicyWithHighestLevel(properties::getProperty), + LeakDetectionPolicy.Paranoid); + + properties.setProperty("io.netty.leakDetectionLevel", "simple"); + properties.setProperty("io.netty.leakDetection.level", "paranoid"); + properties.setProperty("pulsar.allocator.leak_detection", "advanced"); + assertEquals(PulsarByteBufAllocator.resolveLeakDetectionPolicyWithHighestLevel(properties::getProperty), + LeakDetectionPolicy.Paranoid); + + properties.setProperty("io.netty.leakDetectionLevel", "disabled"); + properties.setProperty("io.netty.leakDetection.level", "simple"); + properties.setProperty("pulsar.allocator.leak_detection", "disabled"); + assertEquals(PulsarByteBufAllocator.resolveLeakDetectionPolicyWithHighestLevel(properties::getProperty), + LeakDetectionPolicy.Simple); + + properties.setProperty("io.netty.leakDetectionLevel", "invalid"); + properties.setProperty("io.netty.leakDetection.level", "invalid"); + properties.setProperty("pulsar.allocator.leak_detection", "invalid"); + assertEquals(PulsarByteBufAllocator.resolveLeakDetectionPolicyWithHighestLevel(properties::getProperty), + LeakDetectionPolicy.Disabled); + + properties.clear(); + properties.setProperty("pulsar.allocator.leak_detection", "Paranoid"); + assertEquals(PulsarByteBufAllocator.resolveLeakDetectionPolicyWithHighestLevel(properties::getProperty), + LeakDetectionPolicy.Paranoid); + + properties.clear(); + properties.setProperty("io.netty.leakDetectionLevel", "Advanced"); + assertEquals(PulsarByteBufAllocator.resolveLeakDetectionPolicyWithHighestLevel(properties::getProperty), + LeakDetectionPolicy.Advanced); + + properties.clear(); + properties.setProperty("io.netty.leakDetection.level", "Simple"); + assertEquals(PulsarByteBufAllocator.resolveLeakDetectionPolicyWithHighestLevel(properties::getProperty), + LeakDetectionPolicy.Simple); + } +} \ No newline at end of file
