This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new 6fe88ee48 [CELEBORN-1107] Make the max default number of netty threads
configurable
6fe88ee48 is described below
commit 6fe88ee48eae99fb104a5b2b896bfb129903027a
Author: Chandni Singh <[email protected]>
AuthorDate: Fri Nov 3 13:18:44 2023 +0800
[CELEBORN-1107] Make the max default number of netty threads configurable
### What changes were proposed in this pull request?
This change makes the maximum default number of Netty threads configurable.
Previously, this value was hardcoded to 64, which could be small for certain
environments. While it's possible to configure the number of Netty server and
client threads individually for each module, providing an option to increase
the default value offers greater convenience.
### Why are the changes needed?
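The change offers convenience: the default cap on Netty threads can be raised
with a single setting instead of configuring the server and client thread
counts individually for each module.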
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
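Added a unit test to `UtilsSuite` that validates the number of client/server
Netty threads when `celeborn.io.maxDefaultNettyThreads` is set.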
Closes #2065 from otterc/CELEBORN-1107.
Authored-by: Chandni Singh <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
(cherry picked from commit c8b5384baf93f79664664670b797314bb1604d4f)
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../main/scala/org/apache/celeborn/common/CelebornConf.scala | 10 ++++++++++
.../main/scala/org/apache/celeborn/common/util/Utils.scala | 6 +++---
.../scala/org/apache/celeborn/common/util/UtilsSuite.scala | 12 +++++++++++-
docs/configuration/network.md | 1 +
4 files changed, 25 insertions(+), 4 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 05b3a17f1..968386e29 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -507,6 +507,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
getTimeAsMs(key, FETCH_TIMEOUT_CHECK_INTERVAL.defaultValueString)
}
+ def maxDefaultNettyThreads: Int = get(MAX_DEFAULT_NETTY_THREADS)
+
// //////////////////////////////////////////////////////
// Master //
// //////////////////////////////////////////////////////
@@ -3947,4 +3949,12 @@ object CelebornConf extends Logging {
.version("0.3.1")
.booleanConf
.createWithDefault(false)
+
+ val MAX_DEFAULT_NETTY_THREADS: ConfigEntry[Int] =
+ buildConf("celeborn.io.maxDefaultNettyThreads")
+ .categories("network")
+ .doc("Max default netty threads")
+ .version("0.3.2")
+ .intConf
+ .createWithDefault(64)
}
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index 18956f34b..3c39de638 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -477,14 +477,14 @@ object Utils extends Logging {
hostPortParseResults.get(hostPort)
}
- private val MAX_DEFAULT_NETTY_THREADS = 64
+ private var maxDefaultNettyThreads = 64
def fromCelebornConf(
_conf: CelebornConf,
module: String,
numUsableCores: Int = 0): TransportConf = {
val conf = _conf.clone
-
+ maxDefaultNettyThreads = conf.maxDefaultNettyThreads
// Specify thread configuration based on our JVM's allocation of cores
(rather than necessarily
// assuming we have all the machine's cores).
// NB: Only set if serverThreads/clientThreads not already set.
@@ -501,7 +501,7 @@ object Utils extends Logging {
private def defaultNumThreads(numUsableCores: Int): Int = {
val availableCores =
if (numUsableCores > 0) numUsableCores else
Runtime.getRuntime.availableProcessors()
- math.min(availableCores, MAX_DEFAULT_NETTY_THREADS)
+ math.min(availableCores, maxDefaultNettyThreads)
}
def getClassLoader: ClassLoader = getClass.getClassLoader
diff --git
a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
index bce77cc68..d33eeb69c 100644
--- a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
@@ -20,7 +20,8 @@ package org.apache.celeborn.common.util
import java.util
import org.apache.celeborn.CelebornFunSuite
-import org.apache.celeborn.common.protocol.PartitionLocation
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.protocol.{PartitionLocation,
TransportModuleConstants}
import
org.apache.celeborn.common.protocol.message.ControlMessages.{GetReducerFileGroupResponse,
MapperEnd}
import org.apache.celeborn.common.protocol.message.StatusCode
@@ -168,6 +169,15 @@ class UtilsSuite extends CelebornFunSuite {
assert(set.size == 0)
}
+ test("validate number of client/server netty threads") {
+ val celebornConf = new CelebornConf()
+ celebornConf.set("celeborn.io.maxDefaultNettyThreads", "100")
+ val transportConf =
+ Utils.fromCelebornConf(celebornConf,
TransportModuleConstants.PUSH_MODULE, 120)
+ assert(transportConf.serverThreads() == 100)
+ assert(transportConf.clientThreads() == 100)
+ }
+
def partitionLocation(partitionId: Int): util.HashSet[PartitionLocation] = {
val partitionSet = new util.HashSet[PartitionLocation]
for (i <- 0 until 3) {
diff --git a/docs/configuration/network.md b/docs/configuration/network.md
index 912073493..d54d9b858 100644
--- a/docs/configuration/network.md
+++ b/docs/configuration/network.md
@@ -39,6 +39,7 @@ license: |
| celeborn.<module>.push.timeoutCheck.interval | 5s | Interval for
checking push data timeout. If setting <module> to `data`, it works for shuffle
client push data and should be configured on client side. If setting <module>
to `replicate`, it works for worker replicate data to peer worker and should be
configured on worker side. | 0.3.0 |
| celeborn.<module>.push.timeoutCheck.threads | 4 | Threads num for
checking push data timeout. If setting <module> to `data`, it works for shuffle
client push data and should be configured on client side. If setting <module>
to `replicate`, it works for worker replicate data to peer worker and should be
configured on worker side. | 0.3.0 |
| celeborn.<role>.rpc.dispatcher.threads | <value of
celeborn.rpc.dispatcher.threads> | Threads number of message dispatcher
event loop for roles | |
+| celeborn.io.maxDefaultNettyThreads | 64 | Max default netty threads | 0.3.2
|
| celeborn.network.bind.preferIpAddress | true | When `ture`, prefer to use IP
address, otherwise FQDN. This configuration only takes effects when the bind
hostname is not set explicitly, in such case, Celeborn will find the first
non-loopback address to bind. | 0.3.0 |
| celeborn.network.connect.timeout | 10s | Default socket connect timeout. |
0.2.0 |
| celeborn.network.memory.allocator.numArenas | <undefined> | Number of
arenas for pooled memory allocator. Default value is
Runtime.getRuntime.availableProcessors, min value is 2. | 0.3.0 |