This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new d12aafd54 [CELEBORN-2105] RpcMetricsTracker should clean up metrics for stopping Inbox
d12aafd54 is described below

commit d12aafd542fe95e63c4f01747a99eb851576f788
Author: dz <[email protected]>
AuthorDate: Fri Nov 7 10:00:35 2025 +0800

    [CELEBORN-2105] RpcMetricsTracker should clean up metrics for stopping Inbox
    
    ### What changes were proposed in this pull request?
    
    `RpcMetricsTracker` should clean up its metrics when `Inbox` is stopped, to avoid a resource leak.
    
    ### Why are the changes needed?
    
    Currently, `RpcMetricsTracker` does not clean up its metrics when `Inbox` is stopped, which may cause a resource leak.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    CI.
    
    Closes #3419 from xy2953396112/CELEBORN-2105.
    
    Authored-by: dz <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 .../apache/celeborn/common/metrics/source/AbstractSource.scala   | 5 +++++
 .../scala/org/apache/celeborn/common/rpc/RpcMetricsTracker.scala | 9 ++++++++-
 .../main/scala/org/apache/celeborn/common/rpc/netty/Inbox.scala  | 3 ++-
 3 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
index 43e8ee28d..6034dd9ca 100644
--- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
@@ -260,6 +260,11 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
     namedGauges.remove(removeMetric(name, labels))
   }
 
+  def removeGauge[T](name: String): Unit = removeGauge(name, Map.empty[String, 
String])
+
+  def removeTimer[T](name: String): Unit =
+    namedTimers.remove(removeMetric(name, Map.empty[String, String]))
+
   def removeMetric(name: String, labels: Map[String, String]): String = {
     val metricNameWithLabel = metricNameWithCustomizedLabels(name, labels)
     metricRegistry.remove(metricNameWithLabel)
diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcMetricsTracker.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcMetricsTracker.scala
index f9fffe306..e9eedeb2e 100644
--- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcMetricsTracker.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcMetricsTracker.scala
@@ -67,6 +67,13 @@ private[celeborn] class RpcMetricsTracker(
     }
   }
 
+  def close(): Unit = {
+    histogramMap.clear()
+    rpcSource.removeGauge(RpcSource.QUEUE_LENGTH)
+    rpcSource.removeTimer(RpcSource.QUEUE_TIME)
+    rpcSource.removeTimer(RpcSource.PROCESS_TIME)
+  }
+
   def updateHistogram(name: String, value: Long): Unit = {
     histogramMap.putIfAbsent(name, new Histogram(new UniformReservoir()))
     val histogram = histogramMap.get(name)
@@ -143,7 +150,7 @@ private[celeborn] class RpcMetricsTracker(
     if (!useHistogram)
       return
 
-    val builder = new StringBuilder();
+    val builder = new StringBuilder()
     builder.append(s"RPC statistics for $name (time unit: ns)").append("\n")
     builder.append(s"current queue size = ${queueLengthFunc()}").append("\n")
     builder.append(s"max queue length = ${maxQueueLength.get()}").append("\n")
diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Inbox.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Inbox.scala
index b179128fb..ced47a75b 100644
--- a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Inbox.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Inbox.scala
@@ -280,11 +280,12 @@ private[celeborn] class Inbox(
         // safely.
         enableConcurrent = false
         stopped = true
-        addMessage(OnStop)
         metrics.dump()
+        addMessage(OnStop)
         // Note: The concurrent events in messages will be processed one by 
one.
       }
     } finally {
+      metrics.close()
       inboxLock.unlock()
     }
   }

Reply via email to