This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 2817f7fb9 [CELEBORN-2104] Clean up sources of NettyRpcEnv, Master and 
Worker to avoid thread leaks
2817f7fb9 is described below

commit 2817f7fb9ebe08e1b0be46c7e6ab48ee0177142a
Author: dz <[email protected]>
AuthorDate: Fri Aug 29 19:04:19 2025 +0800

    [CELEBORN-2104] Clean up sources of NettyRpcEnv, Master and Worker to avoid 
thread leaks
    
    ### What changes were proposed in this pull request?
    
    Clean up the `Source` instances of NettyRpcEnv, Master and Worker to avoid thread leaks.
    
    ### Why are the changes needed?
    
    1. NettyRpcEnv should clean up rpcSource to prevent a resource leak.
    2. Master should clean up resourceConsumptionSource, masterSource, threadPoolSource,
       jvmSource, jvmCpuSource and systemMiscSource.
    3. Worker should clean up workerSource.
    
    ### Does this PR introduce _any_ user-facing change?
    
    NO
    
    ### How was this patch tested?
    
    Tested locally.
    
    Closes #3418 from xy2953396112/CELEBORN-2104.
    
    Authored-by: dz <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 .../org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala  |  3 +++
 .../org/apache/celeborn/service/deploy/master/Master.scala  | 13 ++++++++++---
 .../org/apache/celeborn/service/deploy/worker/Worker.scala  |  9 ++++++---
 3 files changed, 19 insertions(+), 6 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala 
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
index d857bd4b1..c0187f698 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
@@ -356,6 +356,9 @@ class NettyRpcEnv(
     if (clientConnectionExecutor != null) {
       clientConnectionExecutor.shutdownNow()
     }
+    if (_rpcSource != null) {
+      _rpcSource.destroy()
+    }
   }
 
   override def deserialize[T](deserializationAction: () => T): T = {
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 9446e6368..f4caae517 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -73,12 +73,16 @@ private[celeborn] class Master(
     new ResourceConsumptionSource(conf, Role.MASTER)
   private val threadPoolSource = ThreadPoolSource(conf, Role.MASTER)
   private val masterSource = new MasterSource(conf)
+  private val jvmSource = new JVMSource(conf, Role.MASTER)
+  private val jvmCpuSource = new JVMCPUSource(conf, Role.MASTER)
+  private val systemMiscSource = new SystemMiscSource(conf, Role.MASTER)
+
   metricsSystem.registerSource(resourceConsumptionSource)
   metricsSystem.registerSource(masterSource)
   metricsSystem.registerSource(threadPoolSource)
-  metricsSystem.registerSource(new JVMSource(conf, Role.MASTER))
-  metricsSystem.registerSource(new JVMCPUSource(conf, Role.MASTER))
-  metricsSystem.registerSource(new SystemMiscSource(conf, Role.MASTER))
+  metricsSystem.registerSource(jvmSource)
+  metricsSystem.registerSource(jvmCpuSource)
+  metricsSystem.registerSource(systemMiscSource)
 
   private val bindPreferIP: Boolean = conf.bindPreferIP
   private val authEnabled = conf.authEnabled
@@ -381,6 +385,9 @@ private[celeborn] class Master(
       sendApplicationMetaExecutor.shutdownNow()
     }
     messagesHelper.close()
+
+    metricsSystem.stop()
+
     logInfo("Celeborn Master is stopped.")
   }
 
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 5107dfd5f..872231a93 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -74,12 +74,15 @@ private[celeborn] class Worker(
   val resourceConsumptionSource =
     new ResourceConsumptionSource(conf, Role.WORKER)
   private val threadPoolSource = ThreadPoolSource(conf, Role.WORKER)
+  private val jvmSource = new JVMSource(conf, Role.WORKER)
+  private val jvmCpuSource = new JVMCPUSource(conf, Role.WORKER)
+  private val systemMiscSource = new SystemMiscSource(conf, Role.WORKER)
   metricsSystem.registerSource(workerSource)
   metricsSystem.registerSource(threadPoolSource)
   metricsSystem.registerSource(resourceConsumptionSource)
-  metricsSystem.registerSource(new JVMSource(conf, Role.WORKER))
-  metricsSystem.registerSource(new JVMCPUSource(conf, Role.WORKER))
-  metricsSystem.registerSource(new SystemMiscSource(conf, Role.WORKER))
+  metricsSystem.registerSource(jvmSource)
+  metricsSystem.registerSource(jvmCpuSource)
+  metricsSystem.registerSource(systemMiscSource)
 
   private val topAppResourceConsumptionCount = 
conf.metricsWorkerAppTopResourceConsumptionCount
   private val topAppResourceConsumptionBytesWrittenThreshold =

Reply via email to