This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new e7f7f7611 [CELEBORN-2109] Add EvictedLocalFileCount and
EvictedDfsFileCount metrics
e7f7f7611 is described below
commit e7f7f7611d8bc97dadd18641880ce3f2d7892510
Author: xxx <[email protected]>
AuthorDate: Thu Sep 11 21:05:16 2025 +0800
[CELEBORN-2109] Add EvictedLocalFileCount and EvictedDfsFileCount metrics
### What changes were proposed in this pull request?
Add EvictedLocalFileCount and EvictedDfsFileCount metrics.
### Why are the changes needed?
Add EvictedLocalFileCount and EvictedDfsFileCount metrics to count how many
memory files are evicted to local disk or DFS, respectively.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test.
[Grafana](https://xy2953396112.grafana.net/public-dashboards/07d6ad142d3d42a5b131323237dc1062)
Closes #3423 from xy2953396112/add_evict_metrics.
Authored-by: xxx <[email protected]>
Signed-off-by: fengmingxiao <[email protected]>
---
assets/grafana/celeborn-dashboard.json | 178 ++++++++++++++++++++-
docs/monitoring.md | 4 +-
.../celeborn/service/deploy/worker/Worker.scala | 6 +
.../service/deploy/worker/WorkerSource.scala | 2 +
.../deploy/worker/storage/StorageManager.scala | 2 +
.../service/deploy/worker/storage/TierWriter.scala | 6 +
.../MemoryReducePartitionDataWriterSuiteJ.java | 5 +
.../deploy/worker/storage/TierWriterSuite.scala | 4 +
8 files changed, 205 insertions(+), 2 deletions(-)
diff --git a/assets/grafana/celeborn-dashboard.json
b/assets/grafana/celeborn-dashboard.json
index 3ad72ada9..e5d8f83da 100644
--- a/assets/grafana/celeborn-dashboard.json
+++ b/assets/grafana/celeborn-dashboard.json
@@ -8728,7 +8728,183 @@
"refId": "A"
}
],
- "title": "EvictedFileCount",
+ "title": "metrics_EvictedFileCount_Value",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green"
+ }
+ ]
+ }
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 12,
+ "y": 161
+ },
+ "id": 262,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "editorMode": "builder",
+ "expr":
"metrics_EvictedLocalFileCount_Value{instance=~\"${instance}\"}",
+ "instant": false,
+ "legendFormat": "${baseLegend}",
+ "range": true,
+ "refId": "A"
+ }
+ ],
+ "title": "metrics_EvictedLocalFileCount_Value",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green"
+ }
+ ]
+ }
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 12,
+ "y": 161
+ },
+ "id": 263,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "editorMode": "builder",
+ "expr":
"metrics_EvictedDfsFileCount_Value{instance=~\"${instance}\"}",
+ "instant": false,
+ "legendFormat": "${baseLegend}",
+ "range": true,
+ "refId": "A"
+ }
+ ],
+ "title": "metrics_EvictedDfsFileCount_Value",
"type": "timeseries"
},
{
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 44a52e5b8..351d6e4db 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -259,7 +259,9 @@ These metrics are exposed by Celeborn worker.
| UnreleasedPartitionLocationCount | Unreleased partition location
count when worker is shutting down.
|
| MemoryStorageFileCount | The count of files in Memory
Storage of a worker.
|
| MemoryFileStorageSize | The total amount of memory used
by Memory Storage.
|
- | EvictedFileCount | The count of files evicted from
Memory Storage to Disk
|
+ | EvictedFileCount | The count of files evicted from
Memory Storage to local disk or DFS.
|
+ | EvictedLocalFileCount | The count of files evicted from
Memory Storage to local disk.
|
+ | EvictedDfsFileCount | The count of files evicted from
Memory Storage to DFS.
|
| DirectMemoryUsageRatio | Ratio of direct memory used and
max direct memory.
|
| RegisterWithMasterFailCount | The count of failures in
register with master request.
|
| FlushWorkingQueueSize | The size of flush working queue
for mount point.
|
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 872231a93..f66b861ac 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -415,6 +415,12 @@ private[celeborn] class Worker(
workerSource.addGauge(WorkerSource.EVICTED_FILE_COUNT) { () =>
storageManager.evictedFileCount.get()
}
+ workerSource.addGauge(WorkerSource.EVICTED_LOCAL_FILE_COUNT) { () =>
+ storageManager.evictedLocalFileCount.get()
+ }
+ workerSource.addGauge(WorkerSource.EVICTED_DFS_FILE_COUNT) { () =>
+ storageManager.evictedDfsFileCount.get()
+ }
workerSource.addGauge(WorkerSource.MEMORY_STORAGE_FILE_COUNT) { () =>
storageManager.memoryWriters.size()
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index 0d6328340..aefb453d8 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -239,6 +239,8 @@ object WorkerSource {
val MEMORY_FILE_STORAGE_SIZE = "MemoryFileStorageSize"
val DIRECT_MEMORY_USAGE_RATIO = "DirectMemoryUsageRatio"
val EVICTED_FILE_COUNT = "EvictedFileCount"
+ val EVICTED_LOCAL_FILE_COUNT = "EvictedLocalFileCount"
+ val EVICTED_DFS_FILE_COUNT = "EvictedDfsFileCount"
val MEMORY_STORAGE_FILE_COUNT = "MemoryStorageFileCount"
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 3c1f1ec19..e6cbabdca 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -412,6 +412,8 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
@VisibleForTesting
val evictedFileCount = new AtomicLong
+ val evictedLocalFileCount = new AtomicLong
+ val evictedDfsFileCount = new AtomicLong
@throws[IOException]
def createPartitionDataWriter(
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
index e083108e8..f4b5e19f9 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
@@ -308,6 +308,12 @@ class MemoryTierWriter(
MemoryManager.instance.incrementDiskBuffer(numBytes)
storageManager.unregisterMemoryPartitionWriterAndFileInfo(fileInfo,
shuffleKey, filename)
storageManager.evictedFileCount.incrementAndGet
+ if (file.isInstanceOf[LocalTierWriter]) {
+ storageManager.evictedLocalFileCount.incrementAndGet()
+ }
+ if (file.isInstanceOf[DfsTierWriter]) {
+ storageManager.evictedDfsFileCount.incrementAndGet()
+ }
}
}
diff --git
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
index 8aa0311be..92d0fd5d4 100644
---
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
+++
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
@@ -157,8 +157,12 @@ public class MemoryReducePartitionDataWriterSuiteJ {
storagePolicy = Mockito.mock(StoragePolicy.class);
AtomicLong evictCount = new AtomicLong();
+ AtomicLong evictLocalCount = new AtomicLong();
+ AtomicLong evictDfsCount = new AtomicLong();
Mockito.when(storageManager.storagePolicy()).thenAnswer(a ->
storagePolicy);
Mockito.when(storageManager.evictedFileCount()).thenAnswer(a ->
evictCount);
+ Mockito.when(storageManager.evictedLocalFileCount()).thenAnswer(a ->
evictLocalCount);
+ Mockito.when(storageManager.evictedDfsFileCount()).thenAnswer(a ->
evictDfsCount);
Mockito.when(storageManager.localOrDfsStorageAvailable()).thenAnswer(a ->
true);
Mockito.when(storageManager.storageBufferAllocator()).thenAnswer(a ->
allocator);
MemoryManager.initialize(conf, storageManager, null);
@@ -621,6 +625,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
closeChunkServer();
assert storageManager.evictedFileCount().get() > 0;
+ assert storageManager.evictedLocalFileCount().get() > 0;
assert MemoryManager.instance().getMemoryFileStorageCounter() ==
memoryFileStorageBefore;
}
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala
index 0cd13f8a9..43f999b29 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala
@@ -77,9 +77,13 @@ class TierWriterSuite extends AnyFunSuite with
BeforeAndAfterEach {
val transConf = new TransportConf("shuffle", new CelebornConf)
val allocator = NettyUtils.getByteBufAllocator(transConf, source, false)
val evictedFileCount = new AtomicLong()
+ val evictedLocalFileCount = new AtomicLong()
+ val evictedDfsFileCount = new AtomicLong()
when(storageManager.storageBufferAllocator).thenReturn(allocator)
when(storageManager.localOrDfsStorageAvailable).thenReturn(true)
when(storageManager.evictedFileCount).thenReturn(evictedFileCount)
+
when(storageManager.evictedLocalFileCount).thenReturn(evictedLocalFileCount)
+ when(storageManager.evictedDfsFileCount).thenReturn(evictedDfsFileCount)
MemoryManager.initialize(celebornConf, storageManager, null)