This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 03a39819b [CELEBORN-882][WORKER][METRICS] Add `Pause Push Data Time
Count` Metrics & Dashboard Panel
03a39819b is described below
commit 03a39819b5d795e1782150f117b0ca11acc0583b
Author: zwangsheng <[email protected]>
AuthorDate: Tue Sep 12 17:45:26 2023 +0800
[CELEBORN-882][WORKER][METRICS] Add `Pause Push Data Time Count` Metrics &
Dashboard Panel
### What changes were proposed in this pull request?
Add the `PausePushDataTime` worker metric and a corresponding Grafana dashboard panel.
### Why are the changes needed?
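Record the cumulative time each Celeborn worker spends with push data paused, so operators can see how long workers stay in a paused serving state.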
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tested on a cluster; `MemoryManagerSuite` was also extended to assert that the recorded pause time increases.
Closes #1800 from zwangsheng/CELEBORN-882.
Lead-authored-by: zwangsheng <[email protected]>
Co-authored-by: zwangsheng <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
assets/grafana/celeborn-dashboard.json | 90 ++++++++++++++++++++++
.../org/apache/celeborn/common/CelebornConf.scala | 10 +++
docs/configuration/metrics.md | 1 +
.../deploy/worker/memory/MemoryManager.java | 29 ++++++-
.../celeborn/service/deploy/worker/Worker.scala | 3 +
.../service/deploy/worker/WorkerSource.scala | 1 +
.../service/deploy/memory/MemoryManagerSuite.scala | 4 +
7 files changed, 137 insertions(+), 1 deletion(-)
diff --git a/assets/grafana/celeborn-dashboard.json
b/assets/grafana/celeborn-dashboard.json
index 12549d877..a2e6a1e62 100644
--- a/assets/grafana/celeborn-dashboard.json
+++ b/assets/grafana/celeborn-dashboard.json
@@ -1264,6 +1264,96 @@
"title": "metrics_PausePushDataAndReplicate_Value",
"type": "timeseries"
},
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green",
+ "value": null
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ },
+ "unit": "ms"
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 0,
+ "y": 63
+ },
+ "id": 182,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "expr": "metrics_PausePushDataTime_Value",
+ "refId": "A"
+ }
+ ],
+ "title": "Pause Push Data Time Count",
+ "type": "timeseries"
+ },
{
"datasource": {
"type": "prometheus",
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 48dafc219..8134bbe38 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -676,6 +676,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
def metricsAppTopDiskUsageCount: Int = get(METRICS_APP_TOP_DISK_USAGE_COUNT)
def metricsAppTopDiskUsageWindowSize: Int =
get(METRICS_APP_TOP_DISK_USAGE_WINDOW_SIZE)
def metricsAppTopDiskUsageInterval: Long =
get(METRICS_APP_TOP_DISK_USAGE_INTERVAL)
+ def metricsWorkerForceAppendPauseSpentTimeThreshold: Int =
+ get(METRICS_WORKER_PAUSE_SPENT_TIME_FORCE_APPEND_THRESHOLD)
// //////////////////////////////////////////////////////
// Quota //
@@ -3609,6 +3611,14 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("10min")
+ val METRICS_WORKER_PAUSE_SPENT_TIME_FORCE_APPEND_THRESHOLD: ConfigEntry[Int]
=
+ buildConf("celeborn.metrics.worker.pauseSpentTime.forceAppend.threshold")
+ .categories("metrics")
+ .doc("Force append worker pause spent time even if worker still in pause
serving state." +
+ "Help user can find worker pause spent time increase, when worker
always been pause state.")
+ .intConf
+ .createWithDefault(10)
+
val QUOTA_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.quota.enabled")
.categories("quota")
diff --git a/docs/configuration/metrics.md b/docs/configuration/metrics.md
index 4847879a0..1c4d35246 100644
--- a/docs/configuration/metrics.md
+++ b/docs/configuration/metrics.md
@@ -31,6 +31,7 @@ license: |
| celeborn.metrics.master.prometheus.port | 9098 | Master's Prometheus port. |
0.3.0 |
| celeborn.metrics.sample.rate | 1.0 | It controls if Celeborn collect timer
metrics for some operations. Its value should be in [0.0, 1.0]. | 0.2.0 |
| celeborn.metrics.timer.slidingWindow.size | 4096 | The sliding window size
of timer metric. | 0.2.0 |
+| celeborn.metrics.worker.pauseSpentTime.forceAppend.threshold | 10 | Force
append worker pause spent time even if worker still in pause serving state.Help
user can find worker pause spent time increase, when worker always been pause
state. | |
| celeborn.metrics.worker.prometheus.host | <localhost> | Worker's
Prometheus host. | 0.3.0 |
| celeborn.metrics.worker.prometheus.port | 9096 | Worker's Prometheus port. |
0.3.0 |
<!--end-include-->
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
index ac87db327..e6d08d5f6 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
@@ -48,6 +48,7 @@ public class MemoryManager {
private final long pauseReplicateThreshold;
private final long resumeThreshold;
private final long maxSortMemory;
+ private final int forceAppendPauseSpentTimeThreshold;
private final List<MemoryPressureListener> memoryPressureListeners = new
ArrayList<>();
private final ScheduledExecutorService checkService =
@@ -66,8 +67,10 @@ public class MemoryManager {
private final LongAdder pausePushDataCounter = new LongAdder();
private final LongAdder pausePushDataAndReplicateCounter = new LongAdder();
private ServingState servingState = ServingState.NONE_PAUSED;
+ private long pauseStartTime = -1L;
+ private long pausePushDataTime = 0L;
+ private int trimCounter = 0;
private volatile boolean isPaused = false;
-
// For credit stream
private final AtomicLong readBufferCounter = new AtomicLong(0);
private long readBufferThreshold = 0;
@@ -111,6 +114,7 @@ public class MemoryManager {
double readBufferTargetRatio = conf.readBufferTargetRatio();
long readBufferTargetUpdateInterval =
conf.readBufferTargetUpdateInterval();
long readBufferTargetNotifyThreshold =
conf.readBufferTargetNotifyThreshold();
+ forceAppendPauseSpentTimeThreshold =
conf.metricsWorkerForceAppendPauseSpentTimeThreshold();
maxDirectorMemory =
DynMethods.builder("maxDirectMemory")
@@ -247,6 +251,13 @@ public class MemoryManager {
if (lastState == servingState) {
if (servingState != ServingState.NONE_PAUSED) {
logger.debug("Trigger action: TRIM");
+ trimCounter += 1;
+ // force to append pause spent time even we are in pause state
+ if (trimCounter >= forceAppendPauseSpentTimeThreshold) {
+ logger.debug(
+ "Trigger action: TRIM for {} times, force to append pause spent
time.", trimCounter);
+ appendPauseSpentTime();
+ }
trimAllListeners();
}
return;
@@ -262,6 +273,7 @@ public class MemoryManager {
memoryPressureListener.onResume(TransportModuleConstants.REPLICATE_MODULE));
} else if (lastState == ServingState.NONE_PAUSED) {
logger.info("Trigger action: PAUSE PUSH");
+ pauseStartTime = System.currentTimeMillis();
memoryPressureListeners.forEach(
memoryPressureListener ->
memoryPressureListener.onPause(TransportModuleConstants.PUSH_MODULE));
@@ -272,6 +284,7 @@ public class MemoryManager {
pausePushDataAndReplicateCounter.increment();
if (lastState == ServingState.NONE_PAUSED) {
logger.info("Trigger action: PAUSE PUSH");
+ pauseStartTime = System.currentTimeMillis();
memoryPressureListeners.forEach(
memoryPressureListener ->
memoryPressureListener.onPause(TransportModuleConstants.PUSH_MODULE));
@@ -283,6 +296,8 @@ public class MemoryManager {
trimAllListeners();
break;
case NONE_PAUSED:
+ // resume from paused mode, append pause spent time
+ appendPauseSpentTime();
if (lastState == ServingState.PUSH_AND_REPLICATE_PAUSED) {
logger.info("Trigger action: RESUME REPLICATE");
memoryPressureListeners.forEach(
@@ -389,6 +404,18 @@ public class MemoryManager {
return readBufferDispatcher.requestsLength();
}
+ public long getPausePushDataTime() {
+ return pausePushDataTime;
+ }
+
+ private void appendPauseSpentTime() {
+ long nextPauseStartTime = System.currentTimeMillis();
+ pausePushDataTime += nextPauseStartTime - pauseStartTime;
+ pauseStartTime = nextPauseStartTime;
+ // reset
+ trimCounter = 0;
+ }
+
public void addReadBufferTargetChangeListener(ReadBufferTargetChangeListener
listener) {
synchronized (readBufferTargetChangeListeners) {
readBufferTargetChangeListeners.add(listener);
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 597fc9e5b..77f0cd3ac 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -300,6 +300,9 @@ private[celeborn] class Worker(
workerSource.addGauge(WorkerSource.ACTIVE_SHUFFLE_SIZE) { () =>
storageManager.getActiveShuffleSize()
}
+ workerSource.addGauge(WorkerSource.PAUSE_PUSH_DATA_TIME) { () =>
+ memoryManager.getPausePushDataTime
+ }
private def highWorkload: Boolean = {
(memoryManager.currentServingState, conf.workerActiveConnectionMax) match {
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index 5690548a3..20a138f9a 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -90,6 +90,7 @@ object WorkerSource {
val REPLICA_REGION_START_TIME = "ReplicaRegionStartTime"
val PRIMARY_REGION_FINISH_TIME = "PrimaryRegionFinishTime"
val REPLICA_REGION_FINISH_TIME = "ReplicaRegionFinishTime"
+ val PAUSE_PUSH_DATA_TIME = "PausePushDataTime"
// flush
val TAKE_BUFFER_TIME = "TakeBufferTime"
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
index 864ddffec..bfbb42043 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
@@ -131,6 +131,9 @@ class MemoryManagerSuite extends CelebornFunSuite {
assert(!pushListener.isPause)
assert(!replicateListener.isPause)
}
+ // [CELEBORN-882] Test record pause push time
+ assert(memoryManager.getPausePushDataTime.longValue() > 0)
+ val lastPauseTime = memoryManager.getPausePushDataTime.longValue()
// NONE PAUSED -> PAUSE PUSH AND REPLICATE
memoryCounter.set(replicateThreshold + 1);
@@ -145,6 +148,7 @@ class MemoryManagerSuite extends CelebornFunSuite {
assert(!pushListener.isPause)
assert(!replicateListener.isPause)
}
+ assert(memoryManager.getPausePushDataTime.longValue() > lastPauseTime)
}
class MockMemoryPressureListener(