This is an automated email from the ASF dual-hosted git repository. chengpan pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
commit 11c90d8e726a50477a55a0338c37a0d76ec55226 Author: SteNicholas <[email protected]> AuthorDate: Mon Oct 23 09:56:04 2023 +0800 [CELEBORN-916] Add new metric about active shuffle file count in worker ### What changes were proposed in this pull request? Adds a new metric `ActiveShuffleFileCount` about active shuffle file count of Celeborn Worker. ### Why are the changes needed? The `ActiveShuffleSize` metric reports the active shuffle size of the peer worker at present. Therefore, it's better to introduce `ActiveShuffleFileCount` to report the active shuffle file count of Celeborn Worker. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Internal tests. Closes #2009 from SteNicholas/CELEBORN-916. Authored-by: SteNicholas <[email protected]> Signed-off-by: mingji <[email protected]> --- METRICS.md | 2 + assets/grafana/celeborn-dashboard.json | 123 ++++++++++++++++++++- docs/monitoring.md | 8 +- .../celeborn/service/deploy/worker/Worker.scala | 3 + .../service/deploy/worker/WorkerSource.scala | 4 +- .../deploy/worker/storage/StorageManager.scala | 3 + 6 files changed, 138 insertions(+), 5 deletions(-) diff --git a/METRICS.md b/METRICS.md index 7b5081a9d..42a40fe29 100644 --- a/METRICS.md +++ b/METRICS.md @@ -95,6 +95,8 @@ Here is an example of grafana dashboard importing. | DiskBuffer | worker | Disk buffers are part of netty used memory, means data need to write to disk but haven't been written to disk. | | PausePushData | worker | PausePushData means the count of worker stopped receiving data from client. | | PausePushDataAndReplicate | worker | PausePushDataAndReplicate means the count of worker stopped receiving data from client and other workers. | +| ActiveShuffleSize | worker | The active shuffle size of a worker including master replica and slave replica. | +| ActiveShuffleFileCount | worker | The active shuffle file count of a worker including master replica and slave replica. 
| | OutstandingFetchCount | worker | The count of outstanding fetch request received in peer worker. | | OutstandingRpcCount | worker | The count of outstanding rpc request received in peer worker. | | OutstandingPushCount | worker | The count of outstanding push request received in peer worker. | diff --git a/assets/grafana/celeborn-dashboard.json b/assets/grafana/celeborn-dashboard.json index cb676a140..aa1f7fd4a 100644 --- a/assets/grafana/celeborn-dashboard.json +++ b/assets/grafana/celeborn-dashboard.json @@ -542,7 +542,7 @@ "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, - "description": "The active shuffle size.", + "description": "The active shuffle size of workers.", "fieldConfig": { "defaults": { "color": { @@ -634,7 +634,7 @@ "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, - "description": "The active shuffle partition count.", + "description": "The active shuffle file count of workers.", "fieldConfig": { "defaults": { "color": { @@ -1722,6 +1722,7 @@ "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "description": "The active shuffle size of a worker including master replica and slave replica.", "fieldConfig": { "defaults": { "color": { @@ -1834,6 +1835,124 @@ ], "title": "metrics_ActiveShuffleSize_Value", "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "description": "The active shuffle file count of a worker including master replica and slave replica.", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { 
+ "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [ + { + "__systemRef": "hideSeriesFrom", + "matcher": { + "id": "byNames", + "options": { + "mode": "exclude", + "names": [ + "metrics_ActiveShuffleFileCount_Value{instance=\"core-1-1:9096\", job=\"RSS\", role=\"Worker\"}" + ], + "prefix": "All except:", + "readOnly": true + } + }, + "properties": [ + { + "id": "custom.hideFrom", + "value": { + "legend": false, + "tooltip": false, + "viz": true + } + } + ] + } + ] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 30 + }, + "id": 181, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "builder", + "expr": "metrics_ActiveShuffleFileCount_Value", + "instant": false, + "range": true, + "refId": "A" + } + ], + "title": "metrics_ActiveShuffleFileCount_Value", + "type": "timeseries" } ], "title": "Worker", diff --git a/docs/monitoring.md b/docs/monitoring.md index 440b5e4de..228d47a3d 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -100,9 +100,9 @@ These metrics are exposed by Celeborn master. - PartitionSize - The size of estimated shuffle partition. - PartitionWritten - - The active shuffle size. + - The active shuffle size of workers. - PartitionFileCount - - The active shuffle partition count. + - The active shuffle file count of workers. - OfferSlotsTime - The time for masters to handle `RequestSlots` request when registering shuffle. @@ -194,6 +194,10 @@ These metrics are exposed by Celeborn worker. 
- PotentialConsumeSpeed - UserProduceSpeed - WorkerConsumeSpeed + - ActiveShuffleSize + - The active shuffle size of a worker including master replica and slave replica. + - ActiveShuffleFileCount + - The active shuffle file count of a worker including master replica and slave replica. - OutstandingFetchCount - The count of outstanding fetch request. - OutstandingRpcCount diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index 90e5e58ff..1cc85db3d 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -304,6 +304,9 @@ private[celeborn] class Worker( workerSource.addGauge(WorkerSource.ACTIVE_SHUFFLE_SIZE) { () => storageManager.getActiveShuffleSize() } + workerSource.addGauge(WorkerSource.ACTIVE_SHUFFLE_FILE_COUNT) { () => + storageManager.getActiveShuffleFileCount() + } workerSource.addGauge(WorkerSource.PAUSE_PUSH_DATA_TIME) { () => memoryManager.getPausePushDataTime } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala index 20a138f9a..df07970ab 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala @@ -100,6 +100,7 @@ object WorkerSource { // slots val SLOTS_ALLOCATED = "SlotsAllocated" + // connection val ACTIVE_CONNECTION_COUNT = "ActiveConnectionCount" // memory @@ -124,11 +125,12 @@ object WorkerSource { val DEVICE_CELEBORN_FREE_CAPACITY = "DeviceCelebornFreeBytes" val DEVICE_CELEBORN_TOTAL_CAPACITY = "DeviceCelebornTotalBytes" - // Congestion control + // congestion control val POTENTIAL_CONSUME_SPEED = "PotentialConsumeSpeed" val USER_PRODUCE_SPEED = 
"UserProduceSpeed" val WORKER_CONSUME_SPEED = "WorkerConsumeSpeed" // active shuffle size val ACTIVE_SHUFFLE_SIZE = "ActiveShuffleSize" + val ACTIVE_SHUFFLE_FILE_COUNT = "ActiveShuffleFileCount" } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index c836e98e6..9f370a6bc 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -845,6 +845,9 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs fileInfos.values().asScala.map(_.values().asScala.map(_.getBytesFlushed).sum).sum } + def getActiveShuffleFileCount(): Long = { + fileInfos.asScala.values.map(_.size()).sum + } } object StorageManager {
