This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 2817f7fb9 [CELEBORN-2104] Clean up sources of NettyRpcEnv, Master and
Worker to avoid thread leaks
2817f7fb9 is described below
commit 2817f7fb9ebe08e1b0be46c7e6ab48ee0177142a
Author: dz <[email protected]>
AuthorDate: Fri Aug 29 19:04:19 2025 +0800
[CELEBORN-2104] Clean up sources of NettyRpcEnv, Master and Worker to avoid
thread leaks
### What changes were proposed in this pull request?
Clean up the `Source` instances of NettyRpcEnv, Master and Worker to avoid thread leaks.
### Why are the changes needed?
1. NettyRpcEnv should clean up rpcSource to prevent resource leaks.
2. Master should clean up resourceConsumptionSource, masterSource, threadPoolSource,
jvmSource, jvmCpuSource and systemMiscSource.
3. Worker should clean up workerSource.
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
Tested locally.
Closes #3418 from xy2953396112/CELEBORN-2104.
Authored-by: dz <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala | 3 +++
.../org/apache/celeborn/service/deploy/master/Master.scala | 13 ++++++++++---
.../org/apache/celeborn/service/deploy/worker/Worker.scala | 9 ++++++---
3 files changed, 19 insertions(+), 6 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
index d857bd4b1..c0187f698 100644
---
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
@@ -356,6 +356,9 @@ class NettyRpcEnv(
if (clientConnectionExecutor != null) {
clientConnectionExecutor.shutdownNow()
}
+ if (_rpcSource != null) {
+ _rpcSource.destroy()
+ }
}
override def deserialize[T](deserializationAction: () => T): T = {
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 9446e6368..f4caae517 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -73,12 +73,16 @@ private[celeborn] class Master(
new ResourceConsumptionSource(conf, Role.MASTER)
private val threadPoolSource = ThreadPoolSource(conf, Role.MASTER)
private val masterSource = new MasterSource(conf)
+ private val jvmSource = new JVMSource(conf, Role.MASTER)
+ private val jvmCpuSource = new JVMCPUSource(conf, Role.MASTER)
+ private val systemMiscSource = new SystemMiscSource(conf, Role.MASTER)
+
metricsSystem.registerSource(resourceConsumptionSource)
metricsSystem.registerSource(masterSource)
metricsSystem.registerSource(threadPoolSource)
- metricsSystem.registerSource(new JVMSource(conf, Role.MASTER))
- metricsSystem.registerSource(new JVMCPUSource(conf, Role.MASTER))
- metricsSystem.registerSource(new SystemMiscSource(conf, Role.MASTER))
+ metricsSystem.registerSource(jvmSource)
+ metricsSystem.registerSource(jvmCpuSource)
+ metricsSystem.registerSource(systemMiscSource)
private val bindPreferIP: Boolean = conf.bindPreferIP
private val authEnabled = conf.authEnabled
@@ -381,6 +385,9 @@ private[celeborn] class Master(
sendApplicationMetaExecutor.shutdownNow()
}
messagesHelper.close()
+
+ metricsSystem.stop()
+
logInfo("Celeborn Master is stopped.")
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 5107dfd5f..872231a93 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -74,12 +74,15 @@ private[celeborn] class Worker(
val resourceConsumptionSource =
new ResourceConsumptionSource(conf, Role.WORKER)
private val threadPoolSource = ThreadPoolSource(conf, Role.WORKER)
+ private val jvmSource = new JVMSource(conf, Role.WORKER)
+ private val jvmCpuSource = new JVMCPUSource(conf, Role.WORKER)
+ private val systemMiscSource = new SystemMiscSource(conf, Role.WORKER)
metricsSystem.registerSource(workerSource)
metricsSystem.registerSource(threadPoolSource)
metricsSystem.registerSource(resourceConsumptionSource)
- metricsSystem.registerSource(new JVMSource(conf, Role.WORKER))
- metricsSystem.registerSource(new JVMCPUSource(conf, Role.WORKER))
- metricsSystem.registerSource(new SystemMiscSource(conf, Role.WORKER))
+ metricsSystem.registerSource(jvmSource)
+ metricsSystem.registerSource(jvmCpuSource)
+ metricsSystem.registerSource(systemMiscSource)
private val topAppResourceConsumptionCount =
conf.metricsWorkerAppTopResourceConsumptionCount
private val topAppResourceConsumptionBytesWrittenThreshold =