This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new e168fd549 [CELEBORN-2111] Introduce metrics to flush time for
different storage types
e168fd549 is described below
commit e168fd54906b23011a9ada8d27b564f25c6c9920
Author: xxx <[email protected]>
AuthorDate: Wed Oct 29 09:59:40 2025 +0800
[CELEBORN-2111] Introduce metrics to flush time for different storage types
### What changes were proposed in this pull request?
Introduce flush time metrics for different storage types.
### Why are the changes needed?
Flush time was previously recorded by a single `FlushDataTime` timer shared by all flushers.
Splitting it lets operators measure flush time separately for `LocalFlusher`, `HdfsFlusher`, `S3Flusher` and `OssFlusher`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test &
[Grafana](https://xy2953396112.grafana.net/public-dashboards/f395cb4f3c924f0d9a0a48c9a95912b2).
Closes #3425 from xy2953396112/flushTime.
Authored-by: xxx <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
assets/grafana/celeborn-dashboard.json | 552 ++++++++++++++++++++-
docs/migration.md | 3 +
docs/monitoring.md | 5 +-
.../service/deploy/worker/WorkerSource.scala | 10 +-
.../service/deploy/worker/storage/Flusher.scala | 12 +-
5 files changed, 570 insertions(+), 12 deletions(-)
diff --git a/assets/grafana/celeborn-dashboard.json
b/assets/grafana/celeborn-dashboard.json
index d00f3e907..0dba8a844 100644
--- a/assets/grafana/celeborn-dashboard.json
+++ b/assets/grafana/celeborn-dashboard.json
@@ -6981,24 +6981,22 @@
"showLegend": true
},
"tooltip": {
- "hideZeros": false,
"mode": "single",
"sort": "none"
}
},
- "pluginVersion": "12.1.0-91295",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
- "expr": "metrics_FlushDataTime_Mean{instance=~\"${instance}\"}",
+ "expr": "metrics_FlushLocalDataTime_Mean{instance=~\"${instance}\"}",
"legendFormat": "${baseLegend}",
"refId": "A"
}
],
- "title": "metrics_FlushDataTime_Mean",
+ "title": "metrics_FlushLocalDataTime_Mean",
"type": "timeseries"
},
{
@@ -7077,24 +7075,562 @@
"showLegend": true
},
"tooltip": {
- "hideZeros": false,
"mode": "single",
"sort": "none"
}
},
- "pluginVersion": "12.1.0-91295",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
- "expr": "metrics_FlushDataTime_Max{instance=~\"${instance}\"}",
+ "expr": "metrics_FlushLocalDataTime_Max{instance=~\"${instance}\"}",
+ "legendFormat": "${baseLegend}",
+ "refId": "A"
+ }
+ ],
+ "title": "metrics_FlushLocalDataTime_Max",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisBorderShow": false,
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "barWidthFactor": 0.6,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "insertNulls": false,
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green",
+ "value": 0
+ }
+ ]
+ },
+ "unit": "ms"
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 9,
+ "w": 12,
+ "x": 0,
+ "y": 14
+ },
+ "id": 264,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "expr": "metrics_FlushHdfsDataTime_Mean{instance=~\"${instance}\"}",
+ "legendFormat": "${baseLegend}",
+ "refId": "A"
+ }
+ ],
+ "title": "metrics_FlushHdfsDataTime_Mean",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisBorderShow": false,
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "barWidthFactor": 0.6,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "insertNulls": false,
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green",
+ "value": 0
+ }
+ ]
+ },
+ "unit": "ms"
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 9,
+ "w": 12,
+ "x": 12,
+ "y": 14
+ },
+ "id": 265,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "expr": "metrics_FlushHdfsDataTime_Max{instance=~\"${instance}\"}",
+ "legendFormat": "${baseLegend}",
+ "refId": "A"
+ }
+ ],
+ "title": "metrics_FlushHdfsDataTime_Max",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisBorderShow": false,
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "barWidthFactor": 0.6,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "insertNulls": false,
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green",
+ "value": 0
+ }
+ ]
+ },
+ "unit": "ms"
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 9,
+ "w": 12,
+ "x": 0,
+ "y": 14
+ },
+ "id": 266,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "expr": "metrics_FlushS3DataTime_Mean{instance=~\"${instance}\"}",
+ "legendFormat": "${baseLegend}",
+ "refId": "A"
+ }
+ ],
+ "title": "metrics_FlushS3DataTime_Mean",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisBorderShow": false,
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "barWidthFactor": 0.6,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "insertNulls": false,
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green",
+ "value": 0
+ }
+ ]
+ },
+ "unit": "ms"
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 9,
+ "w": 12,
+ "x": 12,
+ "y": 14
+ },
+ "id": 267,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "expr": "metrics_FlushS3DataTime_Max{instance=~\"${instance}\"}",
+ "legendFormat": "${baseLegend}",
+ "refId": "A"
+ }
+ ],
+ "title": "metrics_FlushS3DataTime_Max",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisBorderShow": false,
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "barWidthFactor": 0.6,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "insertNulls": false,
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green",
+ "value": 0
+ }
+ ]
+ },
+ "unit": "ms"
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 9,
+ "w": 12,
+ "x": 0,
+ "y": 14
+ },
+ "id": 268,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "expr": "metrics_FlushOssDataTime_Mean{instance=~\"${instance}\"}",
+ "legendFormat": "${baseLegend}",
+ "refId": "A"
+ }
+ ],
+ "title": "metrics_FlushOssDataTime_Mean",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisBorderShow": false,
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "barWidthFactor": 0.6,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "insertNulls": false,
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green",
+ "value": 0
+ }
+ ]
+ },
+ "unit": "ms"
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 9,
+ "w": 12,
+ "x": 12,
+ "y": 14
+ },
+ "id": 269,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "expr": "metrics_FlushOssDataTime_Max{instance=~\"${instance}\"}",
"legendFormat": "${baseLegend}",
"refId": "A"
}
],
- "title": "metrics_FlushDataTime_Max",
+ "title": "metrics_FlushOssDataTime_Max",
"type": "timeseries"
},
{
diff --git a/docs/migration.md b/docs/migration.md
index df953e39a..81babe0ab 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -24,8 +24,11 @@ license: |
# Upgrading from 0.6 to 0.7
- Since 0.7.0, Celeborn removed `ReleaseSlots`.
+
- Since 0.7.0, Celeborn removed `WorkerRemove`.
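+- Since 0.7.0, Celeborn worker metric `FlushDataTime` is renamed to `FlushLocalDataTime` and only measures flushes to local storage; flush time for
HDFS, S3 and OSS is now reported by the new metrics `FlushHdfsDataTime`, `FlushS3DataTime` and `FlushOssDataTime`.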
+
# Upgrading from 0.5 to 0.6
- Since 0.6.0, Celeborn deprecate
`celeborn.client.spark.fetch.throwsFetchFailure`. Please use
`celeborn.client.spark.stageRerun.enabled` instead.
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 19a25e769..18bb29c8f 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -223,7 +223,10 @@ These metrics are exposed by Celeborn worker.
| PausePushDataAndReplicate | The count for a worker to stop
receiving pushData from clients and other workers because of back pressure.
|
| PartitionFileSizeBytes | The size of partition files
committed in current worker.
|
| TakeBufferTime | The time for a worker to take
out a buffer from a disk flusher.
|
- | FlushDataTime | The time for a worker to write
a buffer which is 256KB by default to storage.
|
+ | FlushLocalDataTime | The time for a worker to write
a buffer to local storage.
|
+ | FlushHdfsDataTime | The time for a worker to write
a buffer to HDFS storage.
|
+ | FlushOssDataTime | The time for a worker to write
a buffer to OSS storage.
|
+ | FlushS3DataTime | The time for a worker to write
a buffer to S3 storage.
|
| CommitFilesTime | The time for a worker to flush
buffers and close files related to specified shuffle.
|
| CommitFilesFailCount | The count of commit files
request failed in current worker.
|
| SlotsAllocated | Slots allocated in last hour.
|
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index ece183ee2..f3f5446cc 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -75,7 +75,10 @@ class WorkerSource(conf: CelebornConf) extends
AbstractSource(conf, Role.WORKER)
// add timers
addTimer(COMMIT_FILES_TIME)
addTimer(RESERVE_SLOTS_TIME)
- addTimer(FLUSH_DATA_TIME)
+ addTimer(FLUSH_LOCAL_DATA_TIME)
+ addTimer(FLUSH_HDFS_DATA_TIME)
+ addTimer(FLUSH_OSS_DATA_TIME)
+ addTimer(FLUSH_S3_DATA_TIME)
addTimer(PRIMARY_PUSH_DATA_TIME)
addTimer(REPLICA_PUSH_DATA_TIME)
@@ -202,7 +205,10 @@ object WorkerSource {
// flush
val TAKE_BUFFER_TIME = "TakeBufferTime"
- val FLUSH_DATA_TIME = "FlushDataTime"
+ val FLUSH_LOCAL_DATA_TIME = "FlushLocalDataTime"
+ val FLUSH_HDFS_DATA_TIME = "FlushHdfsDataTime"
+ val FLUSH_OSS_DATA_TIME = "FlushOssDataTime"
+ val FLUSH_S3_DATA_TIME = "FlushS3DataTime"
val COMMIT_FILES_TIME = "CommitFilesTime"
val COMMIT_FILES_FAIL_COUNT = "CommitFilesFailCount"
val FLUSH_WORKING_QUEUE_SIZE = "FlushWorkingQueueSize"
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
index 2a4134e79..40daca1df 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
@@ -71,7 +71,7 @@ abstract private[worker] class Flusher(
while (!stopFlag.get()) {
val task = workingQueues(index).take()
val key = s"Flusher-$this-${Random.nextInt()}"
- workerSource.sample(WorkerSource.FLUSH_DATA_TIME, key) {
+ workerSource.sample(getFlushTimeMetric(), key) {
if (!task.notifier.hasException) {
try {
val flushBeginTime = System.nanoTime()
@@ -137,6 +137,8 @@ abstract private[worker] class Flusher(
}
def processIOException(e: IOException, deviceErrorType: DiskStatus): Unit
+
+ def getFlushTimeMetric(): String
}
private[worker] class LocalFlusher(
@@ -180,6 +182,8 @@ private[worker] class LocalFlusher(
}
override def toString: String = s"LocalFlusher@$flusherId-$mountPoint"
+
+ override def getFlushTimeMetric(): String =
WorkerSource.FLUSH_LOCAL_DATA_TIME
}
final private[worker] class HdfsFlusher(
@@ -203,6 +207,8 @@ final private[worker] class HdfsFlusher(
}
override def toString: String = s"HdfsFlusher@$flusherId"
+
+ override def getFlushTimeMetric(): String = WorkerSource.FLUSH_HDFS_DATA_TIME
}
final private[worker] class S3Flusher(
@@ -226,6 +232,8 @@ final private[worker] class S3Flusher(
}
override def toString: String = s"s3Flusher@$flusherId"
+
+ override def getFlushTimeMetric(): String = WorkerSource.FLUSH_S3_DATA_TIME
}
final private[worker] class OssFlusher(
@@ -249,4 +257,6 @@ final private[worker] class OssFlusher(
}
override def toString: String = s"ossFlusher@$flusherId"
+
+ override def getFlushTimeMetric(): String = WorkerSource.FLUSH_OSS_DATA_TIME
}