This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new d12aafd54 [CELEBORN-2105] RpcMetricsTracker should clean up metrics
for stopping Inbox
d12aafd54 is described below
commit d12aafd542fe95e63c4f01747a99eb851576f788
Author: dz <[email protected]>
AuthorDate: Fri Nov 7 10:00:35 2025 +0800
[CELEBORN-2105] RpcMetricsTracker should clean up metrics for stopping Inbox
### What changes were proposed in this pull request?
`RpcMetricsTracker` should clean up metrics when an `Inbox` is stopped, to
avoid a resource leak.
### Why are the changes needed?
Currently, when an `Inbox` is closing, `RpcMetricsTracker` does not clean up
metrics, which may cause a resource leak.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes #3419 from xy2953396112/CELEBORN-2105.
Authored-by: dz <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../apache/celeborn/common/metrics/source/AbstractSource.scala | 5 +++++
.../scala/org/apache/celeborn/common/rpc/RpcMetricsTracker.scala | 9 ++++++++-
.../main/scala/org/apache/celeborn/common/rpc/netty/Inbox.scala | 3 ++-
3 files changed, 15 insertions(+), 2 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
index 43e8ee28d..6034dd9ca 100644
---
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
@@ -260,6 +260,11 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
namedGauges.remove(removeMetric(name, labels))
}
+ def removeGauge[T](name: String): Unit = removeGauge(name, Map.empty[String,
String])
+
+ def removeTimer[T](name: String): Unit =
+ namedTimers.remove(removeMetric(name, Map.empty[String, String]))
+
def removeMetric(name: String, labels: Map[String, String]): String = {
val metricNameWithLabel = metricNameWithCustomizedLabels(name, labels)
metricRegistry.remove(metricNameWithLabel)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcMetricsTracker.scala
b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcMetricsTracker.scala
index f9fffe306..e9eedeb2e 100644
---
a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcMetricsTracker.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcMetricsTracker.scala
@@ -67,6 +67,13 @@ private[celeborn] class RpcMetricsTracker(
}
}
+ def close(): Unit = {
+ histogramMap.clear()
+ rpcSource.removeGauge(RpcSource.QUEUE_LENGTH)
+ rpcSource.removeTimer(RpcSource.QUEUE_TIME)
+ rpcSource.removeTimer(RpcSource.PROCESS_TIME)
+ }
+
def updateHistogram(name: String, value: Long): Unit = {
histogramMap.putIfAbsent(name, new Histogram(new UniformReservoir()))
val histogram = histogramMap.get(name)
@@ -143,7 +150,7 @@ private[celeborn] class RpcMetricsTracker(
if (!useHistogram)
return
- val builder = new StringBuilder();
+ val builder = new StringBuilder()
builder.append(s"RPC statistics for $name (time unit: ns)").append("\n")
builder.append(s"current queue size = ${queueLengthFunc()}").append("\n")
builder.append(s"max queue length = ${maxQueueLength.get()}").append("\n")
diff --git
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Inbox.scala
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Inbox.scala
index b179128fb..ced47a75b 100644
--- a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Inbox.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Inbox.scala
@@ -280,11 +280,12 @@ private[celeborn] class Inbox(
// safely.
enableConcurrent = false
stopped = true
- addMessage(OnStop)
metrics.dump()
+ addMessage(OnStop)
// Note: The concurrent events in messages will be processed one by
one.
}
} finally {
+ metrics.close()
inboxLock.unlock()
}
}