This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new dc4bc1bddce branch-4.0: [Improve](StreamingJob) add more metrics to observe the streaming job #60493 (#60571)
dc4bc1bddce is described below
commit dc4bc1bddcea3fff46bf1e44c49db7169ca8ab6b
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat Feb 7 22:06:13 2026 +0800
branch-4.0: [Improve](StreamingJob) add more metrics to observe the streaming job #60493 (#60571)
Cherry-picked from #60493
Co-authored-by: wudi <[email protected]>
---
.../insert/streaming/StreamingInsertJob.java | 27 +++-
.../java/org/apache/doris/metric/MetricRepo.java | 86 ++++++++++
.../cdc/test_streaming_mysql_job_metrics.groovy | 180 +++++++++++++++++++++
.../test_routin_load_abnormal_job_monitor.groovy | 15 +-
4 files changed, 305 insertions(+), 3 deletions(-)
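For reference, the new counters are exported from the FE /metrics endpoint under the doris_fe_ prefix, and the per-state job gauge carries job/type/state labels, as the regression test below asserts. A hypothetical scrape excerpt (values invented purely for illustration) could look like:

    doris_fe_streaming_job_get_meta_latency 1824
    doris_fe_streaming_job_get_meta_count 42
    doris_fe_streaming_job_get_meta_fail_count 0
    doris_fe_streaming_job_task_execute_time 93210
    doris_fe_streaming_job_task_execute_count 17
    doris_fe_streaming_job_task_failed_count 1
    doris_fe_streaming_job_total_rows 250000
    doris_fe_streaming_job_filter_rows 12
    doris_fe_streaming_job_load_bytes 73400320
    doris_fe_job{job="load", type="STREAMING_JOB", state="RUNNING"} 1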
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 43fe652ee06..c820b8d532a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -54,6 +54,7 @@ import org.apache.doris.job.offset.jdbc.JdbcSourceOffsetProvider;
import org.apache.doris.job.util.StreamingJobUtils;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.load.loadv2.LoadStatistic;
+import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
@@ -516,6 +517,7 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
}
protected void fetchMeta() throws JobException {
+ long start = System.currentTimeMillis();
try {
if (tvfType != null) {
if (originTvfProps == null) {
@@ -537,7 +539,13 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
// If fetching meta fails, the job is paused
// and auto resume will automatically wake it up.
this.updateJobStatus(JobStatus.PAUSED);
+
+ MetricRepo.COUNTER_STREAMING_JOB_GET_META_FAIL_COUNT.increase(1L);
}
+ } finally {
+ long end = System.currentTimeMillis();
+ MetricRepo.COUNTER_STREAMING_JOB_GET_META_LATENCY.increase(end - start);
+ MetricRepo.COUNTER_STREAMING_JOB_GET_META_COUNT.increase(1L);
}
}
@@ -584,6 +592,7 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
failedTaskCount.incrementAndGet();
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
this.failureReason = new FailureReason(task.getErrMsg());
+ MetricRepo.COUNTER_STREAMING_JOB_TASK_FAILED_COUNT.increase(1L);
} finally {
writeUnlock();
}
@@ -594,6 +603,10 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
try {
resetFailureInfo(null);
succeedTaskCount.incrementAndGet();
+ //update metric
+ MetricRepo.COUNTER_STREAMING_JOB_TASK_EXECUTE_COUNT.increase(1L);
+ MetricRepo.COUNTER_STREAMING_JOB_TASK_EXECUTE_TIME.increase(task.getFinishTimeMs() - task.getStartTimeMs());
+
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
AbstractStreamingTask nextTask = createStreamingTask();
this.runningStreamTask = nextTask;
@@ -613,6 +626,10 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
this.jobStatistic.setFileNumber(this.jobStatistic.getFileNumber() + attachment.getNumFiles());
this.jobStatistic.setFileSize(this.jobStatistic.getFileSize() + attachment.getFileBytes());
offsetProvider.updateOffset(offsetProvider.deserializeOffset(attachment.getOffset()));
+
+ //update metric
+ MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.increase(attachment.getScannedRows());
+ MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.increase(attachment.getLoadBytes());
}
private void updateCloudJobStatisticAndOffset(StreamingTaskTxnCommitAttachment attachment) {
@@ -624,6 +641,10 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
this.jobStatistic.setFileNumber(attachment.getNumFiles());
this.jobStatistic.setFileSize(attachment.getFileBytes());
offsetProvider.updateOffset(offsetProvider.deserializeOffset(attachment.getOffset()));
+
+ //update metric
+ MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.update(attachment.getScannedRows());
+ MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.update(attachment.getLoadBytes());
}
private void updateJobStatisticAndOffset(CommitOffsetRequest offsetRequest) {
@@ -645,6 +666,11 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
.setFilteredRows(this.nonTxnJobStatistic.getFilteredRows() + offsetRequest.getFilteredRows());
this.nonTxnJobStatistic.setLoadBytes(this.nonTxnJobStatistic.getLoadBytes() + offsetRequest.getLoadBytes());
offsetProvider.updateOffset(offsetProvider.deserializeOffset(offsetRequest.getOffset()));
+
+ //update metric
+ MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.increase(offsetRequest.getScannedRows());
+ MetricRepo.COUNTER_STREAMING_JOB_FILTER_ROWS.increase(offsetRequest.getFilteredRows());
+ MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.increase(offsetRequest.getLoadBytes());
}
@Override
@@ -658,7 +684,6 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
super.onReplayCreate();
}
-
/**
* Because the offset statistics of the streamingInsertJob are all stored in txn,
* only some fields are replayed here.
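The fetchMeta() hunk above uses a common counter-pair idiom: a monotonically increasing latency sum recorded in a finally block, a call counter bumped alongside it, and a failure counter bumped only on the error path, so a scraper can derive average get-meta latency as sum / count. A minimal, self-contained sketch of the same idiom, with plain AtomicLong fields standing in for the Doris metric registry and a hypothetical fetchMetaOnce() as the guarded call:

    import java.util.concurrent.atomic.AtomicLong;

    public class MetaFetchMetricsSketch {
        static final AtomicLong GET_META_LATENCY_MS = new AtomicLong(); // sum of elapsed ms
        static final AtomicLong GET_META_COUNT = new AtomicLong();      // number of calls
        static final AtomicLong GET_META_FAIL_COUNT = new AtomicLong(); // failed calls only

        static void fetchMeta() {
            long start = System.currentTimeMillis();
            try {
                fetchMetaOnce();
            } catch (Exception e) {
                GET_META_FAIL_COUNT.incrementAndGet(); // error path only
            } finally {
                // Both values only ever grow, so average latency is
                // GET_META_LATENCY_MS / GET_META_COUNT at any scrape.
                GET_META_LATENCY_MS.addAndGet(System.currentTimeMillis() - start);
                GET_META_COUNT.incrementAndGet();
            }
        }

        static void fetchMetaOnce() throws Exception {
            // hypothetical stand-in for the real meta fetch
        }
    }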
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
index b7bbfea437b..bfebc386e77 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -28,7 +28,9 @@ import org.apache.doris.common.Pair;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.Version;
import org.apache.doris.common.util.NetUtils;
+import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.common.TaskStatus;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.loadv2.JobState;
import org.apache.doris.load.loadv2.LoadManager;
@@ -156,6 +158,17 @@ public final class MetricRepo {
public static LongCounterMetric COUNTER_ROUTINE_LOAD_TASK_EXECUTE_TIME;
public static LongCounterMetric COUNTER_ROUTINE_LOAD_TASK_EXECUTE_COUNT;
+ // Streaming job
+ public static LongCounterMetric COUNTER_STREAMING_JOB_GET_META_LATENCY;
+ public static LongCounterMetric COUNTER_STREAMING_JOB_GET_META_COUNT;
+ public static LongCounterMetric COUNTER_STREAMING_JOB_GET_META_FAIL_COUNT;
+ public static LongCounterMetric COUNTER_STREAMING_JOB_TASK_EXECUTE_TIME;
+ public static LongCounterMetric COUNTER_STREAMING_JOB_TASK_EXECUTE_COUNT;
+ public static LongCounterMetric COUNTER_STREAMING_JOB_TASK_FAILED_COUNT;
+ public static LongCounterMetric COUNTER_STREAMING_JOB_TOTAL_ROWS;
+ public static LongCounterMetric COUNTER_STREAMING_JOB_FILTER_ROWS;
+ public static LongCounterMetric COUNTER_STREAMING_JOB_LOAD_BYTES;
+
public static LongCounterMetric COUNTER_HIT_SQL_BLOCK_RULE;
public static AutoMappedMetric<LongCounterMetric> THRIFT_COUNTER_RPC_ALL;
@@ -300,6 +313,7 @@ public final class MetricRepo {
}
initRoutineLoadJobMetrics();
+ initStreamingJobMetrics();
// running alter job
Alter alter = Env.getCurrentEnv().getAlterInstance();
@@ -637,6 +651,35 @@ public final class MetricRepo {
MetricUnit.NOUNIT, "task execute count of routine load");
DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_TASK_EXECUTE_COUNT);
+ // streaming job metrics
+ COUNTER_STREAMING_JOB_GET_META_LATENCY = new LongCounterMetric("streaming_job_get_meta_latency",
+ MetricUnit.MILLISECONDS, "get meta latency of streaming job");
+ DORIS_METRIC_REGISTER.addMetrics(COUNTER_STREAMING_JOB_GET_META_LATENCY);
+ COUNTER_STREAMING_JOB_GET_META_COUNT = new LongCounterMetric("streaming_job_get_meta_count",
+ MetricUnit.NOUNIT, "get meta count of streaming job");
+ DORIS_METRIC_REGISTER.addMetrics(COUNTER_STREAMING_JOB_GET_META_COUNT);
+ COUNTER_STREAMING_JOB_GET_META_FAIL_COUNT = new LongCounterMetric("streaming_job_get_meta_fail_count",
+ MetricUnit.NOUNIT, "get meta fail count of streaming job");
+ DORIS_METRIC_REGISTER.addMetrics(COUNTER_STREAMING_JOB_GET_META_FAIL_COUNT);
+ COUNTER_STREAMING_JOB_TASK_EXECUTE_TIME = new LongCounterMetric("streaming_job_task_execute_time",
+ MetricUnit.MILLISECONDS, "task execute time of streaming job");
+ DORIS_METRIC_REGISTER.addMetrics(COUNTER_STREAMING_JOB_TASK_EXECUTE_TIME);
+ COUNTER_STREAMING_JOB_TASK_EXECUTE_COUNT = new LongCounterMetric("streaming_job_task_execute_count",
+ MetricUnit.NOUNIT, "task execute count of streaming job");
+ DORIS_METRIC_REGISTER.addMetrics(COUNTER_STREAMING_JOB_TASK_EXECUTE_COUNT);
+ COUNTER_STREAMING_JOB_TASK_FAILED_COUNT = new LongCounterMetric("streaming_job_task_failed_count",
+ MetricUnit.NOUNIT, "task failed count of streaming job");
+ DORIS_METRIC_REGISTER.addMetrics(COUNTER_STREAMING_JOB_TASK_FAILED_COUNT);
+ COUNTER_STREAMING_JOB_TOTAL_ROWS = new LongCounterMetric("streaming_job_total_rows", MetricUnit.ROWS,
+ "total rows of streaming job");
+ DORIS_METRIC_REGISTER.addMetrics(COUNTER_STREAMING_JOB_TOTAL_ROWS);
+ COUNTER_STREAMING_JOB_FILTER_ROWS = new LongCounterMetric("streaming_job_filter_rows", MetricUnit.ROWS,
+ "filter rows of streaming job");
+ DORIS_METRIC_REGISTER.addMetrics(COUNTER_STREAMING_JOB_FILTER_ROWS);
+ COUNTER_STREAMING_JOB_LOAD_BYTES = new LongCounterMetric("streaming_job_load_bytes", MetricUnit.BYTES,
+ "load bytes of streaming job");
+ DORIS_METRIC_REGISTER.addMetrics(COUNTER_STREAMING_JOB_LOAD_BYTES);
+
COUNTER_HIT_SQL_BLOCK_RULE = new LongCounterMetric("counter_hit_sql_block_rule", MetricUnit.ROWS,
"total hit sql block rule query");
DORIS_METRIC_REGISTER.addMetrics(COUNTER_HIT_SQL_BLOCK_RULE);
@@ -1059,6 +1102,49 @@ public final class MetricRepo {
DORIS_METRIC_REGISTER.addMetrics(gauge);
}
+ private static void initStreamingJobMetrics() {
+ // streaming insert jobs
+ for (JobStatus jobStatus : JobStatus.values()) {
+ if (jobStatus == JobStatus.PAUSED) {
+ addStreamingJobStateGaugeMetric(jobStatus, "USER_PAUSED",
+ job -> job.getFailureReason() != null
+ && job.getFailureReason().getCode() == InternalErrorCode.MANUAL_PAUSE_ERR);
+ addStreamingJobStateGaugeMetric(jobStatus, "ABNORMAL_PAUSED",
+ job -> job.getFailureReason() != null
+ && job.getFailureReason().getCode() != InternalErrorCode.MANUAL_PAUSE_ERR);
+ }
+ addStreamingJobStateGaugeMetric(jobStatus, jobStatus.name(), job -> true);
+ }
+ }
+
+ private static void addStreamingJobStateGaugeMetric(
+ JobStatus jobStatus, String stateLabel, Predicate<StreamingInsertJob> filter) {
+
+ GaugeMetric<Long> gauge = new GaugeMetric<Long>(
+ "job", MetricUnit.NOUNIT, "streaming job statistics") {
+ @Override
+ public Long getValue() {
+ if (!Env.getCurrentEnv().isMaster()) {
+ return 0L;
+ }
+ List<org.apache.doris.job.base.AbstractJob> jobs =
+ Env.getCurrentEnv().getJobManager().queryJobs(org.apache.doris.job.common.JobType.INSERT);
+
+ return jobs.stream()
+ .filter(job -> job instanceof StreamingInsertJob)
+ .map(job -> (StreamingInsertJob) job)
+ .filter(job -> job.getJobStatus() == jobStatus)
+ .filter(filter)
+ .count();
+ }
+ };
+
+ gauge.addLabel(new MetricLabel("job", "load"))
+ .addLabel(new MetricLabel("type", "STREAMING_JOB"))
+ .addLabel(new MetricLabel("state", stateLabel));
+ DORIS_METRIC_REGISTER.addMetrics(gauge);
+ }
+
private static void initSystemMetrics() {
// TCP retransSegs
GaugeMetric<Long> tcpRetransSegs = (GaugeMetric<Long>) new GaugeMetric<Long>(
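initStreamingJobMetrics() above registers one gauge per job state and computes each value lazily at scrape time by filtering the master's job list through a Predicate; PAUSED is additionally split into USER_PAUSED and ABNORMAL_PAUSED by the failure-reason code. A minimal sketch of that pattern with plain java.util.function types, where the Job record and the JOBS list are hypothetical stand-ins for the FE job manager:

    import java.util.List;
    import java.util.function.Predicate;
    import java.util.function.Supplier;

    public class JobStateGaugeSketch {
        enum JobStatus { RUNNING, PAUSED, STOPPED }
        record Job(JobStatus status, boolean manuallyPaused) {}

        static final List<Job> JOBS = List.of(
                new Job(JobStatus.RUNNING, false),
                new Job(JobStatus.PAUSED, true));

        // One supplier per (status, label) pair, evaluated only when scraped.
        static Supplier<Long> stateGauge(JobStatus status, Predicate<Job> filter) {
            return () -> JOBS.stream()
                    .filter(j -> j.status() == status)
                    .filter(filter)
                    .count();
        }

        public static void main(String[] args) {
            // PAUSED split by cause, mirroring USER_PAUSED vs ABNORMAL_PAUSED.
            Supplier<Long> userPaused = stateGauge(JobStatus.PAUSED, Job::manuallyPaused);
            Supplier<Long> abnormalPaused = stateGauge(JobStatus.PAUSED, j -> !j.manuallyPaused());
            System.out.println("USER_PAUSED=" + userPaused.get()
                    + " ABNORMAL_PAUSED=" + abnormalPaused.get());
        }
    }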
diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_metrics.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_metrics.groovy
new file mode 100644
index 00000000000..644e63ce5f0
--- /dev/null
+++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_metrics.groovy
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.awaitility.Awaitility
+import groovy.json.JsonSlurper
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_mysql_job_metrics",
+ "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+
+ def jobName = "test_streaming_mysql_job_metrics"
+ def currentDb = (sql "select database()")[0][0]
+ def mysqlDb = "test_cdc_db"
+ def mysqlTable = "user_info_metrics"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String mysql_port = context.config.otherConfigs.get("mysql_57_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"
+
+ connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${mysqlTable}"""
+ sql """CREATE TABLE ${mysqlDb}.${mysqlTable} (
+ `name` varchar(200) NOT NULL,
+ `age` int DEFAULT NULL,
+ PRIMARY KEY (`name`)
+ ) ENGINE=InnoDB"""
+ sql """INSERT INTO ${mysqlDb}.${mysqlTable} (name, age) VALUES
('Alice', 10)"""
+ sql """INSERT INTO ${mysqlDb}.${mysqlTable} (name, age) VALUES
('Bob', 20)"""
+ }
+
+ // create streaming job: FROM MYSQL ... TO DATABASE currentDb
+ sql """
+ CREATE JOB ${jobName}
+ ON STREAMING
+ FROM MYSQL (
+ "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "123456",
+ "database" = "${mysqlDb}",
+ "include_tables" = "${mysqlTable}",
+ "offset" = "initial"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(1, SECONDS).until({
+ def jobInfo = sql """
+ select SucceedTaskCount, Status
+ from jobs("type"="insert")
+ where Name = '${jobName}' and ExecuteType='STREAMING'
+ """
+ log.info("metrics job status: " + jobInfo)
+ jobInfo.size() == 1 &&
+ Integer.parseInt(jobInfo[0][0] as String) >= 1 &&
+ (jobInfo[0][1] as String) == "RUNNING"
+ })
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+ log.info("metrics show job: " + showjob)
+ log.info("metrics show task: " + showtask)
+ throw ex
+ }
+
+ int count = 0
+ int metricCount = 0
+ while (true) {
+ metricCount = 0
+ httpTest {
+ endpoint context.config.feHttpAddress
+ uri "/metrics?type=json"
+ op "get"
+ check { code, body ->
+ logger.debug("code:${code} body:${body}")
+
+ if (body.contains("doris_fe_streaming_job_get_meta_latency")) {
+ log.info("contain doris_fe_streaming_job_get_meta_latency")
+ metricCount++
+ }
+ if (body.contains("doris_fe_streaming_job_get_meta_count")) {
+ log.info("contain doris_fe_streaming_job_get_meta_count")
+ metricCount++
+ }
+ if (body.contains("doris_fe_streaming_job_get_meta_fail_count")) {
+ log.info("contain doris_fe_streaming_job_get_meta_fail_count")
+ metricCount++
+ }
+ if (body.contains("doris_fe_streaming_job_task_execute_time")) {
+ log.info("contain doris_fe_streaming_job_task_execute_time")
+ metricCount++
+ }
+ if (body.contains("doris_fe_streaming_job_task_execute_count")) {
+ log.info("contain doris_fe_streaming_job_task_execute_count")
+ metricCount++
+ }
+ if (body.contains("doris_fe_streaming_job_task_failed_count")) {
+ log.info("contain doris_fe_streaming_job_task_failed_count")
+ metricCount++
+ }
+ if (body.contains("doris_fe_streaming_job_total_rows")) {
+ log.info("contain doris_fe_streaming_job_total_rows")
+ metricCount++
+ }
+ if (body.contains("doris_fe_streaming_job_filter_rows")) {
+ log.info("contain doris_fe_streaming_job_filter_rows")
+ metricCount++
+ }
+ if (body.contains("doris_fe_streaming_job_load_bytes")) {
+ log.info("contain doris_fe_streaming_job_load_bytes")
+ metricCount++
+ }
+
+ // check doris_fe_job gauge: STREAMING_JOB in RUNNING state should be at least 1
+ def jsonSlurper = new JsonSlurper()
+ def result = jsonSlurper.parseText(body)
+ def entry = result.find {
+ it.tags?.metric == "doris_fe_job" &&
+ it.tags?.job == "load" &&
+ it.tags?.type == "STREAMING_JOB" &&
+ it.tags?.state == "RUNNING"
+ }
+ def value = entry ? entry.value : null
+ log.info("streaming job RUNNING metric entry:
${entry}".toString())
+ log.info("streaming job RUNNING value:
${value}".toString())
+ if (value >= 1) {
+ metricCount++
+ }
+
+ }
+ }
+
+ // 9 streaming_job_* counters + 1 doris_fe_job RUNNING gauge
+ if (metricCount >= 10) {
+ break
+ }
+
+ count++
+ sleep(1000)
+ if (count > 60) {
+ // timeout, failed
+ assertEquals(1, 2)
+ }
+ }
+
+
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert") where Name ='${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+ }
+}
+
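The new suite polls the FE /metrics?type=json endpoint until all nine streaming_job_* counters and the RUNNING gauge show up. Outside the regression framework, the same smoke check can be approximated in a few lines of Java; the FE address (localhost:8030 here) is an assumption, while the endpoint path and metric prefix come from the test above:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class MetricsPollSketch {
        public static void main(String[] args) throws Exception {
            HttpClient client = HttpClient.newHttpClient();
            HttpRequest request = HttpRequest.newBuilder(
                    URI.create("http://localhost:8030/metrics?type=json")).GET().build();
            for (int attempt = 0; attempt < 60; attempt++) {
                String body = client.send(request, HttpResponse.BodyHandlers.ofString()).body();
                if (body.contains("doris_fe_streaming_job_total_rows")) {
                    System.out.println("streaming job metrics exported");
                    return;
                }
                Thread.sleep(1000); // poll once per second, like the test
            }
            throw new AssertionError("streaming job metrics not found within 60s");
        }
    }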
diff --git a/regression-test/suites/load_p0/routine_load/test_routin_load_abnormal_job_monitor.groovy b/regression-test/suites/load_p0/routine_load/test_routin_load_abnormal_job_monitor.groovy
index ea466010929..a6cb574d62d 100644
--- a/regression-test/suites/load_p0/routine_load/test_routin_load_abnormal_job_monitor.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routin_load_abnormal_job_monitor.groovy
@@ -201,7 +201,12 @@ suite("test_routine_load_abnormal_job_monitor","p0") {
def jsonSlurper = new JsonSlurper()
def result = jsonSlurper.parseText(body)
- def entry = result.find { it.tags?.metric == "doris_fe_job" && it.tags?.state == "ABNORMAL_PAUSED"}
+ def entry = result.find {
+ it.tags?.metric == "doris_fe_job" &&
+ it.tags?.job == "load" &&
+ it.tags?.type == "ROUTINE_LOAD" &&
+ it.tags?.state == "ABNORMAL_PAUSED"
+ }
def value = entry ? entry.value : null
log.info("Contains ABNORMAL_PAUSE: ${entry !=
null}".toString())
log.info("Value of ABNORMAL_PAUSE:
${value}".toString())
@@ -209,7 +214,13 @@ suite("test_routine_load_abnormal_job_monitor","p0") {
metricCount++
}
- entry = result.find { it.tags?.metric == "doris_fe_job" && it.tags?.state == "USER_PAUSED"}
+ entry = result.find {
+ it.tags?.metric == "doris_fe_job" &&
+ it.tags?.job == "load" &&
+ it.tags?.type == "ROUTINE_LOAD" &&
+ it.tags?.state == "USER_PAUSED"
+ }
+
value = entry ? entry.value : null
log.info("Contains USER_PAUSE: ${entry !=
null}".toString())
log.info("Value of USER_PAUSE: ${value}".toString())
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]