This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 7b185a256 [CELEBORN-1058] Support specifying the number of dispatcher
threads for each role
7b185a256 is described below
commit 7b185a2562857b35ae4fd6bed7026ebd616eeda9
Author: onebox-li <[email protected]>
AuthorDate: Fri Nov 3 10:35:54 2023 +0800
[CELEBORN-1058] Support specifying the number of dispatcher threads for
each role
### What changes were proposed in this pull request?
Support specifying the number of dispatcher threads for each role,
especially on the shuffle client side. The shuffle client has only the
RpcEndpointVerifier endpoint, which handles few requests, so one thread is
enough. The RPC env of each other role has at most two endpoints, so using a
shared event loop is reasonable. It is not yet clear whether RPC requests
will be added to the shuffle client in the future, so this change adds
role-specific parameters to configure the number of dispatcher threads.
It also renames the dispatcher thread pool so that it can be distinguished
from Spark's.
### Why are the changes needed?
See the previous section.
### Does this PR introduce _any_ user-facing change?
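Yes. It adds the config `celeborn.<role>.rpc.dispatcher.threads`, which falls
back to `celeborn.rpc.dispatcher.threads`. The latter now defaults to 0,
meaning the number of available cores.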
### How was this patch tested?
Manual testing and unit tests (new cases in `CelebornConfSuite`).
Closes #2003 from onebox-li/my_dev.
Authored-by: onebox-li <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../apache/celeborn/client/ShuffleClientImpl.java | 4 +--
.../celeborn/common/protocol/RpcNameConstants.java | 13 ++++----
.../org/apache/celeborn/common/CelebornConf.scala | 25 ++++++++++++---
.../org/apache/celeborn/common/rpc/RpcEnv.scala | 4 +--
.../celeborn/common/rpc/netty/Dispatcher.scala | 14 ++++----
.../celeborn/common/rpc/netty/NettyRpcEnv.scala | 37 ++++++++++------------
.../apache/celeborn/common/CelebornConfSuite.scala | 23 ++++++++++++++
docs/configuration/network.md | 3 +-
8 files changed, 79 insertions(+), 44 deletions(-)
diff --git
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index d913a9e06..39d382d10 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -172,8 +172,8 @@ public class ShuffleClientImpl extends ShuffleClient {
pushDataTimeout = conf.pushDataTimeoutMs();
}
- // init rpc env and master endpointRef
- rpcEnv = RpcEnv.create("ShuffleClient", Utils.localHostName(conf), 0,
conf);
+ // init rpc env
+ rpcEnv = RpcEnv.create(RpcNameConstants.SHUFFLE_CLIENT_SYS,
Utils.localHostName(conf), 0, conf);
String module = TransportModuleConstants.DATA_MODULE;
TransportConf dataTransportConf =
diff --git
a/common/src/main/java/org/apache/celeborn/common/protocol/RpcNameConstants.java
b/common/src/main/java/org/apache/celeborn/common/protocol/RpcNameConstants.java
index 17d4fd2c8..c47f04f98 100644
---
a/common/src/main/java/org/apache/celeborn/common/protocol/RpcNameConstants.java
+++
b/common/src/main/java/org/apache/celeborn/common/protocol/RpcNameConstants.java
@@ -19,18 +19,19 @@ package org.apache.celeborn.common.protocol;
public class RpcNameConstants {
// For Master
- public static String MASTER_SYS = "MasterSys";
-
+ public static String MASTER_SYS = "Master";
// Master Endpoint Name
public static String MASTER_EP = "MasterEndpoint";
// For Worker
- public static String WORKER_SYS = "WorkerSys";
-
+ public static String WORKER_SYS = "Worker";
// Worker Endpoint Name
public static String WORKER_EP = "WorkerEndpoint";
- // For Driver(SparkShuffleManager)
+ // For LifecycleManager
+ public static String LIFECYCLE_MANAGER_SYS = "LifecycleManager";
public static String LIFECYCLE_MANAGER_EP = "LifecycleManagerEndpoint";
- public static String LIFECYCLE_MANAGER_SYS = "LifecycleManagerSys";
+
+ // For Shuffle Client
+ public static String SHUFFLE_CLIENT_SYS = "ShuffleClient";
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 24b65b022..0879a6107 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -381,8 +381,17 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
new RpcTimeout(get(RPC_LOOKUP_TIMEOUT).milli, RPC_LOOKUP_TIMEOUT.key)
def rpcAskTimeout: RpcTimeout =
new RpcTimeout(get(RPC_ASK_TIMEOUT).milli, RPC_ASK_TIMEOUT.key)
- def rpcDispatcherNumThreads(availableCores: Int): Int =
- get(RPC_DISPATCHER_THREADS).getOrElse(availableCores)
+ def rpcDispatcherNumThreads(availableCores: Int): Int = {
+ val num = get(RPC_DISPATCHER_THREADS)
+ if (num != 0) num else availableCores
+ }
+ def rpcDispatcherNumThreads(availableCores: Int, role: String): Int = {
+ val num = getInt(
+ RPC_ROLE_DISPATHER_THREADS.key.replace("<role>", role),
+ rpcDispatcherNumThreads(availableCores))
+ if (num != 0) num else availableCores
+ }
+
def networkIoMode(module: String): String = {
val key = NETWORK_IO_MODE.key.replace("<module>", module)
get(key, NETWORK_IO_MODE.defaultValue.get)
@@ -1346,14 +1355,20 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("60s")
- val RPC_DISPATCHER_THREADS: OptionalConfigEntry[Int] =
+ val RPC_DISPATCHER_THREADS: ConfigEntry[Int] =
buildConf("celeborn.rpc.dispatcher.threads")
.withAlternative("celeborn.rpc.dispatcher.numThreads")
.categories("network")
- .doc("Threads number of message dispatcher event loop")
+ .doc("Threads number of message dispatcher event loop. Default to 0,
which is availableCore.")
.version("0.3.0")
.intConf
- .createOptional
+ .createWithDefault(0)
+
+ val RPC_ROLE_DISPATHER_THREADS: ConfigEntry[Int] =
+ buildConf("celeborn.<role>.rpc.dispatcher.threads")
+ .categories("network")
+ .doc("Threads number of message dispatcher event loop for roles")
+ .fallbackConf(RPC_DISPATCHER_THREADS)
val NETWORK_IO_MODE: ConfigEntry[String] =
buildConf("celeborn.<module>.io.mode")
diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
index 4d3d6d0d2..f0d0f0452 100644
--- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
@@ -59,9 +59,9 @@ object RpcEnv {
*
* [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given
name or uri.
*/
-abstract class RpcEnv(conf: CelebornConf) {
+abstract class RpcEnv(config: RpcEnvConfig) {
- private[celeborn] val defaultLookupTimeout = conf.rpcLookupTimeout
+ private[celeborn] val defaultLookupTimeout = config.conf.rpcLookupTimeout
/**
* Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to
implement
diff --git
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala
index 9ea65aef3..b137ae2f5 100644
---
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala
@@ -32,11 +32,8 @@ import org.apache.celeborn.common.util.{JavaUtils,
ThreadUtils}
/**
* A message dispatcher, responsible for routing RPC messages to the
appropriate endpoint(s).
- *
- * @param numUsableCores Number of CPU cores allocated to the process, for
sizing the thread pool.
- * If 0, will consider the available CPUs on the host.
*/
-private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int)
extends Logging {
+private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
private class EndpointData(
val name: String,
@@ -62,7 +59,7 @@ private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv,
numUsableCores: Int) e
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint):
NettyRpcEndpointRef = {
val addr = RpcEndpointAddress(nettyEnv.address, name)
- val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
+ val endpointRef = new NettyRpcEndpointRef(nettyEnv.celebornConf, addr,
nettyEnv)
synchronized {
if (stopped) {
throw new IllegalStateException("RpcEnv has been stopped")
@@ -200,11 +197,14 @@ private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv,
numUsableCores: Int) e
/** Thread pool used for dispatching messages. */
private val threadpool: ThreadPoolExecutor = {
+ val numUsableCores = nettyEnv.config.numUsableCores
val availableCores =
if (numUsableCores > 0) numUsableCores
else Math.max(16, Runtime.getRuntime.availableProcessors())
- val numThreads = nettyEnv.conf.rpcDispatcherNumThreads(availableCores)
- val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads,
"dispatcher-event-loop")
+ val role = nettyEnv.config.name.toLowerCase()
+ val numThreads =
nettyEnv.celebornConf.rpcDispatcherNumThreads(availableCores, role)
+
+ val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads,
"celeborn-dispatcher")
logInfo(s"Dispatcher numThreads: $numThreads")
for (i <- 0 until numThreads) {
pool.execute(new MessageLoop)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
index 31809e366..2d467cc9b 100644
---
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
@@ -44,17 +44,17 @@ import
org.apache.celeborn.common.serializer.{JavaSerializer, JavaSerializerInst
import org.apache.celeborn.common.util.{ByteBufferInputStream,
ByteBufferOutputStream, JavaUtils, ThreadUtils, Utils}
class NettyRpcEnv(
- val conf: CelebornConf,
- javaSerializerInstance: JavaSerializerInstance,
- host: String,
- numUsableCores: Int) extends RpcEnv(conf) with Logging {
+ val config: RpcEnvConfig,
+ javaSerializerInstance: JavaSerializerInstance) extends RpcEnv(config)
with Logging {
+
+ val celebornConf = config.conf
private[celeborn] val transportConf = Utils.fromCelebornConf(
- conf.clone,
+ celebornConf.clone,
TransportModuleConstants.RPC_MODULE,
- conf.rpcIoThreads.getOrElse(numUsableCores))
+ celebornConf.rpcIoThreads.getOrElse(config.numUsableCores))
- private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
+ private val dispatcher: Dispatcher = new Dispatcher(this)
private var worker: RpcEndpoint = null
@@ -70,7 +70,7 @@ class NettyRpcEnv(
// to implement non-blocking send/ask.
private[celeborn] val clientConnectionExecutor =
ThreadUtils.newDaemonCachedThreadPool(
"netty-rpc-connection",
- conf.rpcConnectThreads)
+ celebornConf.rpcConnectThreads)
@volatile private var server: TransportServer = _
@@ -101,7 +101,7 @@ class NettyRpcEnv(
@Nullable
override lazy val address: RpcAddress = {
- if (server != null) RpcAddress(host, server.getPort()) else null
+ if (server != null) RpcAddress(config.advertiseAddress, server.getPort())
else null
}
override def setupEndpoint(name: String, endpoint: RpcEndpoint):
RpcEndpointRef = {
@@ -121,12 +121,12 @@ class NettyRpcEnv(
def asyncSetupEndpointRefByAddr(addr: RpcEndpointAddress):
Future[RpcEndpointRef] = {
val verifier = new NettyRpcEndpointRef(
- conf,
+ celebornConf,
RpcEndpointAddress(addr.rpcAddress, RpcEndpointVerifier.NAME),
this)
verifier.ask[Boolean](RpcEndpointVerifier.CheckExistence(addr.name)).flatMap {
find =>
if (find) {
- Future.successful(new NettyRpcEndpointRef(conf, addr, this))
+ Future.successful(new NettyRpcEndpointRef(celebornConf, addr, this))
} else {
Future.failed(new RpcEndpointNotFoundException(addr.toString))
}
@@ -341,17 +341,12 @@ private[celeborn] object NettyRpcEnv extends Logging {
private[celeborn] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {
def create(config: RpcEnvConfig): RpcEnv = {
- val conf = config.conf
+ val celebornConf = config.conf
// Use JavaSerializerInstance in multiple threads is safe. However, if we
plan to support
// KryoSerializer in future, we have to use ThreadLocal to store
SerializerInstance
val javaSerializerInstance =
- new
JavaSerializer(conf).newInstance().asInstanceOf[JavaSerializerInstance]
- val nettyEnv =
- new NettyRpcEnv(
- conf,
- javaSerializerInstance,
- config.advertiseAddress,
- config.numUsableCores)
+ new
JavaSerializer(celebornConf).newInstance().asInstanceOf[JavaSerializerInstance]
+ val nettyEnv = new NettyRpcEnv(config, javaSerializerInstance)
val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
logInfo(s"Starting RPC Server [${config.name}] on
${config.bindAddress}:$actualPort " +
s"with advisor endpoint ${config.advertiseAddress}:$actualPort")
@@ -359,7 +354,7 @@ private[celeborn] class NettyRpcEnvFactory extends
RpcEnvFactory with Logging {
(nettyEnv, nettyEnv.address.port)
}
try {
- Utils.startServiceOnPort(config.port, startNettyRpcEnv, conf,
config.name)._1
+ Utils.startServiceOnPort(config.port, startNettyRpcEnv, celebornConf,
config.name)._1
} catch {
case NonFatal(e) =>
nettyEnv.shutdown()
@@ -495,7 +490,7 @@ private[celeborn] object RequestMessage {
try {
val senderAddress = readRpcAddress(in)
val endpointAddress = RpcEndpointAddress(readRpcAddress(in),
in.readUTF())
- val ref = new NettyRpcEndpointRef(nettyEnv.conf, endpointAddress,
nettyEnv)
+ val ref = new NettyRpcEndpointRef(nettyEnv.config.conf, endpointAddress,
nettyEnv)
ref.client = client
new RequestMessage(
senderAddress,
diff --git
a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
index f57ce9040..f3e01c5ab 100644
--- a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
@@ -226,4 +226,27 @@ class CelebornConfSuite extends CelebornFunSuite {
conf.set("celeborn.storage.activeTypes", "SDD,HDD")
assert(conf.workerCommitThreads === 32)
}
+
+ test("Test role rpcDispatcherNumThreads") {
+ val availableCores = 5
+ val conf = new CelebornConf()
+ assert(conf.rpcDispatcherNumThreads(availableCores, "shuffleclient") === 5)
+
+ conf.set("celeborn.shuffleclient.rpc.dispatcher.threads", "1")
+ assert(conf.rpcDispatcherNumThreads(availableCores, "shuffleclient") === 1)
+ assert(conf.rpcDispatcherNumThreads(availableCores, "lifecyclemanager")
=== 5)
+
+ conf.set("celeborn.rpc.dispatcher.threads", "2")
+ assert(conf.rpcDispatcherNumThreads(availableCores, "lifecyclemanager")
=== 2)
+
+ conf.unset("celeborn.rpc.dispatcher.threads")
+ conf.set("celeborn.rpc.dispatcher.numThreads", "3")
+ assert(conf.rpcDispatcherNumThreads(availableCores, "lifecyclemanager")
=== 3)
+ }
+
+ test("Test rpcDispatcherNumThreads") {
+ val availableCores = 5
+ val conf = new CelebornConf()
+ assert(conf.rpcDispatcherNumThreads(availableCores) === 5)
+ }
}
diff --git a/docs/configuration/network.md b/docs/configuration/network.md
index 225c9de7b..f63fc8bf7 100644
--- a/docs/configuration/network.md
+++ b/docs/configuration/network.md
@@ -38,6 +38,7 @@ license: |
| celeborn.<module>.io.serverThreads | 0 | Number of threads used in the
server thread pool. Default to 0, which is 2x#cores. | |
| celeborn.<module>.push.timeoutCheck.interval | 5s | Interval for
checking push data timeout. If setting <module> to `data`, it works for shuffle
client push data and should be configured on client side. If setting <module>
to `replicate`, it works for worker replicate data to peer worker and should be
configured on worker side. | 0.3.0 |
| celeborn.<module>.push.timeoutCheck.threads | 4 | Threads num for
checking push data timeout. If setting <module> to `data`, it works for shuffle
client push data and should be configured on client side. If setting <module>
to `replicate`, it works for worker replicate data to peer worker and should be
configured on worker side. | 0.3.0 |
+| celeborn.<role>.rpc.dispatcher.threads | <value of
celeborn.rpc.dispatcher.threads> | Threads number of message dispatcher
event loop for roles | |
| celeborn.network.bind.preferIpAddress | true | When `ture`, prefer to use IP
address, otherwise FQDN. This configuration only takes effects when the bind
hostname is not set explicitly, in such case, Celeborn will find the first
non-loopback address to bind. | 0.3.0 |
| celeborn.network.connect.timeout | 10s | Default socket connect timeout. |
0.2.0 |
| celeborn.network.memory.allocator.numArenas | <undefined> | Number of
arenas for pooled memory allocator. Default value is
Runtime.getRuntime.availableProcessors, min value is 2. | 0.3.0 |
@@ -46,7 +47,7 @@ license: |
| celeborn.port.maxRetries | 1 | When port is occupied, we will retry for max
retry times. | 0.2.0 |
| celeborn.rpc.askTimeout | 60s | Timeout for RPC ask operations. It's
recommended to set at least `240s` when `HDFS` is enabled in
`celeborn.storage.activeTypes` | 0.2.0 |
| celeborn.rpc.connect.threads | 64 | | 0.2.0 |
-| celeborn.rpc.dispatcher.threads | <undefined> | Threads number of
message dispatcher event loop | 0.3.0 |
+| celeborn.rpc.dispatcher.threads | 0 | Threads number of message dispatcher
event loop. Default to 0, which is availableCore. | 0.3.0 |
| celeborn.rpc.io.threads | <undefined> | Netty IO thread number of
NettyRpcEnv to handle RPC request. The default threads number is the number of
runtime available processors. | 0.2.0 |
| celeborn.rpc.lookupTimeout | 30s | Timeout for RPC lookup operations. |
0.2.0 |
| celeborn.shuffle.io.maxChunksBeingTransferred | <undefined> | The max
number of chunks allowed to be transferred at the same time on shuffle service.
Note that new incoming connections will be closed when the max number is hit.
The client will retry according to the shuffle retry configs (see
`celeborn.<module>.io.maxRetries` and `celeborn.<module>.io.retryWait`), if
those limits are reached the task will fail with fetch failure. | 0.2.0 |