This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new d3c576664 [CELEBORN-2184] Introduce AvailableReadBuffer metric to
monitor available memory for credit stream read buffer
d3c576664 is described below
commit d3c576664db9d0694c585403e7c510c08967ac7d
Author: SteNicholas <[email protected]>
AuthorDate: Tue Oct 28 15:10:53 2025 +0800
[CELEBORN-2184] Introduce AvailableReadBuffer metric to monitor available
memory for credit stream read buffer
### What changes were proposed in this pull request?
Introduce the `AvailableReadBuffer` metric to monitor the memory still
available to the credit stream read buffer.
### Why are the changes needed?
The existing `BufferStreamReadBuffer` metric monitors the memory used by the
credit stream read buffer. It does not show how much memory remains available
to that buffer under the maximum direct-memory ratio configured for read buffers.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI, plus a manual test in Grafana using the [Celeborn
dashboard](https://stenicholas.grafana.net/public-dashboards/7d2ea01cb708482c9d1049c966132819).
Closes #3516 from SteNicholas/CELEBORN-2184.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
assets/grafana/celeborn-dashboard.json | 93 ++++++++++++++++++++++
docs/monitoring.md | 5 +-
.../deploy/worker/memory/MemoryManager.java | 4 +
.../deploy/worker/memory/ReadBufferDispatcher.java | 7 +-
.../celeborn/service/deploy/worker/Worker.scala | 3 +
.../service/deploy/worker/WorkerSource.scala | 1 +
6 files changed, 108 insertions(+), 5 deletions(-)
diff --git a/assets/grafana/celeborn-dashboard.json
b/assets/grafana/celeborn-dashboard.json
index e5d8f83da..d00f3e907 100644
--- a/assets/grafana/celeborn-dashboard.json
+++ b/assets/grafana/celeborn-dashboard.json
@@ -12081,6 +12081,99 @@
],
"title": "ReadBufferDispatcherRequestsLength",
"type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "description": "The available memory for credit stream read buffer.",
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green"
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ },
+ "unit": "decbytes"
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 0,
+ "y": 289
+ },
+ "id": 107,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "editorMode": "builder",
+ "expr":
"metrics_AvailableReadBuffer_Value{instance=~\"${instance}\"}",
+ "legendFormat": "${baseLegend}",
+ "range": true,
+ "refId": "A"
+ }
+ ],
+ "title": "AvailableReadBuffer",
+ "type": "timeseries"
}
],
"title": "MemoryRelatives",
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 351d6e4db..19a25e769 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -241,6 +241,7 @@ These metrics are exposed by Celeborn worker.
| BufferStreamReadBuffer | The memory used by credit
stream read buffer.
|
| ReadBufferDispatcherRequestsLength | The queue size of read buffer
allocation requests.
|
| ReadBufferAllocatedCount | Allocated read buffer count.
|
+ | AvailableReadBuffer | The available memory for credit
stream read buffer.
|
| ActiveCreditStreamCount | Active stream count for map
partition reading streams.
|
| ActiveMapPartitionCount | The count of active map
partition reading streams.
|
| SorterCacheHitRate | The cache hit rate for worker
partition sorter index.
|
@@ -254,14 +255,14 @@ These metrics are exposed by Celeborn worker.
| UserProduceSpeed | The speed of user production
for congestion control.
|
| WorkerConsumeSpeed | The speed of worker consumption
for congestion control.
|
| IsDecommissioningWorker | 1 means worker decommissioning,
0 means not decommissioning.
|
- | IsHighWorkload | 1 means worker high workload, 0
means not high workload. |
+ | IsHighWorkload | 1 means worker high workload, 0
means not high workload.
|
| UnreleasedShuffleCount | Unreleased shuffle count when
worker is decommissioning.
|
| UnreleasedPartitionLocationCount | Unreleased partition location
count when worker is shutting down.
|
| MemoryStorageFileCount | The count of files in Memory
Storage of a worker.
|
| MemoryFileStorageSize | The total amount of memory used
by Memory Storage.
|
| EvictedFileCount | The count of files evicted from
Memory Storage to Disk.
|
| EvictedLocalFileCount | The count of files evicted from
Memory Storage to LocalDisk.
|
- | EvictedDfsFileCount | The count of files evicted from
Memory Storage to Dfs.
|
+ | EvictedDfsFileCount | The count of files evicted from
Memory Storage to Dfs.
|
| DirectMemoryUsageRatio | Ratio of direct memory used and
max direct memory.
|
| RegisterWithMasterFailCount | The count of failures in
register with master request.
|
| FlushWorkingQueueSize | The size of flush working queue
for mount point.
|
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
index da81c92a3..de17c4549 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
@@ -502,6 +502,10 @@ public class MemoryManager {
return readBufferCounter.get() + requiredBytes < readBufferThreshold;
}
+ public long availableReadBuffer() {
+ return Math.max(0, readBufferThreshold - readBufferCounter.get());
+ }
+
public long getPausePushDataAndReplicateCounter() {
return pausePushDataAndReplicateCounter.sum();
}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
index aa862f57b..3cf18057c 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
@@ -58,17 +58,18 @@ public class ReadBufferDispatcher {
this.memoryManager = memoryManager;
dispatcherThread =
new AtomicReference<>(
- ThreadUtils.newThread(new DispatcherRunnable(),
"read-buffer-dispatcher"));
+ ThreadUtils.newThread(new DispatcherRunnable(),
"worker-read-buffer-dispatcher"));
dispatcherThread.get().start();
if (checkThreadInterval > 0) {
ScheduledExecutorService checkAliveThread =
-
ThreadUtils.newDaemonSingleThreadScheduledExecutor("read-buffer-dispatcher-checker");
+ ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+ "worker-read-buffer-dispatcher-checker");
checkAliveThread.scheduleWithFixedDelay(
() -> {
if (!dispatcherThread.get().isAlive()) {
dispatcherThread.set(
- ThreadUtils.newThread(new DispatcherRunnable(),
"read-buffer-dispatcher"));
+ ThreadUtils.newThread(new DispatcherRunnable(),
"worker-read-buffer-dispatcher"));
dispatcherThread.get().start();
}
},
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index f66b861ac..2ba3ba56a 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -406,6 +406,9 @@ private[celeborn] class Worker(
workerSource.addGauge(WorkerSource.READ_BUFFER_ALLOCATED_COUNT) { () =>
memoryManager.getAllocatedReadBuffers
}
+ workerSource.addGauge(WorkerSource.AVAILABLE_READ_BUFFER) { () =>
+ memoryManager.availableReadBuffer
+ }
workerSource.addGauge(WorkerSource.MEMORY_FILE_STORAGE_SIZE) { () =>
memoryManager.getMemoryFileStorageCounter
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index aefb453d8..ece183ee2 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -236,6 +236,7 @@ object WorkerSource {
val BUFFER_STREAM_READ_BUFFER = "BufferStreamReadBuffer"
val READ_BUFFER_DISPATCHER_REQUESTS_LENGTH =
"ReadBufferDispatcherRequestsLength"
val READ_BUFFER_ALLOCATED_COUNT = "ReadBufferAllocatedCount"
+ val AVAILABLE_READ_BUFFER = "AvailableReadBuffer"
val MEMORY_FILE_STORAGE_SIZE = "MemoryFileStorageSize"
val DIRECT_MEMORY_USAGE_RATIO = "DirectMemoryUsageRatio"
val EVICTED_FILE_COUNT = "EvictedFileCount"