This is an automated email from the ASF dual-hosted git repository. nicholasjiang pushed a commit to branch branch-0.6 in repository https://gitbox.apache.org/repos/asf/celeborn.git
commit fc8b8dd6710a3df01f34bce24e65ca321df4e41e Author: xxx <[email protected]> AuthorDate: Thu Aug 21 11:17:44 2025 +0800 [CELEBORN-2112] Introduce PausePushDataStatus and PausePushDataAndReplicateStatus metric to record status of pause push data ### What changes were proposed in this pull request? Add `PausePushDataStatus` and `PausePushDataAndReplicateStatus` metric. ### Why are the changes needed? Introduce `PausePushDataStatus` and `PausePushDataAndReplicateStatus` metric to record status of pause push data. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manual test. [Grafana](https://xy2953396112.grafana.net/public-dashboards/21af8e2844234c438e74c741211f0032) Closes #3426 from xy2953396112/CELEBORN-2112. Authored-by: xxx <[email protected]> Signed-off-by: SteNicholas <[email protected]> (cherry picked from commit 661a096b778c2e53f6a3385363881cf0c7e37c78) Signed-off-by: SteNicholas <[email protected]> --- assets/grafana/celeborn-dashboard.json | 176 +++++++++++++++++++++ docs/monitoring.md | 2 + .../deploy/worker/memory/MemoryManager.java | 8 + .../celeborn/service/deploy/worker/Worker.scala | 6 + .../service/deploy/worker/WorkerSource.scala | 4 +- 5 files changed, 195 insertions(+), 1 deletion(-) diff --git a/assets/grafana/celeborn-dashboard.json b/assets/grafana/celeborn-dashboard.json index 5480e58ec..f5603e9a2 100644 --- a/assets/grafana/celeborn-dashboard.json +++ b/assets/grafana/celeborn-dashboard.json @@ -2855,6 +2855,182 @@ "title": "metrics_PausePushDataAndReplicateTime_Value", "type": "timeseries" }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 168 + }, + "id": 260, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "maxHeight": 600, + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "expr": "metrics_PausePushDataStatus_Value{instance=~\"${instance}\"}", + "legendFormat": "${baseLegend}", + "range": true, + "refId": "A" + } + ], + "title": "metrics_PausePushDataStatus_Value", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 168 + }, + "id": 261, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "maxHeight": 600, + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "expr": "metrics_PausePushDataAndReplicateStatus_Value{instance=~\"${instance}\"}", + "legendFormat": "${baseLegend}", + "range": true, + "refId": "A" + } + ], + "title": "metrics_PausePushDataAndReplicateStatus_Value", + "type": "timeseries" + }, { "datasource": { "type": "prometheus", diff --git a/docs/monitoring.md b/docs/monitoring.md index 7a3d5ef5f..abf987c93 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -213,8 +213,10 @@ These metrics are exposed by Celeborn worker. | ReplicaRegionStartTime | ReplicaRegionStart means handle RegionStart of replica partition location. | | PrimaryRegionFinishTime | PrimaryRegionFinish means handle RegionFinish of primary partition location. | | ReplicaRegionFinishTime | ReplicaRegionFinish means handle RegionFinish of replica partition location. | + | PausePushDataStatus | The status for a worker to stop receiving pushData from clients because of back pressure. | | PausePushDataTime | The time for a worker to stop receiving pushData from clients because of back pressure. | | PausePushDataAndReplicateTime | The time for a worker to stop receiving pushData from clients and other workers because of back pressure. | + | PausePushDataAndReplicateStatus | The status for a worker to stop receiving pushData from clients because of back pressure. | | PausePushData | The count for a worker to stop receiving pushData from clients because of back pressure. | | PausePushDataAndReplicate | The count for a worker to stop receiving pushData from clients and other workers because of back pressure. | | PartitionFileSizeBytes | The size of partition files committed in current worker. | diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java index 5b11c58ab..da81c92a3 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java @@ -518,6 +518,14 @@ public class MemoryManager { return pausePushDataTime; } + public int getPushPausedStatus() { + return currentServingState() == ServingState.PUSH_PAUSED ? 1 : 0; + } + + public int getPushAndReplicatePausedStatus() { + return currentServingState() == ServingState.PUSH_AND_REPLICATE_PAUSED ? 1 : 0; + } + public long getPausePushDataAndReplicateTime() { return pausePushDataAndReplicateTime; } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index 79928df44..d060f9578 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -428,6 +428,12 @@ private[celeborn] class Worker( workerSource.addGauge(WorkerSource.PAUSE_PUSH_DATA_AND_REPLICATE_TIME) { () => memoryManager.getPausePushDataAndReplicateTime } + workerSource.addGauge(WorkerSource.PAUSE_PUSH_DATA_STATUS) { () => + memoryManager.getPushPausedStatus + } + workerSource.addGauge(WorkerSource.PAUSE_PUSH_DATA_AND_REPLICATE_STATUS) { () => + memoryManager.getPushAndReplicatePausedStatus + } workerSource.addGauge(WorkerSource.PAUSE_PUSH_DATA_COUNT) { () => memoryManager.getPausePushDataCounter } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala index 4c9715c35..1b94406c8 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala @@ -194,9 +194,11 @@ object WorkerSource { // pause push data val PAUSE_PUSH_DATA_TIME = "PausePushDataTime" - val PAUSE_PUSH_DATA_AND_REPLICATE_TIME = "PausePushDataAndReplicateTime" val PAUSE_PUSH_DATA_COUNT = "PausePushData" + val PAUSE_PUSH_DATA_STATUS = "PausePushDataStatus" + val PAUSE_PUSH_DATA_AND_REPLICATE_TIME = "PausePushDataAndReplicateTime" val PAUSE_PUSH_DATA_AND_REPLICATE_COUNT = "PausePushDataAndReplicate" + val PAUSE_PUSH_DATA_AND_REPLICATE_STATUS = "PausePushDataAndReplicateStatus" // flush val TAKE_BUFFER_TIME = "TakeBufferTime"
