This is an automated email from the ASF dual-hosted git repository.
JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5be6557aa56 [improve](streaming-job) add per-job lag metric to
streaming insert jobs (#63194)
5be6557aa56 is described below
commit 5be6557aa5651d83a8163b4d83822fdc73fb3395
Author: wudi <[email protected]>
AuthorDate: Fri May 15 09:53:06 2026 +0800
[improve](streaming-job) add per-job lag metric to streaming insert jobs
(#63194)
### What problem does this PR solve?
Related PRs: #62224 (per-job metrics), #62269 (Lag column)
Problem Summary:
#62224 introduced per-job metrics (`streaming_job_per_job_scanned_rows`,
`_load_bytes`, `_filtered_rows`, `_succeed_task_count`,
`_failed_task_count`) for streaming insert jobs, exposed via `/metrics`
with `job_id`/`job_name` labels for Prometheus.
#62269 later added a `Lag` column to `SHOW JOBS` and the `jobs()` TVF that
reports end-to-end CDC delay in seconds, but the value was exposed only
through SQL — there was no corresponding Prometheus metric, so building
dashboards or alerts on lag was not possible.
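This PR adds `streaming_job_per_job_lag` (unit: `SECONDS`, exported as
`doris_fe_streaming_job_per_job_lag`) to the existing per-job metric set.
The gauge reports -1 when lag is not applicable (e.g. S3 sources or the
snapshot phase) or cannot be parsed as an integer, so dashboards can filter
such jobs out with `lag >= 0`.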
---
.../insert/streaming/StreamingInsertJob.java | 18 ++++++++++++++++++
.../main/java/org/apache/doris/metric/MetricRepo.java | 14 ++++++++++++++
.../cdc/test_streaming_mysql_job_metrics.groovy | 13 +++++++++++--
3 files changed, 43 insertions(+), 2 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index e86e7fcda64..7e6c2c3830b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -885,6 +885,24 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
super.onReplayCreate();
}
+ public String getLag() {
+ return offsetProvider != null ? offsetProvider.getLag() : "";
+ }
+
+ // Numeric lag for metrics. Returns -1 when lag is not applicable (S3,
snapshot phase)
+ // or unparseable, so dashboards can filter N/A jobs via lag >= 0.
+ public long getLagSeconds() {
+ String lagStr = getLag();
+ if (lagStr == null || lagStr.isEmpty()) {
+ return -1L;
+ }
+ try {
+ return Long.parseLong(lagStr);
+ } catch (NumberFormatException e) {
+ return -1L;
+ }
+ }
+
/**
* Because the offset statistics of the streamingInsertJob are all stored
in txn,
* only some fields are replayed here.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
index 83168087926..9b3e37d8f2c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -91,6 +91,7 @@ public final class MetricRepo {
public static final String STREAMING_JOB_PER_JOB_FILTERED_ROWS =
"streaming_job_per_job_filtered_rows";
public static final String STREAMING_JOB_PER_JOB_SUCCEED_TASK_COUNT =
"streaming_job_per_job_succeed_task_count";
public static final String STREAMING_JOB_PER_JOB_FAILED_TASK_COUNT =
"streaming_job_per_job_failed_task_count";
+ public static final String STREAMING_JOB_PER_JOB_LAG =
"streaming_job_per_job_lag";
public static final String CLOUD_TAG = "cloud";
public static LongCounterMetric COUNTER_REQUEST_ALL;
@@ -1246,6 +1247,7 @@ public final class MetricRepo {
DORIS_METRIC_REGISTER.removeMetrics(STREAMING_JOB_PER_JOB_FILTERED_ROWS);
DORIS_METRIC_REGISTER.removeMetrics(STREAMING_JOB_PER_JOB_SUCCEED_TASK_COUNT);
DORIS_METRIC_REGISTER.removeMetrics(STREAMING_JOB_PER_JOB_FAILED_TASK_COUNT);
+ DORIS_METRIC_REGISTER.removeMetrics(STREAMING_JOB_PER_JOB_LAG);
try {
List<org.apache.doris.job.base.AbstractJob> jobs =
@@ -1328,6 +1330,18 @@ public final class MetricRepo {
failedTaskCount.addLabel(new MetricLabel("job_id", jobId))
.addLabel(new MetricLabel("job_name", jobName));
DORIS_METRIC_REGISTER.addMetrics(failedTaskCount);
+
+ GaugeMetric<Long> lag = new GaugeMetric<Long>(
+ STREAMING_JOB_PER_JOB_LAG, MetricUnit.SECONDS,
+ "per job lag in seconds of streaming job, -1 means
N/A") {
+ @Override
+ public Long getValue() {
+ return sJob.getLagSeconds();
+ }
+ };
+ lag.addLabel(new MetricLabel("job_id", jobId))
+ .addLabel(new MetricLabel("job_name", jobName));
+ DORIS_METRIC_REGISTER.addMetrics(lag);
}
} catch (Throwable t) {
LOG.warn("failed to update streaming job per-job metrics", t);
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_metrics.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_metrics.groovy
index e01e1d1908e..fc317a3963a 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_metrics.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_metrics.groovy
@@ -200,12 +200,21 @@ suite("test_streaming_mysql_job_metrics",
metricCount++
}
+ def perJobLag = result.find {
+ it.tags?.metric ==
"doris_fe_streaming_job_per_job_lag" &&
+ it.tags?.job_name == "${jobName}"
+ }
+ if (perJobLag != null) {
+ log.info("per-job lag: ${perJobLag}".toString())
+ metricCount++
+ }
+
}
}
- // 9 streaming_job_* counters + 1 doris_fe_job RUNNING gauge + 5
per-job metrics
- if (metricCount >= 15) {
+ // 9 streaming_job_* counters + 1 doris_fe_job RUNNING gauge + 6
per-job metrics
+ if (metricCount >= 16) {
break
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]