This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new 6fe88ee48 [CELEBORN-1107] Make the max default number of netty threads 
configurable
6fe88ee48 is described below

commit 6fe88ee48eae99fb104a5b2b896bfb129903027a
Author: Chandni Singh <[email protected]>
AuthorDate: Fri Nov 3 13:18:44 2023 +0800

    [CELEBORN-1107] Make the max default number of netty threads configurable
    
    ### What changes were proposed in this pull request?
    This change makes the maximum default number of Netty threads configurable. 
Previously, this value was hardcoded to 64, which can be too small for some 
environments. While it's possible to configure the number of Netty server and 
client threads individually for each module, providing an option to increase 
the default value offers greater convenience.
    
    ### Why are the changes needed?
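    Users can raise the default cap on Netty server/client threads for all modules with a single setting (`celeborn.io.maxDefaultNettyThreads`) instead of configuring each module individually.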
    
    ### Does this PR introduce _any_ user-facing change?
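    No behavior change by default: the new configuration `celeborn.io.maxDefaultNettyThreads` defaults to 64, the previously hard-coded value.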
    
    ### How was this patch tested?
    Added a unit test in `UtilsSuite` ("validate number of client/server netty threads").
    
    Closes #2065 from otterc/CELEBORN-1107.
    
    Authored-by: Chandni Singh <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
    (cherry picked from commit c8b5384baf93f79664664670b797314bb1604d4f)
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../main/scala/org/apache/celeborn/common/CelebornConf.scala | 10 ++++++++++
 .../main/scala/org/apache/celeborn/common/util/Utils.scala   |  6 +++---
 .../scala/org/apache/celeborn/common/util/UtilsSuite.scala   | 12 +++++++++++-
 docs/configuration/network.md                                |  1 +
 4 files changed, 25 insertions(+), 4 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 05b3a17f1..968386e29 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -507,6 +507,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
     getTimeAsMs(key, FETCH_TIMEOUT_CHECK_INTERVAL.defaultValueString)
   }
 
+  def maxDefaultNettyThreads: Int = get(MAX_DEFAULT_NETTY_THREADS)
+
   // //////////////////////////////////////////////////////
   //                      Master                         //
   // //////////////////////////////////////////////////////
@@ -3947,4 +3949,12 @@ object CelebornConf extends Logging {
       .version("0.3.1")
       .booleanConf
       .createWithDefault(false)
+
+  val MAX_DEFAULT_NETTY_THREADS: ConfigEntry[Int] =
+    buildConf("celeborn.io.maxDefaultNettyThreads")
+      .categories("network")
+      .doc("Max default netty threads")
+      .version("0.3.2")
+      .intConf
+      .createWithDefault(64)
 }
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index 18956f34b..3c39de638 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -477,14 +477,14 @@ object Utils extends Logging {
     hostPortParseResults.get(hostPort)
   }
 
-  private val MAX_DEFAULT_NETTY_THREADS = 64
+  private var maxDefaultNettyThreads = 64
 
   def fromCelebornConf(
       _conf: CelebornConf,
       module: String,
       numUsableCores: Int = 0): TransportConf = {
     val conf = _conf.clone
-
+    maxDefaultNettyThreads = conf.maxDefaultNettyThreads
     // Specify thread configuration based on our JVM's allocation of cores 
(rather than necessarily
     // assuming we have all the machine's cores).
     // NB: Only set if serverThreads/clientThreads not already set.
@@ -501,7 +501,7 @@ object Utils extends Logging {
   private def defaultNumThreads(numUsableCores: Int): Int = {
     val availableCores =
       if (numUsableCores > 0) numUsableCores else 
Runtime.getRuntime.availableProcessors()
-    math.min(availableCores, MAX_DEFAULT_NETTY_THREADS)
+    math.min(availableCores, maxDefaultNettyThreads)
   }
 
   def getClassLoader: ClassLoader = getClass.getClassLoader
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala 
b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
index bce77cc68..d33eeb69c 100644
--- a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
@@ -20,7 +20,8 @@ package org.apache.celeborn.common.util
 import java.util
 
 import org.apache.celeborn.CelebornFunSuite
-import org.apache.celeborn.common.protocol.PartitionLocation
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.protocol.{PartitionLocation, 
TransportModuleConstants}
 import 
org.apache.celeborn.common.protocol.message.ControlMessages.{GetReducerFileGroupResponse,
 MapperEnd}
 import org.apache.celeborn.common.protocol.message.StatusCode
 
@@ -168,6 +169,15 @@ class UtilsSuite extends CelebornFunSuite {
     assert(set.size == 0)
   }
 
+  test("validate number of client/server netty threads") {
+    val celebornConf = new CelebornConf()
+    celebornConf.set("celeborn.io.maxDefaultNettyThreads", "100")
+    val transportConf =
+      Utils.fromCelebornConf(celebornConf, 
TransportModuleConstants.PUSH_MODULE, 120)
+    assert(transportConf.serverThreads() == 100)
+    assert(transportConf.clientThreads() == 100)
+  }
+
   def partitionLocation(partitionId: Int): util.HashSet[PartitionLocation] = {
     val partitionSet = new util.HashSet[PartitionLocation]
     for (i <- 0 until 3) {
diff --git a/docs/configuration/network.md b/docs/configuration/network.md
index 912073493..d54d9b858 100644
--- a/docs/configuration/network.md
+++ b/docs/configuration/network.md
@@ -39,6 +39,7 @@ license: |
 | celeborn.&lt;module&gt;.push.timeoutCheck.interval | 5s | Interval for 
checking push data timeout. If setting <module> to `data`, it works for shuffle 
client push data and should be configured on client side. If setting <module> 
to `replicate`, it works for worker replicate data to peer worker and should be 
configured on worker side. | 0.3.0 | 
 | celeborn.&lt;module&gt;.push.timeoutCheck.threads | 4 | Threads num for 
checking push data timeout. If setting <module> to `data`, it works for shuffle 
client push data and should be configured on client side. If setting <module> 
to `replicate`, it works for worker replicate data to peer worker and should be 
configured on worker side. | 0.3.0 | 
 | celeborn.&lt;role&gt;.rpc.dispatcher.threads | &lt;value of 
celeborn.rpc.dispatcher.threads&gt; | Threads number of message dispatcher 
event loop for roles |  | 
+| celeborn.io.maxDefaultNettyThreads | 64 | Max default netty threads | 0.3.2 
| 
 | celeborn.network.bind.preferIpAddress | true | When `ture`, prefer to use IP 
address, otherwise FQDN. This configuration only takes effects when the bind 
hostname is not set explicitly, in such case, Celeborn will find the first 
non-loopback address to bind. | 0.3.0 | 
 | celeborn.network.connect.timeout | 10s | Default socket connect timeout. | 
0.2.0 | 
 | celeborn.network.memory.allocator.numArenas | &lt;undefined&gt; | Number of 
arenas for pooled memory allocator. Default value is 
Runtime.getRuntime.availableProcessors, min value is 2. | 0.3.0 | 

Reply via email to