This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new d3c576664 [CELEBORN-2184] Introduce AvailableReadBuffer metric to 
monitor available memory for credit stream read buffer
d3c576664 is described below

commit d3c576664db9d0694c585403e7c510c08967ac7d
Author: SteNicholas <[email protected]>
AuthorDate: Tue Oct 28 15:10:53 2025 +0800

    [CELEBORN-2184] Introduce AvailableReadBuffer metric to monitor available 
memory for credit stream read buffer
    
    ### What changes were proposed in this pull request?
    
    Introduce `AvailableReadBuffer` metric to monitor available memory for 
credit stream read buffer.
    
    ### Why are the changes needed?
    
    The `BufferStreamReadBuffer` metric only monitors the memory currently 
used by the credit stream read buffer. That alone is not enough to monitor the 
memory still available to the credit stream read buffer, which also depends on 
the max ratio of direct memory for read buffer.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    CI, plus a manual Grafana test with the [Celeborn 
dashboard](https://stenicholas.grafana.net/public-dashboards/7d2ea01cb708482c9d1049c966132819).
    
    Closes #3516 from SteNicholas/CELEBORN-2184.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 assets/grafana/celeborn-dashboard.json             | 93 ++++++++++++++++++++++
 docs/monitoring.md                                 |  5 +-
 .../deploy/worker/memory/MemoryManager.java        |  4 +
 .../deploy/worker/memory/ReadBufferDispatcher.java |  7 +-
 .../celeborn/service/deploy/worker/Worker.scala    |  3 +
 .../service/deploy/worker/WorkerSource.scala       |  1 +
 6 files changed, 108 insertions(+), 5 deletions(-)

diff --git a/assets/grafana/celeborn-dashboard.json 
b/assets/grafana/celeborn-dashboard.json
index e5d8f83da..d00f3e907 100644
--- a/assets/grafana/celeborn-dashboard.json
+++ b/assets/grafana/celeborn-dashboard.json
@@ -12081,6 +12081,99 @@
           ],
           "title": "ReadBufferDispatcherRequestsLength",
           "type": "timeseries"
+        },
+        {
+          "datasource": {
+            "type": "prometheus",
+            "uid": "${DS_PROMETHEUS}"
+          },
+          "description": "The available memory for credit stream read buffer.",
+          "fieldConfig": {
+            "defaults": {
+              "color": {
+                "mode": "palette-classic"
+              },
+              "custom": {
+                "axisCenteredZero": false,
+                "axisColorMode": "text",
+                "axisLabel": "",
+                "axisPlacement": "auto",
+                "barAlignment": 0,
+                "drawStyle": "line",
+                "fillOpacity": 0,
+                "gradientMode": "none",
+                "hideFrom": {
+                  "legend": false,
+                  "tooltip": false,
+                  "viz": false
+                },
+                "lineInterpolation": "linear",
+                "lineWidth": 1,
+                "pointSize": 5,
+                "scaleDistribution": {
+                  "type": "linear"
+                },
+                "showPoints": "auto",
+                "spanNulls": false,
+                "stacking": {
+                  "group": "A",
+                  "mode": "none"
+                },
+                "thresholdsStyle": {
+                  "mode": "off"
+                }
+              },
+              "mappings": [],
+              "thresholds": {
+                "mode": "absolute",
+                "steps": [
+                  {
+                    "color": "green"
+                  },
+                  {
+                    "color": "red",
+                    "value": 80
+                  }
+                ]
+              },
+              "unit": "decbytes"
+            },
+            "overrides": []
+          },
+          "gridPos": {
+            "h": 8,
+            "w": 12,
+            "x": 0,
+            "y": 289
+          },
+          "id": 107,
+          "options": {
+            "legend": {
+              "calcs": [],
+              "displayMode": "list",
+              "placement": "bottom",
+              "showLegend": true
+            },
+            "tooltip": {
+              "mode": "single",
+              "sort": "none"
+            }
+          },
+          "targets": [
+            {
+              "datasource": {
+                "type": "prometheus",
+                "uid": "${DS_PROMETHEUS}"
+              },
+              "editorMode": "builder",
+              "expr": 
"metrics_AvailableReadBuffer_Value{instance=~\"${instance}\"}",
+              "legendFormat": "${baseLegend}",
+              "range": true,
+              "refId": "A"
+            }
+          ],
+          "title": "AvailableReadBuffer",
+          "type": "timeseries"
         }
       ],
       "title": "MemoryRelatives",
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 351d6e4db..19a25e769 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -241,6 +241,7 @@ These metrics are exposed by Celeborn worker.
     | BufferStreamReadBuffer                 | The memory used by credit 
stream read buffer.                                                             
      |
     | ReadBufferDispatcherRequestsLength     | The queue size of read buffer 
allocation requests.                                                            
  |
     | ReadBufferAllocatedCount               | Allocated read buffer count.    
                                                                                
|
+    | AvailableReadBuffer                    | The available memory for credit 
stream read buffer.                                                             
|
     | ActiveCreditStreamCount                | Active stream count for map 
partition reading streams.                                                      
    |
     | ActiveMapPartitionCount                | The count of active map 
partition reading streams.                                                      
        |
     | SorterCacheHitRate                     | The cache hit rate for worker 
partition sorter index.                                                         
  |
@@ -254,14 +255,14 @@ These metrics are exposed by Celeborn worker.
     | UserProduceSpeed                       | The speed of user production 
for congestion control.                                                         
   |
     | WorkerConsumeSpeed                     | The speed of worker consumption 
for congestion control.                                                         
|
     | IsDecommissioningWorker                | 1 means worker decommissioning, 
0 means not decommissioning.                                                    
|
-    | IsHighWorkload                         | 1 means worker high workload, 0 
means not high workload.                                                    | 
+    | IsHighWorkload                         | 1 means worker high workload, 0 
means not high workload.                                                        
| 
     | UnreleasedShuffleCount                 | Unreleased shuffle count when 
worker is decommissioning.                                                      
  |
     | UnreleasedPartitionLocationCount       | Unreleased partition location 
count when worker is shutting down.                                             
  |
     | MemoryStorageFileCount                 | The count of files in Memory 
Storage of a worker.                                                            
   |
     | MemoryFileStorageSize                  | The total amount of memory used 
by Memory Storage.                                                              
|
     | EvictedFileCount                       | The count of files evicted from 
Memory Storage to Disk.                                                         
|
     | EvictedLocalFileCount                  | The count of files evicted from 
Memory Storage to LocalDisk.                                                    
|
-    | EvictedDfsFileCount                  | The count of files evicted from 
Memory Storage to Dfs.                                                          
  |
+    | EvictedDfsFileCount                    | The count of files evicted from 
Memory Storage to Dfs.                                                          
|
     | DirectMemoryUsageRatio                 | Ratio of direct memory used and 
max direct memory.                                                              
|
     | RegisterWithMasterFailCount            | The count of failures in 
register with master request.                                                   
       |
     | FlushWorkingQueueSize                  | The size of flush working queue 
for mount point.                                                                
|
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
index da81c92a3..de17c4549 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
@@ -502,6 +502,10 @@ public class MemoryManager {
     return readBufferCounter.get() + requiredBytes < readBufferThreshold;
   }
 
+  public long availableReadBuffer() {
+    return Math.max(0, readBufferThreshold - readBufferCounter.get());
+  }
+
   public long getPausePushDataAndReplicateCounter() {
     return pausePushDataAndReplicateCounter.sum();
   }
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
index aa862f57b..3cf18057c 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
@@ -58,17 +58,18 @@ public class ReadBufferDispatcher {
     this.memoryManager = memoryManager;
     dispatcherThread =
         new AtomicReference<>(
-            ThreadUtils.newThread(new DispatcherRunnable(), 
"read-buffer-dispatcher"));
+            ThreadUtils.newThread(new DispatcherRunnable(), 
"worker-read-buffer-dispatcher"));
     dispatcherThread.get().start();
 
     if (checkThreadInterval > 0) {
       ScheduledExecutorService checkAliveThread =
-          
ThreadUtils.newDaemonSingleThreadScheduledExecutor("read-buffer-dispatcher-checker");
+          ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+              "worker-read-buffer-dispatcher-checker");
       checkAliveThread.scheduleWithFixedDelay(
           () -> {
             if (!dispatcherThread.get().isAlive()) {
               dispatcherThread.set(
-                  ThreadUtils.newThread(new DispatcherRunnable(), 
"read-buffer-dispatcher"));
+                  ThreadUtils.newThread(new DispatcherRunnable(), 
"worker-read-buffer-dispatcher"));
               dispatcherThread.get().start();
             }
           },
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index f66b861ac..2ba3ba56a 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -406,6 +406,9 @@ private[celeborn] class Worker(
   workerSource.addGauge(WorkerSource.READ_BUFFER_ALLOCATED_COUNT) { () =>
     memoryManager.getAllocatedReadBuffers
   }
+  workerSource.addGauge(WorkerSource.AVAILABLE_READ_BUFFER) { () =>
+    memoryManager.availableReadBuffer
+  }
   workerSource.addGauge(WorkerSource.MEMORY_FILE_STORAGE_SIZE) { () =>
     memoryManager.getMemoryFileStorageCounter
   }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index aefb453d8..ece183ee2 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -236,6 +236,7 @@ object WorkerSource {
   val BUFFER_STREAM_READ_BUFFER = "BufferStreamReadBuffer"
   val READ_BUFFER_DISPATCHER_REQUESTS_LENGTH = 
"ReadBufferDispatcherRequestsLength"
   val READ_BUFFER_ALLOCATED_COUNT = "ReadBufferAllocatedCount"
+  val AVAILABLE_READ_BUFFER = "AvailableReadBuffer"
   val MEMORY_FILE_STORAGE_SIZE = "MemoryFileStorageSize"
   val DIRECT_MEMORY_USAGE_RATIO = "DirectMemoryUsageRatio"
   val EVICTED_FILE_COUNT = "EvictedFileCount"

Reply via email to