This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 03a39819b [CELEBORN-882][WORKER][METRICS] Add `Pause Push Data Time 
Count` Metrics & Dashboard Panel
03a39819b is described below

commit 03a39819b5d795e1782150f117b0ca11acc0583b
Author: zwangsheng <[email protected]>
AuthorDate: Tue Sep 12 17:45:26 2023 +0800

    [CELEBORN-882][WORKER][METRICS] Add `Pause Push Data Time Count` Metrics & 
Dashboard Panel
    
    ### What changes were proposed in this pull request?
    Add the `PausePushDataTime` worker metric, together with a Grafana dashboard panel for it.
    
    ### Why are the changes needed?
    To track the cumulative time each Celeborn worker spends with push data paused.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Cluster Test
    
    Closes #1800 from zwangsheng/CELEBORN-882.
    
    Lead-authored-by: zwangsheng <[email protected]>
    Co-authored-by: zwangsheng <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 assets/grafana/celeborn-dashboard.json             | 90 ++++++++++++++++++++++
 .../org/apache/celeborn/common/CelebornConf.scala  | 10 +++
 docs/configuration/metrics.md                      |  1 +
 .../deploy/worker/memory/MemoryManager.java        | 29 ++++++-
 .../celeborn/service/deploy/worker/Worker.scala    |  3 +
 .../service/deploy/worker/WorkerSource.scala       |  1 +
 .../service/deploy/memory/MemoryManagerSuite.scala |  4 +
 7 files changed, 137 insertions(+), 1 deletion(-)

diff --git a/assets/grafana/celeborn-dashboard.json 
b/assets/grafana/celeborn-dashboard.json
index 12549d877..a2e6a1e62 100644
--- a/assets/grafana/celeborn-dashboard.json
+++ b/assets/grafana/celeborn-dashboard.json
@@ -1264,6 +1264,96 @@
           "title": "metrics_PausePushDataAndReplicate_Value",
           "type": "timeseries"
         },
+        {
+          "datasource": {
+            "type": "prometheus",
+            "uid": "${DS_PROMETHEUS}"
+          },
+          "fieldConfig": {
+            "defaults": {
+              "color": {
+                "mode": "palette-classic"
+              },
+              "custom": {
+                "axisCenteredZero": false,
+                "axisColorMode": "text",
+                "axisLabel": "",
+                "axisPlacement": "auto",
+                "barAlignment": 0,
+                "drawStyle": "line",
+                "fillOpacity": 0,
+                "gradientMode": "none",
+                "hideFrom": {
+                  "legend": false,
+                  "tooltip": false,
+                  "viz": false
+                },
+                "lineInterpolation": "linear",
+                "lineWidth": 1,
+                "pointSize": 5,
+                "scaleDistribution": {
+                  "type": "linear"
+                },
+                "showPoints": "auto",
+                "spanNulls": false,
+                "stacking": {
+                  "group": "A",
+                  "mode": "none"
+                },
+                "thresholdsStyle": {
+                  "mode": "off"
+                }
+              },
+              "mappings": [],
+              "thresholds": {
+                "mode": "absolute",
+                "steps": [
+                  {
+                    "color": "green",
+                    "value": null
+                  },
+                  {
+                    "color": "red",
+                    "value": 80
+                  }
+                ]
+              },
+              "unit": "ms"
+            },
+            "overrides": []
+          },
+          "gridPos": {
+            "h": 8,
+            "w": 12,
+            "x": 0,
+            "y": 63
+          },
+          "id": 182,
+          "options": {
+            "legend": {
+              "calcs": [],
+              "displayMode": "list",
+              "placement": "bottom",
+              "showLegend": true
+            },
+            "tooltip": {
+              "mode": "single",
+              "sort": "none"
+            }
+          },
+          "targets": [
+            {
+              "datasource": {
+                "type": "prometheus",
+                "uid": "${DS_PROMETHEUS}"
+              },
+              "expr": "metrics_PausePushDataTime_Value",
+              "refId": "A"
+            }
+          ],
+          "title": "Pause Push Data Time Count",
+          "type": "timeseries"
+        },
         {
           "datasource": {
             "type": "prometheus",
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 48dafc219..8134bbe38 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -676,6 +676,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   def metricsAppTopDiskUsageCount: Int = get(METRICS_APP_TOP_DISK_USAGE_COUNT)
   def metricsAppTopDiskUsageWindowSize: Int = 
get(METRICS_APP_TOP_DISK_USAGE_WINDOW_SIZE)
   def metricsAppTopDiskUsageInterval: Long = 
get(METRICS_APP_TOP_DISK_USAGE_INTERVAL)
+  def metricsWorkerForceAppendPauseSpentTimeThreshold: Int =
+    get(METRICS_WORKER_PAUSE_SPENT_TIME_FORCE_APPEND_THRESHOLD)
 
   // //////////////////////////////////////////////////////
   //                      Quota                         //
@@ -3609,6 +3611,14 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.SECONDS)
       .createWithDefaultString("10min")
 
+  val METRICS_WORKER_PAUSE_SPENT_TIME_FORCE_APPEND_THRESHOLD: ConfigEntry[Int] 
=
+    buildConf("celeborn.metrics.worker.pauseSpentTime.forceAppend.threshold")
+      .categories("metrics")
+      .doc("Force append worker pause spent time even if worker still in pause 
serving state." +
+        "Help user can find worker pause spent time increase, when worker 
always been pause state.")
+      .intConf
+      .createWithDefault(10)
+
   val QUOTA_ENABLED: ConfigEntry[Boolean] =
     buildConf("celeborn.quota.enabled")
       .categories("quota")
diff --git a/docs/configuration/metrics.md b/docs/configuration/metrics.md
index 4847879a0..1c4d35246 100644
--- a/docs/configuration/metrics.md
+++ b/docs/configuration/metrics.md
@@ -31,6 +31,7 @@ license: |
 | celeborn.metrics.master.prometheus.port | 9098 | Master's Prometheus port. | 
0.3.0 | 
 | celeborn.metrics.sample.rate | 1.0 | It controls if Celeborn collect timer 
metrics for some operations. Its value should be in [0.0, 1.0]. | 0.2.0 | 
 | celeborn.metrics.timer.slidingWindow.size | 4096 | The sliding window size 
of timer metric. | 0.2.0 | 
+| celeborn.metrics.worker.pauseSpentTime.forceAppend.threshold | 10 | Force 
append worker pause spent time even if worker still in pause serving state.Help 
user can find worker pause spent time increase, when worker always been pause 
state. |  | 
 | celeborn.metrics.worker.prometheus.host | &lt;localhost&gt; | Worker's 
Prometheus host. | 0.3.0 | 
 | celeborn.metrics.worker.prometheus.port | 9096 | Worker's Prometheus port. | 
0.3.0 | 
 <!--end-include-->
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
index ac87db327..e6d08d5f6 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
@@ -48,6 +48,7 @@ public class MemoryManager {
   private final long pauseReplicateThreshold;
   private final long resumeThreshold;
   private final long maxSortMemory;
+  private final int forceAppendPauseSpentTimeThreshold;
   private final List<MemoryPressureListener> memoryPressureListeners = new 
ArrayList<>();
 
   private final ScheduledExecutorService checkService =
@@ -66,8 +67,10 @@ public class MemoryManager {
   private final LongAdder pausePushDataCounter = new LongAdder();
   private final LongAdder pausePushDataAndReplicateCounter = new LongAdder();
   private ServingState servingState = ServingState.NONE_PAUSED;
+  private long pauseStartTime = -1L;
+  private long pausePushDataTime = 0L;
+  private int trimCounter = 0;
   private volatile boolean isPaused = false;
-
   // For credit stream
   private final AtomicLong readBufferCounter = new AtomicLong(0);
   private long readBufferThreshold = 0;
@@ -111,6 +114,7 @@ public class MemoryManager {
     double readBufferTargetRatio = conf.readBufferTargetRatio();
     long readBufferTargetUpdateInterval = 
conf.readBufferTargetUpdateInterval();
     long readBufferTargetNotifyThreshold = 
conf.readBufferTargetNotifyThreshold();
+    forceAppendPauseSpentTimeThreshold = 
conf.metricsWorkerForceAppendPauseSpentTimeThreshold();
 
     maxDirectorMemory =
         DynMethods.builder("maxDirectMemory")
@@ -247,6 +251,13 @@ public class MemoryManager {
     if (lastState == servingState) {
       if (servingState != ServingState.NONE_PAUSED) {
         logger.debug("Trigger action: TRIM");
+        trimCounter += 1;
+        // force to append pause spent time even we are in pause state
+        if (trimCounter >= forceAppendPauseSpentTimeThreshold) {
+          logger.debug(
+              "Trigger action: TRIM for {} times, force to append pause spent 
time.", trimCounter);
+          appendPauseSpentTime();
+        }
         trimAllListeners();
       }
       return;
@@ -262,6 +273,7 @@ public class MemoryManager {
                   
memoryPressureListener.onResume(TransportModuleConstants.REPLICATE_MODULE));
         } else if (lastState == ServingState.NONE_PAUSED) {
           logger.info("Trigger action: PAUSE PUSH");
+          pauseStartTime = System.currentTimeMillis();
           memoryPressureListeners.forEach(
               memoryPressureListener ->
                   
memoryPressureListener.onPause(TransportModuleConstants.PUSH_MODULE));
@@ -272,6 +284,7 @@ public class MemoryManager {
         pausePushDataAndReplicateCounter.increment();
         if (lastState == ServingState.NONE_PAUSED) {
           logger.info("Trigger action: PAUSE PUSH");
+          pauseStartTime = System.currentTimeMillis();
           memoryPressureListeners.forEach(
               memoryPressureListener ->
                   
memoryPressureListener.onPause(TransportModuleConstants.PUSH_MODULE));
@@ -283,6 +296,8 @@ public class MemoryManager {
         trimAllListeners();
         break;
       case NONE_PAUSED:
+        // resume from paused mode, append pause spent time
+        appendPauseSpentTime();
         if (lastState == ServingState.PUSH_AND_REPLICATE_PAUSED) {
           logger.info("Trigger action: RESUME REPLICATE");
           memoryPressureListeners.forEach(
@@ -389,6 +404,18 @@ public class MemoryManager {
     return readBufferDispatcher.requestsLength();
   }
 
+  public long getPausePushDataTime() {
+    return pausePushDataTime;
+  }
+
+  private void appendPauseSpentTime() {
+    long nextPauseStartTime = System.currentTimeMillis();
+    pausePushDataTime += nextPauseStartTime - pauseStartTime;
+    pauseStartTime = nextPauseStartTime;
+    // reset
+    trimCounter = 0;
+  }
+
   public void addReadBufferTargetChangeListener(ReadBufferTargetChangeListener 
listener) {
     synchronized (readBufferTargetChangeListeners) {
       readBufferTargetChangeListeners.add(listener);
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 597fc9e5b..77f0cd3ac 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -300,6 +300,9 @@ private[celeborn] class Worker(
   workerSource.addGauge(WorkerSource.ACTIVE_SHUFFLE_SIZE) { () =>
     storageManager.getActiveShuffleSize()
   }
+  workerSource.addGauge(WorkerSource.PAUSE_PUSH_DATA_TIME) { () =>
+    memoryManager.getPausePushDataTime
+  }
 
   private def highWorkload: Boolean = {
     (memoryManager.currentServingState, conf.workerActiveConnectionMax) match {
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index 5690548a3..20a138f9a 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -90,6 +90,7 @@ object WorkerSource {
   val REPLICA_REGION_START_TIME = "ReplicaRegionStartTime"
   val PRIMARY_REGION_FINISH_TIME = "PrimaryRegionFinishTime"
   val REPLICA_REGION_FINISH_TIME = "ReplicaRegionFinishTime"
+  val PAUSE_PUSH_DATA_TIME = "PausePushDataTime"
 
   // flush
   val TAKE_BUFFER_TIME = "TakeBufferTime"
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
index 864ddffec..bfbb42043 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
@@ -131,6 +131,9 @@ class MemoryManagerSuite extends CelebornFunSuite {
       assert(!pushListener.isPause)
       assert(!replicateListener.isPause)
     }
+    // [CELEBORN-882] Test record pause push time
+    assert(memoryManager.getPausePushDataTime.longValue() > 0)
+    val lastPauseTime = memoryManager.getPausePushDataTime.longValue()
 
     // NONE PAUSED -> PAUSE PUSH AND REPLICATE
     memoryCounter.set(replicateThreshold + 1);
@@ -145,6 +148,7 @@ class MemoryManagerSuite extends CelebornFunSuite {
       assert(!pushListener.isPause)
       assert(!replicateListener.isPause)
     }
+    assert(memoryManager.getPausePushDataTime.longValue() > lastPauseTime)
   }
 
   class MockMemoryPressureListener(

Reply via email to