This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.4 by this push:
new 0b5029703 [CELEBORN-1215] Introduce PausePushDataAndReplicateTime
metric to record time for a worker to stop receiving pushData from clients and
other workers
0b5029703 is described below
commit 0b5029703c119fd5777364ba4e008d83a9778aba
Author: SteNicholas <[email protected]>
AuthorDate: Wed Jan 10 19:55:04 2024 +0800
[CELEBORN-1215] Introduce PausePushDataAndReplicateTime metric to record
time for a worker to stop receiving pushData from clients and other workers
### What changes were proposed in this pull request?
Introduce `PausePushDataAndReplicateTime` metric to record time for a
worker to stop receiving pushData from clients and other workers.
### Why are the changes needed?
`PausePushData` is the number of times a worker stopped receiving pushData
from clients because of back pressure, while `PausePushDataAndReplicate`
is the number of times a worker stopped receiving pushData from clients and
other workers because of back pressure. Because these two cases are counted
separately, it is confusing for users that `PausePushDataTime` records the
time a worker stopped receiving pushData from clients or other workers without
distinguishing between them. It's recommended that the
`PausePushDataAndReplicateTime` metric is [...]
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- [Celeborn
Dashboard](https://stenicholas.grafana.net/d/U_qgru_7z/celeborn?orgId=1&refresh=5s)
- `MemoryManagerSuite#[CELEBORN-882] Test MemoryManager check memory thread
logic`
Closes #2221 from SteNicholas/CELEBORN-1215.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
(cherry picked from commit 4b5e23db37e4d01a9a1fca595c5c77c0ad4506db)
Signed-off-by: mingji <[email protected]>
---
METRICS.md | 6 +-
assets/grafana/celeborn-dashboard.json | 119 ++++++++++++++++++---
docs/monitoring.md | 4 +
.../deploy/worker/memory/MemoryManager.java | 27 +++--
.../celeborn/service/deploy/worker/Worker.scala | 15 +--
.../service/deploy/worker/WorkerSource.scala | 7 +-
.../service/deploy/memory/MemoryManagerSuite.scala | 4 +-
7 files changed, 149 insertions(+), 33 deletions(-)
diff --git a/METRICS.md b/METRICS.md
index 2da833924..781390ee9 100644
--- a/METRICS.md
+++ b/METRICS.md
@@ -114,8 +114,10 @@ Here is an example of Grafana dashboard importing.
| SortedFiles | worker |
This value means the count of sorted shuffle files.
|
| SortedFileSize | worker |
This value means the count of sorted shuffle files 's total size.
|
| DiskBuffer | worker | Disk buffers
are part of netty used memory, means data need to write to disk but haven't
been written to disk. |
-| PausePushData | worker |
PausePushData means the count of worker stopped receiving data from client.
|
-| PausePushDataAndReplicate | worker |
PausePushDataAndReplicate means the count of worker stopped receiving data from
client and other workers. |
+| PausePushDataTime | worker |
The total time the current worker stopped receiving data from clients.
|
+| PausePushDataAndReplicateTime | worker |
The total time the current worker stopped receiving data from clients and
other workers. |
+| PausePushData | worker |
The number of times the current worker stopped receiving data from clients.
|
+| PausePushDataAndReplicate | worker |
The number of times the current worker stopped receiving data from clients and
other workers. |
| jvm_gc_count | JVM |
The GC count of each garbage collector.
|
| jvm_gc_time | JVM |
The GC cost time of each garbage collector.
|
| jvm_memory_heap_init | JVM |
The amount of heap init memory.
|
diff --git a/assets/grafana/celeborn-dashboard.json
b/assets/grafana/celeborn-dashboard.json
index 3f0de33c9..68cfdd9ef 100644
--- a/assets/grafana/celeborn-dashboard.json
+++ b/assets/grafana/celeborn-dashboard.json
@@ -1692,7 +1692,8 @@
"value": 80
}
]
- }
+ },
+ "unit": "ms"
},
"overrides": []
},
@@ -1702,7 +1703,7 @@
"x": 12,
"y": 30
},
- "id": 179,
+ "id": 184,
"options": {
"legend": {
"calcs": [],
@@ -1721,13 +1722,13 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
- "expr": "metrics_ActiveConnectionCount_Count",
+ "expr": "metrics_PausePushDataAndReplicateTime_Value",
"legendFormat": "${baseLegend}",
"range": true,
"refId": "A"
}
],
- "title": "metrics_ActiveConnectionCount_Count",
+ "title": "metrics_PausePushDataAndReplicateTime_Value",
"type": "timeseries"
},
{
@@ -1916,6 +1917,96 @@
],
"title": "metrics_ActiveShuffleFileCount_Value",
"type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green"
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ }
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 0,
+ "y": 46
+ },
+ "id": 179,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "expr": "metrics_ActiveConnectionCount_Count",
+ "legendFormat": "${baseLegend}",
+ "range": true,
+ "refId": "A"
+ }
+ ],
+ "title": "metrics_ActiveConnectionCount_Count",
+ "type": "timeseries"
}
],
"title": "Worker",
@@ -1927,7 +2018,7 @@
"h": 1,
"w": 24,
"x": 0,
- "y": 3
+ "y": 54
},
"id": 134,
"panels": [
@@ -2938,7 +3029,7 @@
"h": 1,
"w": 24,
"x": 0,
- "y": 54
+ "y": 55
},
"id": 12,
"panels": [
@@ -3585,7 +3676,7 @@
"h": 1,
"w": 24,
"x": 0,
- "y": 55
+ "y": 56
},
"id": 10,
"panels": [
@@ -4139,7 +4230,7 @@
"h": 1,
"w": 24,
"x": 0,
- "y": 56
+ "y": 57
},
"id": 8,
"panels": [
@@ -5161,7 +5252,7 @@
"h": 1,
"w": 24,
"x": 0,
- "y": 57
+ "y": 58
},
"id": 50,
"panels": [
@@ -5717,7 +5808,7 @@
"h": 1,
"w": 24,
"x": 0,
- "y": 58
+ "y": 59
},
"id": 157,
"panels": [
@@ -6010,7 +6101,7 @@
"h": 1,
"w": 24,
"x": 0,
- "y": 59
+ "y": 60
},
"id": 137,
"panels": [
@@ -7401,7 +7492,7 @@
"h": 1,
"w": 24,
"x": 0,
- "y": 60
+ "y": 61
},
"id": 110,
"panels": [
@@ -7597,7 +7688,7 @@
"h": 1,
"w": 24,
"x": 0,
- "y": 61
+ "y": 62
},
"id": 123,
"panels": [
@@ -8075,7 +8166,7 @@
"h": 1,
"w": 24,
"x": 0,
- "y": 62
+ "y": 63
},
"id": 172,
"panels": [
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 1ada46e2c..b172f496e 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -180,6 +180,10 @@ These metrics are exposed by Celeborn worker.
- SortedFileSize
- DiskBuffer
- The memory occupied by pushData and pushMergedData which should be
written to disk.
+ - PausePushDataTime
+ - The time for a worker to stop receiving pushData from clients
because of back pressure.
+ - PausePushDataAndReplicateTime
+ - The time for a worker to stop receiving pushData from clients and
other workers because of back pressure.
- PausePushData
- The count for a worker to stop receiving pushData from clients
because of back pressure.
- PausePushDataAndReplicate
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
index e6d08d5f6..e04975ec8 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
@@ -67,8 +67,10 @@ public class MemoryManager {
private final LongAdder pausePushDataCounter = new LongAdder();
private final LongAdder pausePushDataAndReplicateCounter = new LongAdder();
private ServingState servingState = ServingState.NONE_PAUSED;
- private long pauseStartTime = -1L;
+ private long pausePushDataStartTime = -1L;
private long pausePushDataTime = 0L;
+ private long pausePushDataAndReplicateStartTime = -1L;
+ private long pausePushDataAndReplicateTime = 0L;
private int trimCounter = 0;
private volatile boolean isPaused = false;
// For credit stream
@@ -256,7 +258,7 @@ public class MemoryManager {
if (trimCounter >= forceAppendPauseSpentTimeThreshold) {
logger.debug(
"Trigger action: TRIM for {} times, force to append pause spent
time.", trimCounter);
- appendPauseSpentTime();
+ appendPauseSpentTime(servingState);
}
trimAllListeners();
}
@@ -273,7 +275,7 @@ public class MemoryManager {
memoryPressureListener.onResume(TransportModuleConstants.REPLICATE_MODULE));
} else if (lastState == ServingState.NONE_PAUSED) {
logger.info("Trigger action: PAUSE PUSH");
- pauseStartTime = System.currentTimeMillis();
+ pausePushDataStartTime = System.currentTimeMillis();
memoryPressureListeners.forEach(
memoryPressureListener ->
memoryPressureListener.onPause(TransportModuleConstants.PUSH_MODULE));
@@ -284,7 +286,7 @@ public class MemoryManager {
pausePushDataAndReplicateCounter.increment();
if (lastState == ServingState.NONE_PAUSED) {
logger.info("Trigger action: PAUSE PUSH");
- pauseStartTime = System.currentTimeMillis();
+ pausePushDataAndReplicateStartTime = System.currentTimeMillis();
memoryPressureListeners.forEach(
memoryPressureListener ->
memoryPressureListener.onPause(TransportModuleConstants.PUSH_MODULE));
@@ -297,7 +299,7 @@ public class MemoryManager {
break;
case NONE_PAUSED:
// resume from paused mode, append pause spent time
- appendPauseSpentTime();
+ appendPauseSpentTime(lastState);
if (lastState == ServingState.PUSH_AND_REPLICATE_PAUSED) {
logger.info("Trigger action: RESUME REPLICATE");
memoryPressureListeners.forEach(
@@ -408,10 +410,19 @@ public class MemoryManager {
return pausePushDataTime;
}
- private void appendPauseSpentTime() {
+ public long getPausePushDataAndReplicateTime() {
+ return pausePushDataAndReplicateTime;
+ }
+
+ private void appendPauseSpentTime(ServingState servingState) {
long nextPauseStartTime = System.currentTimeMillis();
- pausePushDataTime += nextPauseStartTime - pauseStartTime;
- pauseStartTime = nextPauseStartTime;
+ if (servingState == ServingState.PUSH_PAUSED) {
+ pausePushDataTime += nextPauseStartTime - pausePushDataStartTime;
+ pausePushDataStartTime = nextPauseStartTime;
+ } else {
+ pausePushDataAndReplicateTime += nextPauseStartTime -
pausePushDataAndReplicateStartTime;
+ pausePushDataAndReplicateStartTime = nextPauseStartTime;
+ }
// reset
trimCounter = 0;
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index fd14a0460..8ee260e1f 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -306,12 +306,6 @@ private[celeborn] class Worker(
workerSource.addGauge(WorkerSource.NETTY_MEMORY) { () =>
memoryManager.getNettyUsedDirectMemory
}
- workerSource.addGauge(WorkerSource.PAUSE_PUSH_DATA_COUNT) { () =>
- memoryManager.getPausePushDataCounter
- }
- workerSource.addGauge(WorkerSource.PAUSE_PUSH_DATA_AND_REPLICATE_COUNT) { ()
=>
- memoryManager.getPausePushDataAndReplicateCounter
- }
workerSource.addGauge(WorkerSource.BUFFER_STREAM_READ_BUFFER) { () =>
memoryManager.getReadBufferCounter
}
@@ -330,6 +324,15 @@ private[celeborn] class Worker(
workerSource.addGauge(WorkerSource.PAUSE_PUSH_DATA_TIME) { () =>
memoryManager.getPausePushDataTime
}
+ workerSource.addGauge(WorkerSource.PAUSE_PUSH_DATA_AND_REPLICATE_TIME) { ()
=>
+ memoryManager.getPausePushDataAndReplicateTime
+ }
+ workerSource.addGauge(WorkerSource.PAUSE_PUSH_DATA_COUNT) { () =>
+ memoryManager.getPausePushDataCounter
+ }
+ workerSource.addGauge(WorkerSource.PAUSE_PUSH_DATA_AND_REPLICATE_COUNT) { ()
=>
+ memoryManager.getPausePushDataAndReplicateCounter
+ }
private def highWorkload: Boolean = {
(memoryManager.currentServingState, conf.workerActiveConnectionMax) match {
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index 911db859e..0574567ed 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -101,7 +101,12 @@ object WorkerSource {
val REPLICA_REGION_START_TIME = "ReplicaRegionStartTime"
val PRIMARY_REGION_FINISH_TIME = "PrimaryRegionFinishTime"
val REPLICA_REGION_FINISH_TIME = "ReplicaRegionFinishTime"
+
+ // pause push data
val PAUSE_PUSH_DATA_TIME = "PausePushDataTime"
+ val PAUSE_PUSH_DATA_AND_REPLICATE_TIME = "PausePushDataAndReplicateTime"
+ val PAUSE_PUSH_DATA_COUNT = "PausePushData"
+ val PAUSE_PUSH_DATA_AND_REPLICATE_COUNT = "PausePushDataAndReplicate"
// flush
val TAKE_BUFFER_TIME = "TakeBufferTime"
@@ -123,8 +128,6 @@ object WorkerSource {
val SORTED_FILES = "SortedFiles"
val SORTED_FILE_SIZE = "SortedFileSize"
val DISK_BUFFER = "DiskBuffer"
- val PAUSE_PUSH_DATA_COUNT = "PausePushData"
- val PAUSE_PUSH_DATA_AND_REPLICATE_COUNT = "PausePushDataAndReplicate"
val BUFFER_STREAM_READ_BUFFER = "BufferStreamReadBuffer"
val READ_BUFFER_DISPATCHER_REQUESTS_LENGTH =
"ReadBufferDispatcherRequestsLength"
val READ_BUFFER_ALLOCATED_COUNT = "ReadBufferAllocatedCount"
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
index f26962929..6f91b4374 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
@@ -133,6 +133,7 @@ class MemoryManagerSuite extends CelebornFunSuite {
}
// [CELEBORN-882] Test record pause push time
assert(memoryManager.getPausePushDataTime.longValue() > 0)
+ assert(memoryManager.getPausePushDataAndReplicateTime.longValue() == 0)
val lastPauseTime = memoryManager.getPausePushDataTime.longValue()
// NONE PAUSED -> PAUSE PUSH AND REPLICATE
@@ -148,7 +149,8 @@ class MemoryManagerSuite extends CelebornFunSuite {
assert(!pushListener.isPause)
assert(!replicateListener.isPause)
}
- assert(memoryManager.getPausePushDataTime.longValue() > lastPauseTime)
+ assert(memoryManager.getPausePushDataTime.longValue() == lastPauseTime)
+ assert(memoryManager.getPausePushDataAndReplicateTime.longValue() > 0)
}
class MockMemoryPressureListener(