This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 4b5e23db3 [CELEBORN-1215] Introduce PausePushDataAndReplicateTime 
metric to record time for a worker to stop receiving pushData from clients and 
other workers
4b5e23db3 is described below

commit 4b5e23db37e4d01a9a1fca595c5c77c0ad4506db
Author: SteNicholas <[email protected]>
AuthorDate: Wed Jan 10 19:55:04 2024 +0800

    [CELEBORN-1215] Introduce PausePushDataAndReplicateTime metric to record 
time for a worker to stop receiving pushData from clients and other workers
    
    ### What changes were proposed in this pull request?
    
    Introduce `PausePushDataAndReplicateTime` metric to record time for a 
worker to stop receiving pushData from clients and other workers.
    
    ### Why are the changes needed?
    
    `PausePushData` means the count for a worker to stop receiving pushData 
from clients because of back pressure. Meanwhile, `PausePushDataAndReplicate` 
means the count for a worker to stop receiving pushData from clients and other 
workers because of back pressure. Therefore, `PausePushDataTime` records the 
time for a worker to stop receiving pushData from clients or other workers, 
a definition that is confusing for users. It's recommended that the 
`PausePushDataAndReplicateTime` metric is [...]
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    - [Celeborn 
Dashboard](https://stenicholas.grafana.net/d/U_qgru_7z/celeborn?orgId=1&refresh=5s)
    - `MemoryManagerSuite#[CELEBORN-882] Test MemoryManager check memory thread 
logic`
    
    Closes #2221 from SteNicholas/CELEBORN-1215.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 METRICS.md                                         |   6 +-
 assets/grafana/celeborn-dashboard.json             | 119 ++++++++++++++++++---
 docs/monitoring.md                                 |   4 +
 .../deploy/worker/memory/MemoryManager.java        |  27 +++--
 .../celeborn/service/deploy/worker/Worker.scala    |  15 +--
 .../service/deploy/worker/WorkerSource.scala       |   7 +-
 .../service/deploy/memory/MemoryManagerSuite.scala |   4 +-
 7 files changed, 149 insertions(+), 33 deletions(-)

diff --git a/METRICS.md b/METRICS.md
index 2da833924..781390ee9 100644
--- a/METRICS.md
+++ b/METRICS.md
@@ -114,8 +114,10 @@ Here is an example of Grafana dashboard importing.
 |              SortedFiles               |      worker       |                 
              This value means the count of sorted shuffle files.               
                |
 |             SortedFileSize             |      worker       |                 
       This value means the count of sorted shuffle files 's total size.        
                |
 |               DiskBuffer               |      worker       | Disk buffers 
are part of netty used memory, means data need to write to disk but haven't 
been written to disk.  |
-|             PausePushData              |      worker       |                 
  PausePushData means the count of worker stopped receiving data from client.   
                |
-|       PausePushDataAndReplicate        |      worker       |    
PausePushDataAndReplicate means the count of worker stopped receiving data from 
client and other workers.    |
+|           PausePushDataTime            |      worker       |                 
             PausePushData means stop receiving data from client.               
                |
+|     PausePushDataAndReplicateTime      |      worker       |               
PausePushDataAndReplicate means stop receiving data from client and other 
workers.                |
+|             PausePushData              |      worker       |                 
      The count of stopping receiving data from client in current worker.       
                |
+|       PausePushDataAndReplicate        |      worker       |              
The count of stopping receiving data from client and other workers in current 
worker.              |
 |              jvm_gc_count              |        JVM        |                 
                    The GC count of each garbage collector.                     
                |
 |              jvm_gc_time               |        JVM        |                 
                  The GC cost time of each garbage collector.                   
                |
 |          jvm_memory_heap_init          |        JVM        |                 
                        The amount of heap init memory.                         
                |
diff --git a/assets/grafana/celeborn-dashboard.json 
b/assets/grafana/celeborn-dashboard.json
index 3f0de33c9..68cfdd9ef 100644
--- a/assets/grafana/celeborn-dashboard.json
+++ b/assets/grafana/celeborn-dashboard.json
@@ -1692,7 +1692,8 @@
                     "value": 80
                   }
                 ]
-              }
+              },
+              "unit": "ms"
             },
             "overrides": []
           },
@@ -1702,7 +1703,7 @@
             "x": 12,
             "y": 30
           },
-          "id": 179,
+          "id": 184,
           "options": {
             "legend": {
               "calcs": [],
@@ -1721,13 +1722,13 @@
                 "type": "prometheus",
                 "uid": "${DS_PROMETHEUS}"
               },
-              "expr": "metrics_ActiveConnectionCount_Count",
+              "expr": "metrics_PausePushDataAndReplicateTime_Value",
               "legendFormat": "${baseLegend}",
               "range": true,
               "refId": "A"
             }
           ],
-          "title": "metrics_ActiveConnectionCount_Count",
+          "title": "metrics_PausePushDataAndReplicateTime_Value",
           "type": "timeseries"
         },
         {
@@ -1916,6 +1917,96 @@
           ],
           "title": "metrics_ActiveShuffleFileCount_Value",
           "type": "timeseries"
+        },
+        {
+          "datasource": {
+            "type": "prometheus",
+            "uid": "${DS_PROMETHEUS}"
+          },
+          "fieldConfig": {
+            "defaults": {
+              "color": {
+                "mode": "palette-classic"
+              },
+              "custom": {
+                "axisCenteredZero": false,
+                "axisColorMode": "text",
+                "axisLabel": "",
+                "axisPlacement": "auto",
+                "barAlignment": 0,
+                "drawStyle": "line",
+                "fillOpacity": 0,
+                "gradientMode": "none",
+                "hideFrom": {
+                  "legend": false,
+                  "tooltip": false,
+                  "viz": false
+                },
+                "lineInterpolation": "linear",
+                "lineWidth": 1,
+                "pointSize": 5,
+                "scaleDistribution": {
+                  "type": "linear"
+                },
+                "showPoints": "auto",
+                "spanNulls": false,
+                "stacking": {
+                  "group": "A",
+                  "mode": "none"
+                },
+                "thresholdsStyle": {
+                  "mode": "off"
+                }
+              },
+              "mappings": [],
+              "thresholds": {
+                "mode": "absolute",
+                "steps": [
+                  {
+                    "color": "green"
+                  },
+                  {
+                    "color": "red",
+                    "value": 80
+                  }
+                ]
+              }
+            },
+            "overrides": []
+          },
+          "gridPos": {
+            "h": 8,
+            "w": 12,
+            "x": 0,
+            "y": 46
+          },
+          "id": 179,
+          "options": {
+            "legend": {
+              "calcs": [],
+              "displayMode": "list",
+              "placement": "bottom",
+              "showLegend": true
+            },
+            "tooltip": {
+              "mode": "single",
+              "sort": "none"
+            }
+          },
+          "targets": [
+            {
+              "datasource": {
+                "type": "prometheus",
+                "uid": "${DS_PROMETHEUS}"
+              },
+              "expr": "metrics_ActiveConnectionCount_Count",
+              "legendFormat": "${baseLegend}",
+              "range": true,
+              "refId": "A"
+            }
+          ],
+          "title": "metrics_ActiveConnectionCount_Count",
+          "type": "timeseries"
         }
       ],
       "title": "Worker",
@@ -1927,7 +2018,7 @@
         "h": 1,
         "w": 24,
         "x": 0,
-        "y": 3
+        "y": 54
       },
       "id": 134,
       "panels": [
@@ -2938,7 +3029,7 @@
         "h": 1,
         "w": 24,
         "x": 0,
-        "y": 54
+        "y": 55
       },
       "id": 12,
       "panels": [
@@ -3585,7 +3676,7 @@
         "h": 1,
         "w": 24,
         "x": 0,
-        "y": 55
+        "y": 56
       },
       "id": 10,
       "panels": [
@@ -4139,7 +4230,7 @@
         "h": 1,
         "w": 24,
         "x": 0,
-        "y": 56
+        "y": 57
       },
       "id": 8,
       "panels": [
@@ -5161,7 +5252,7 @@
         "h": 1,
         "w": 24,
         "x": 0,
-        "y": 57
+        "y": 58
       },
       "id": 50,
       "panels": [
@@ -5717,7 +5808,7 @@
         "h": 1,
         "w": 24,
         "x": 0,
-        "y": 58
+        "y": 59
       },
       "id": 157,
       "panels": [
@@ -6010,7 +6101,7 @@
         "h": 1,
         "w": 24,
         "x": 0,
-        "y": 59
+        "y": 60
       },
       "id": 137,
       "panels": [
@@ -7401,7 +7492,7 @@
         "h": 1,
         "w": 24,
         "x": 0,
-        "y": 60
+        "y": 61
       },
       "id": 110,
       "panels": [
@@ -7597,7 +7688,7 @@
         "h": 1,
         "w": 24,
         "x": 0,
-        "y": 61
+        "y": 62
       },
       "id": 123,
       "panels": [
@@ -8075,7 +8166,7 @@
         "h": 1,
         "w": 24,
         "x": 0,
-        "y": 62
+        "y": 63
       },
       "id": 172,
       "panels": [
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 1ada46e2c..b172f496e 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -180,6 +180,10 @@ These metrics are exposed by Celeborn worker.
     - SortedFileSize
     - DiskBuffer
         - The memory occupied by pushData and pushMergedData which should be 
written to disk.
+    - PausePushDataTime
+        - The time for a worker to stop receiving pushData from clients 
because of back pressure.
+    - PausePushDataAndReplicateTime
+        - The time for a worker to stop receiving pushData from clients and 
other workers because of back pressure.
     - PausePushData
         - The count for a worker to stop receiving pushData from clients 
because of back pressure.
     - PausePushDataAndReplicate
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
index e6d08d5f6..e04975ec8 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
@@ -67,8 +67,10 @@ public class MemoryManager {
   private final LongAdder pausePushDataCounter = new LongAdder();
   private final LongAdder pausePushDataAndReplicateCounter = new LongAdder();
   private ServingState servingState = ServingState.NONE_PAUSED;
-  private long pauseStartTime = -1L;
+  private long pausePushDataStartTime = -1L;
   private long pausePushDataTime = 0L;
+  private long pausePushDataAndReplicateStartTime = -1L;
+  private long pausePushDataAndReplicateTime = 0L;
   private int trimCounter = 0;
   private volatile boolean isPaused = false;
   // For credit stream
@@ -256,7 +258,7 @@ public class MemoryManager {
         if (trimCounter >= forceAppendPauseSpentTimeThreshold) {
           logger.debug(
               "Trigger action: TRIM for {} times, force to append pause spent 
time.", trimCounter);
-          appendPauseSpentTime();
+          appendPauseSpentTime(servingState);
         }
         trimAllListeners();
       }
@@ -273,7 +275,7 @@ public class MemoryManager {
                   
memoryPressureListener.onResume(TransportModuleConstants.REPLICATE_MODULE));
         } else if (lastState == ServingState.NONE_PAUSED) {
           logger.info("Trigger action: PAUSE PUSH");
-          pauseStartTime = System.currentTimeMillis();
+          pausePushDataStartTime = System.currentTimeMillis();
           memoryPressureListeners.forEach(
               memoryPressureListener ->
                   
memoryPressureListener.onPause(TransportModuleConstants.PUSH_MODULE));
@@ -284,7 +286,7 @@ public class MemoryManager {
         pausePushDataAndReplicateCounter.increment();
         if (lastState == ServingState.NONE_PAUSED) {
           logger.info("Trigger action: PAUSE PUSH");
-          pauseStartTime = System.currentTimeMillis();
+          pausePushDataAndReplicateStartTime = System.currentTimeMillis();
           memoryPressureListeners.forEach(
               memoryPressureListener ->
                   
memoryPressureListener.onPause(TransportModuleConstants.PUSH_MODULE));
@@ -297,7 +299,7 @@ public class MemoryManager {
         break;
       case NONE_PAUSED:
         // resume from paused mode, append pause spent time
-        appendPauseSpentTime();
+        appendPauseSpentTime(lastState);
         if (lastState == ServingState.PUSH_AND_REPLICATE_PAUSED) {
           logger.info("Trigger action: RESUME REPLICATE");
           memoryPressureListeners.forEach(
@@ -408,10 +410,19 @@ public class MemoryManager {
     return pausePushDataTime;
   }
 
-  private void appendPauseSpentTime() {
+  public long getPausePushDataAndReplicateTime() {
+    return pausePushDataAndReplicateTime;
+  }
+
+  private void appendPauseSpentTime(ServingState servingState) {
     long nextPauseStartTime = System.currentTimeMillis();
-    pausePushDataTime += nextPauseStartTime - pauseStartTime;
-    pauseStartTime = nextPauseStartTime;
+    if (servingState == ServingState.PUSH_PAUSED) {
+      pausePushDataTime += nextPauseStartTime - pausePushDataStartTime;
+      pausePushDataStartTime = nextPauseStartTime;
+    } else {
+      pausePushDataAndReplicateTime += nextPauseStartTime - 
pausePushDataAndReplicateStartTime;
+      pausePushDataAndReplicateStartTime = nextPauseStartTime;
+    }
     // reset
     trimCounter = 0;
   }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index fd14a0460..8ee260e1f 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -306,12 +306,6 @@ private[celeborn] class Worker(
   workerSource.addGauge(WorkerSource.NETTY_MEMORY) { () =>
     memoryManager.getNettyUsedDirectMemory
   }
-  workerSource.addGauge(WorkerSource.PAUSE_PUSH_DATA_COUNT) { () =>
-    memoryManager.getPausePushDataCounter
-  }
-  workerSource.addGauge(WorkerSource.PAUSE_PUSH_DATA_AND_REPLICATE_COUNT) { () 
=>
-    memoryManager.getPausePushDataAndReplicateCounter
-  }
   workerSource.addGauge(WorkerSource.BUFFER_STREAM_READ_BUFFER) { () =>
     memoryManager.getReadBufferCounter
   }
@@ -330,6 +324,15 @@ private[celeborn] class Worker(
   workerSource.addGauge(WorkerSource.PAUSE_PUSH_DATA_TIME) { () =>
     memoryManager.getPausePushDataTime
   }
+  workerSource.addGauge(WorkerSource.PAUSE_PUSH_DATA_AND_REPLICATE_TIME) { () 
=>
+    memoryManager.getPausePushDataAndReplicateTime
+  }
+  workerSource.addGauge(WorkerSource.PAUSE_PUSH_DATA_COUNT) { () =>
+    memoryManager.getPausePushDataCounter
+  }
+  workerSource.addGauge(WorkerSource.PAUSE_PUSH_DATA_AND_REPLICATE_COUNT) { () 
=>
+    memoryManager.getPausePushDataAndReplicateCounter
+  }
 
   private def highWorkload: Boolean = {
     (memoryManager.currentServingState, conf.workerActiveConnectionMax) match {
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index 911db859e..0574567ed 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -101,7 +101,12 @@ object WorkerSource {
   val REPLICA_REGION_START_TIME = "ReplicaRegionStartTime"
   val PRIMARY_REGION_FINISH_TIME = "PrimaryRegionFinishTime"
   val REPLICA_REGION_FINISH_TIME = "ReplicaRegionFinishTime"
+
+  // pause push data
   val PAUSE_PUSH_DATA_TIME = "PausePushDataTime"
+  val PAUSE_PUSH_DATA_AND_REPLICATE_TIME = "PausePushDataAndReplicateTime"
+  val PAUSE_PUSH_DATA_COUNT = "PausePushData"
+  val PAUSE_PUSH_DATA_AND_REPLICATE_COUNT = "PausePushDataAndReplicate"
 
   // flush
   val TAKE_BUFFER_TIME = "TakeBufferTime"
@@ -123,8 +128,6 @@ object WorkerSource {
   val SORTED_FILES = "SortedFiles"
   val SORTED_FILE_SIZE = "SortedFileSize"
   val DISK_BUFFER = "DiskBuffer"
-  val PAUSE_PUSH_DATA_COUNT = "PausePushData"
-  val PAUSE_PUSH_DATA_AND_REPLICATE_COUNT = "PausePushDataAndReplicate"
   val BUFFER_STREAM_READ_BUFFER = "BufferStreamReadBuffer"
   val READ_BUFFER_DISPATCHER_REQUESTS_LENGTH = 
"ReadBufferDispatcherRequestsLength"
   val READ_BUFFER_ALLOCATED_COUNT = "ReadBufferAllocatedCount"
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
index f26962929..6f91b4374 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
@@ -133,6 +133,7 @@ class MemoryManagerSuite extends CelebornFunSuite {
     }
     // [CELEBORN-882] Test record pause push time
     assert(memoryManager.getPausePushDataTime.longValue() > 0)
+    assert(memoryManager.getPausePushDataAndReplicateTime.longValue() == 0)
     val lastPauseTime = memoryManager.getPausePushDataTime.longValue()
 
     // NONE PAUSED -> PAUSE PUSH AND REPLICATE
@@ -148,7 +149,8 @@ class MemoryManagerSuite extends CelebornFunSuite {
       assert(!pushListener.isPause)
       assert(!replicateListener.isPause)
     }
-    assert(memoryManager.getPausePushDataTime.longValue() > lastPauseTime)
+    assert(memoryManager.getPausePushDataTime.longValue() == lastPauseTime)
+    assert(memoryManager.getPausePushDataAndReplicateTime.longValue() > 0)
   }
 
   class MockMemoryPressureListener(

Reply via email to