This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 400e9518d [CELEBORN-2280] Support 
celeborn.network.memory.allocator.type to specify netty memory allocator
400e9518d is described below

commit 400e9518d7f19cacf8797971e169afa6bdea5be6
Author: SteNicholas <[email protected]>
AuthorDate: Sat Mar 14 17:27:00 2026 +0800

    [CELEBORN-2280] Support celeborn.network.memory.allocator.type to specify 
netty memory allocator
    
    ### What changes were proposed in this pull request?
    
    Support `celeborn.network.memory.allocator.type` to specify the Netty memory 
allocator type: `POOLED`, `UNPOOLED`, or `ADAPTIVE` (`AdaptiveByteBufAllocator`).
    
    ### Why are the changes needed?
    
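    Netty 4.2 introduces `AdaptiveByteBufAllocator`, an auto-tuning pooling 
`ByteBufAllocator` built on `AdaptivePoolingAllocator` (added in 
https://github.com/netty/netty/pull/13075). The existing boolean 
`celeborn.network.memory.allocator.pooled` can only choose between pooled and 
unpooled allocators, so it is replaced by a typed config that can also select 
the adaptive allocator.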
    
    ### Does this PR resolve a correctness bug?
    
    No.
    
    ### Does this PR introduce _any_ user-facing change?
    
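    Yes. Introduces `celeborn.network.memory.allocator.type` (default `POOLED`) 
to specify the Netty memory allocator, and removes 
`celeborn.network.memory.allocator.pooled`; see docs/migration.md.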
    
    ### How was this patch tested?
    
    CI.
    
    Closes #3625 from SteNicholas/CELEBORN-2280.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 .../network/util/NettyMemoryAllocatorType.java     | 33 ++++++++++++
 .../celeborn/common/network/util/NettyUtils.java   | 61 ++++++++++++----------
 .../org/apache/celeborn/common/CelebornConf.scala  | 28 ++++++----
 docs/migration.md                                  |  2 +
 4 files changed, 86 insertions(+), 38 deletions(-)

diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/util/NettyMemoryAllocatorType.java
 
b/common/src/main/java/org/apache/celeborn/common/network/util/NettyMemoryAllocatorType.java
new file mode 100644
index 000000000..1aea6efbc
--- /dev/null
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/util/NettyMemoryAllocatorType.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.util;
+
+import io.netty.buffer.AdaptiveByteBufAllocator;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
+/** Netty memory allocator type of {@link ByteBufAllocator}. */
+public enum NettyMemoryAllocatorType {
+  /** A pooled {@link ByteBufAllocator}: {@link PooledByteBufAllocator}. */
+  POOLED,
+  /** A un-pooled {@link ByteBufAllocator}: {@link UnpooledByteBufAllocator}. 
*/
+  UNPOOLED,
+  /** An auto-tuning pooling {@link ByteBufAllocator}: {@link 
AdaptiveByteBufAllocator}. */
+  ADAPTIVE
+}
diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java 
b/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java
index 109328233..df9d507a2 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadFactory;
 
+import io.netty.buffer.AdaptiveByteBufAllocator;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.UnpooledByteBufAllocator;
@@ -132,7 +133,7 @@ public class NettyUtils {
   /**
    * Create a ByteBufAllocator that respects the parameters
    *
-   * @param pooled If true, create a PooledByteBufAllocator, otherwise 
UnpooledByteBufAllocator
+   * @param allocatorType netty memory allocator type.
    * @param allowDirectBufs If true and platform supports, allocate ByteBuf in 
direct memory,
    *     otherwise in heap memory.
    * @param allowCache If true, enable thread-local cache, it only take effect 
for
@@ -141,23 +142,32 @@ public class NettyUtils {
    *     effect for PooledByteBufAllocator.
    */
   private static ByteBufAllocator createByteBufAllocator(
-      boolean pooled, boolean allowDirectBufs, boolean allowCache, int 
numCores) {
-    if (pooled) {
-      if (numCores == 0) {
-        numCores = Runtime.getRuntime().availableProcessors();
-      }
-      return new PooledByteBufAllocator(
-          allowDirectBufs && PlatformDependent.directBufferPreferred(),
-          Math.min(PooledByteBufAllocator.defaultNumHeapArena(), numCores),
-          Math.min(PooledByteBufAllocator.defaultNumDirectArena(), 
allowDirectBufs ? numCores : 0),
-          PooledByteBufAllocator.defaultPageSize(),
-          PooledByteBufAllocator.defaultMaxOrder(),
-          allowCache ? PooledByteBufAllocator.defaultSmallCacheSize() : 0,
-          allowCache ? PooledByteBufAllocator.defaultNormalCacheSize() : 0,
-          allowCache && PooledByteBufAllocator.defaultUseCacheForAllThreads());
-    } else {
-      return new UnpooledByteBufAllocator(
-          allowDirectBufs && PlatformDependent.directBufferPreferred());
+      NettyMemoryAllocatorType allocatorType,
+      boolean allowDirectBufs,
+      boolean allowCache,
+      int numCores) {
+    boolean preferDirect = allowDirectBufs && 
PlatformDependent.directBufferPreferred();
+    switch (allocatorType) {
+      case POOLED:
+        if (numCores == 0) {
+          numCores = Runtime.getRuntime().availableProcessors();
+        }
+        return new PooledByteBufAllocator(
+            preferDirect,
+            Math.min(PooledByteBufAllocator.defaultNumHeapArena(), numCores),
+            Math.min(
+                PooledByteBufAllocator.defaultNumDirectArena(), 
allowDirectBufs ? numCores : 0),
+            PooledByteBufAllocator.defaultPageSize(),
+            PooledByteBufAllocator.defaultMaxOrder(),
+            allowCache ? PooledByteBufAllocator.defaultSmallCacheSize() : 0,
+            allowCache ? PooledByteBufAllocator.defaultNormalCacheSize() : 0,
+            allowCache && 
PooledByteBufAllocator.defaultUseCacheForAllThreads());
+      case UNPOOLED:
+        return new UnpooledByteBufAllocator(preferDirect);
+      case ADAPTIVE:
+        return new AdaptiveByteBufAllocator(preferDirect);
+      default:
+        throw new IllegalArgumentException("Unknown allocator type: " + 
allocatorType);
     }
   }
 
@@ -169,10 +179,10 @@ public class NettyUtils {
       CelebornConf conf, AbstractSource source, boolean allowCache) {
     final int index = allowCache ? 0 : 1;
     if (_sharedByteBufAllocator[index] == null) {
+      NettyMemoryAllocatorType allocatorType = 
conf.networkMemoryAllocatorType();
       _sharedByteBufAllocator[index] =
-          createByteBufAllocator(
-              conf.networkMemoryAllocatorPooled(), true, allowCache, 
conf.networkAllocatorArenas());
-      if (conf.networkMemoryAllocatorPooled()) {
+          createByteBufAllocator(allocatorType, true, allowCache, 
conf.networkAllocatorArenas());
+      if (allocatorType == NettyMemoryAllocatorType.POOLED) {
         pooledByteBufAllocators.add((PooledByteBufAllocator) 
_sharedByteBufAllocator[index]);
       }
       if (source != null) {
@@ -206,13 +216,10 @@ public class NettyUtils {
     } else {
       arenas = conf.getCelebornConf().networkAllocatorArenas();
     }
+    NettyMemoryAllocatorType allocatorType = 
conf.getCelebornConf().networkMemoryAllocatorType();
     ByteBufAllocator allocator =
-        createByteBufAllocator(
-            conf.getCelebornConf().networkMemoryAllocatorPooled(),
-            conf.preferDirectBufs(),
-            allowCache,
-            arenas);
-    if (conf.getCelebornConf().networkMemoryAllocatorPooled()) {
+        createByteBufAllocator(allocatorType, conf.preferDirectBufs(), 
allowCache, arenas);
+    if (allocatorType == NettyMemoryAllocatorType.POOLED) {
       pooledByteBufAllocators.add((PooledByteBufAllocator) allocator);
     }
     if (source != null) {
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 6ec87a7c4..b1a4a469e 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -36,7 +36,7 @@ import 
org.apache.celeborn.common.client.{ApplicationInfoProvider, DefaultApplic
 import org.apache.celeborn.common.identity.{DefaultIdentityProvider, 
HadoopBasedIdentityProvider, IdentityProvider}
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.internal.config._
-import org.apache.celeborn.common.network.util.{ByteUnit, IOMode}
+import org.apache.celeborn.common.network.util.{ByteUnit, IOMode, 
NettyMemoryAllocatorType}
 import org.apache.celeborn.common.protocol._
 import org.apache.celeborn.common.protocol.StorageInfo.Type
 import org.apache.celeborn.common.protocol.StorageInfo.Type.{HDD, SSD}
@@ -616,8 +616,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   def networkMemoryAllocatorAllowCache: Boolean =
     get(NETWORK_MEMORY_ALLOCATOR_ALLOW_CACHE)
 
-  def networkMemoryAllocatorPooled: Boolean =
-    get(NETWORK_MEMORY_ALLOCATOR_POOLED)
+  def networkMemoryAllocatorType: NettyMemoryAllocatorType =
+    NettyMemoryAllocatorType.valueOf(get(NETWORK_MEMORY_ALLOCATOR_TYPE))
 
   def networkAllocatorArenas: Int = 
get(NETWORK_MEMORY_ALLOCATOR_ARENAS).getOrElse(Math.max(
     Runtime.getRuntime.availableProcessors(),
@@ -1949,16 +1949,22 @@ object CelebornConf extends Logging {
       .booleanConf
       .createWithDefault(false)
 
-  val NETWORK_MEMORY_ALLOCATOR_POOLED: ConfigEntry[Boolean] =
-    buildConf("celeborn.network.memory.allocator.pooled")
+  val NETWORK_MEMORY_ALLOCATOR_TYPE: ConfigEntry[String] =
+    buildConf("celeborn.network.memory.allocator.type")
       .categories("network")
       .internal
-      .version("0.6.0")
-      .doc("If disabled, always use UnpooledByteBufAllocator for aggressive 
memory reclamation, " +
-        "this is helpful for cases that worker has high memory usage even 
after triming. " +
-        "Disabling would cause performace degression and higher CPU usage.")
-      .booleanConf
-      .createWithDefault(true)
+      .version("0.7.0")
+      .doc("Specifies netty memory allocator type including: " +
+        s"${NettyMemoryAllocatorType.POOLED.name}: use PooledByteBufAllocator, 
which is the default and recommended for better performance. " +
+        s"${NettyMemoryAllocatorType.UNPOOLED.name}: use 
UnpooledByteBufAllocator, which is more aggressive in memory reclamation and 
may cause performance degradation and higher CPU usage. " +
+        s"${NettyMemoryAllocatorType.ADAPTIVE.name}: use 
AdaptiveByteBufAllocator, which is recommended to roll out usage slowly, and to 
carefully monitor application performance in the process.")
+      .stringConf
+      .transform(_.toUpperCase(Locale.ROOT))
+      .checkValues(Set(
+        NettyMemoryAllocatorType.POOLED.name,
+        NettyMemoryAllocatorType.UNPOOLED.name,
+        NettyMemoryAllocatorType.ADAPTIVE.name))
+      .createWithDefault(NettyMemoryAllocatorType.POOLED.name)
 
   val NETWORK_MEMORY_ALLOCATOR_SHARE: ConfigEntry[Boolean] =
     buildConf("celeborn.network.memory.allocator.share")
diff --git a/docs/migration.md b/docs/migration.md
index 7e0385f36..810f88b2b 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -33,6 +33,8 @@ license: |
 
 - Since 0.7.0, Celeborn changed the default value of 
`celeborn.worker.directMemoryRatioForReadBuffer` from `0.1` to `0.35`, which 
means read buffer threshold of buffer dispatcher is max direct memory * 0.35 at 
default.
 
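+- Since 0.7.0, Celeborn removed `celeborn.network.memory.allocator.pooled`. 
Please use `celeborn.network.memory.allocator.type` instead: `pooled=true` (the previous default) corresponds to `POOLED` (the new default), and `pooled=false` corresponds to `UNPOOLED`.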
+
 # Upgrading from 0.5 to 0.6
 
 - Since 0.6.0, Celeborn deprecate 
`celeborn.client.spark.fetch.throwsFetchFailure`. Please use 
`celeborn.client.spark.stageRerun.enabled` instead.

Reply via email to