This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 957a4de9a88 branch-3.0: [improve](routine load) add more metrics to
observe the routine load job #48209 (#48764)
957a4de9a88 is described below
commit 957a4de9a8889f75d904c1a74258b2d00a073a36
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Mar 10 20:54:46 2025 +0800
branch-3.0: [improve](routine load) add more metrics to observe the routine
load job #48209 (#48764)
Cherry-picked from #48209
Co-authored-by: hui lai <[email protected]>
---
be/src/runtime/routine_load/data_consumer.cpp | 6 +
be/src/util/doris_metrics.cpp | 10 +
be/src/util/doris_metrics.h | 5 +
.../apache/doris/datasource/kafka/KafkaUtil.java | 60 +++--
.../doris/load/routineload/RoutineLoadJob.java | 8 +-
.../java/org/apache/doris/metric/MetricRepo.java | 20 ++
.../doris/load/routineload/RoutineLoadJobTest.java | 4 +-
.../load_p0/routine_load/data/test_metrics.csv | 20 ++
.../routine_load/test_routine_load_metrics.groovy | 254 +++++++++++++++++++++
9 files changed, 357 insertions(+), 30 deletions(-)
diff --git a/be/src/runtime/routine_load/data_consumer.cpp
b/be/src/runtime/routine_load/data_consumer.cpp
index b6272a92056..7566d06914a 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -38,6 +38,7 @@
#include "util/blocking_queue.hpp"
#include "util/debug_points.h"
#include "util/defer_op.h"
+#include "util/doris_metrics.h"
#include "util/stopwatch.hpp"
#include "util/string_util.h"
#include "util/uid_util.h"
@@ -219,6 +220,9 @@ Status
KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
consumer_watch.start();
std::unique_ptr<RdKafka::Message> msg(_k_consumer->consume(1000 /*
timeout, ms */));
consumer_watch.stop();
+ DorisMetrics::instance()->routine_load_get_msg_count->increment(1);
+ DorisMetrics::instance()->routine_load_get_msg_latency->increment(
+ consumer_watch.elapsed_time() / 1000 / 1000);
DBUG_EXECUTE_IF("KafkaDataConsumer.group_consume.out_of_range", {
done = true;
std::stringstream ss;
@@ -234,6 +238,7 @@ Status
KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
if (_consuming_partition_ids.count(msg->partition()) <= 0) {
_consuming_partition_ids.insert(msg->partition());
}
+
DorisMetrics::instance()->routine_load_consume_bytes->increment(msg->len());
if (msg->len() == 0) {
// ignore msg with length 0.
// put empty msg into queue will cause the load process
shutting down.
@@ -246,6 +251,7 @@ Status
KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
msg.release(); // release the ownership, msg will be deleted
after being processed
}
++received_rows;
+ DorisMetrics::instance()->routine_load_consume_rows->increment(1);
break;
case RdKafka::ERR__TIMED_OUT:
// leave the status as OK, because this may happened
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index 39f246d98d3..48b88cd5727 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -122,6 +122,11 @@
DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(stream_load_rows_total, MetricUnit::ROWS, "
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(load_rows, MetricUnit::ROWS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(load_bytes, MetricUnit::BYTES);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(routine_load_get_msg_latency,
MetricUnit::MILLISECONDS);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(routine_load_get_msg_count,
MetricUnit::NOUNIT);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(routine_load_consume_rows,
MetricUnit::ROWS);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(routine_load_consume_bytes,
MetricUnit::BYTES);
+
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(memtable_flush_total,
MetricUnit::OPERATIONS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(memtable_flush_duration_us,
MetricUnit::MICROSECONDS);
@@ -255,6 +260,11 @@ DorisMetrics::DorisMetrics() :
_metric_registry(_s_registry_name) {
INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
stream_receive_bytes_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, stream_load_rows_total);
+ INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
routine_load_get_msg_latency);
+ INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
routine_load_get_msg_count);
+ INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
routine_load_consume_bytes);
+ INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
routine_load_consume_rows);
+
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, memtable_flush_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
memtable_flush_duration_us);
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 26bad02fdd2..6fbc24d6922 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -113,6 +113,11 @@ public:
IntCounter* load_rows = nullptr;
IntCounter* load_bytes = nullptr;
+ IntCounter* routine_load_get_msg_latency = nullptr;
+ IntCounter* routine_load_get_msg_count = nullptr;
+ IntCounter* routine_load_consume_bytes = nullptr;
+ IntCounter* routine_load_consume_rows = nullptr;
+
IntCounter* memtable_flush_total = nullptr;
IntCounter* memtable_flush_duration_us = nullptr;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
index c0c932bb8ae..293f0875ac0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
@@ -22,6 +22,7 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
+import org.apache.doris.metric.MetricRepo;
import org.apache.doris.proto.InternalService;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.system.Backend;
@@ -221,37 +222,46 @@ public class KafkaUtil {
private static InternalService.PProxyResult
getInfoRequest(InternalService.PProxyRequest request, int timeout)
throws LoadException {
+ long startTime = System.currentTimeMillis();
int retryTimes = 0;
TNetworkAddress address = null;
Future<InternalService.PProxyResult> future = null;
InternalService.PProxyResult result = null;
- while (retryTimes < 3) {
- List<Long> backendIds =
Env.getCurrentSystemInfo().getAllBackendIds(true);
- if (backendIds.isEmpty()) {
- throw new LoadException("Failed to get info. No alive
backends");
- }
- Collections.shuffle(backendIds);
- Backend be =
Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
- address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
+ try {
+ while (retryTimes < 3) {
+ List<Long> backendIds =
Env.getCurrentSystemInfo().getAllBackendIds(true);
+ if (backendIds.isEmpty()) {
+
MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT.increase(1L);
+ throw new LoadException("Failed to get info. No alive
backends");
+ }
+ Collections.shuffle(backendIds);
+ Backend be =
Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
+ address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
- try {
- future = BackendServiceProxy.getInstance().getInfo(address,
request);
- result = future.get(Config.max_get_kafka_meta_timeout_second,
TimeUnit.SECONDS);
- } catch (Exception e) {
- LOG.warn("failed to get info request to " + address + " err "
+ e.getMessage());
- retryTimes++;
- continue;
- }
- TStatusCode code =
TStatusCode.findByValue(result.getStatus().getStatusCode());
- if (code != TStatusCode.OK) {
- LOG.warn("failed to get info request to "
- + address + " err " +
result.getStatus().getErrorMsgsList());
- retryTimes++;
- } else {
- return result;
+ try {
+ future =
BackendServiceProxy.getInstance().getInfo(address, request);
+ result =
future.get(Config.max_get_kafka_meta_timeout_second, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.warn("failed to get info request to " + address + "
err " + e.getMessage());
+ retryTimes++;
+ continue;
+ }
+ TStatusCode code =
TStatusCode.findByValue(result.getStatus().getStatusCode());
+ if (code != TStatusCode.OK) {
+ LOG.warn("failed to get info request to "
+ + address + " err " +
result.getStatus().getErrorMsgsList());
+ retryTimes++;
+ } else {
+ return result;
+ }
}
- }
- throw new LoadException("Failed to get info");
+ MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT.increase(1L);
+ throw new LoadException("Failed to get info");
+ } finally {
+ long endTime = System.currentTimeMillis();
+ MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_LANTENCY.increase(endTime
- startTime);
+ MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_COUNT.increase(1L);
+ }
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 24f12526652..67bebee04f0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -861,7 +861,7 @@ public abstract class RoutineLoadJob
// if rate of error data is more than max_filter_ratio, pause job
protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws
UserException {
updateNumOfData(attachment.getTotalRows(),
attachment.getFilteredRows(), attachment.getUnselectedRows(),
- attachment.getReceivedBytes(), false /* not replay */);
+ attachment.getReceivedBytes(),
attachment.getTaskExecutionTimeMs(), false /* not replay */);
}
protected void updateCloudProgress(RLTaskTxnCommitAttachment attachment) {
@@ -877,7 +877,7 @@ public abstract class RoutineLoadJob
}
private void updateNumOfData(long numOfTotalRows, long numOfErrorRows,
long unselectedRows, long receivedBytes,
- boolean isReplay) throws UserException {
+ long taskExecutionTime, boolean isReplay)
throws UserException {
this.jobStatistic.totalRows += numOfTotalRows;
this.jobStatistic.errorRows += numOfErrorRows;
this.jobStatistic.unselectedRows += unselectedRows;
@@ -888,6 +888,8 @@ public abstract class RoutineLoadJob
MetricRepo.COUNTER_ROUTINE_LOAD_ROWS.increase(numOfTotalRows);
MetricRepo.COUNTER_ROUTINE_LOAD_ERROR_ROWS.increase(numOfErrorRows);
MetricRepo.COUNTER_ROUTINE_LOAD_RECEIVED_BYTES.increase(receivedBytes);
+
MetricRepo.COUNTER_ROUTINE_LOAD_TASK_EXECUTE_TIME.increase(taskExecutionTime);
+ MetricRepo.COUNTER_ROUTINE_LOAD_TASK_EXECUTE_COUNT.increase(1L);
}
// check error rate
@@ -957,7 +959,7 @@ public abstract class RoutineLoadJob
protected void replayUpdateProgress(RLTaskTxnCommitAttachment attachment) {
try {
updateNumOfData(attachment.getTotalRows(),
attachment.getFilteredRows(), attachment.getUnselectedRows(),
- attachment.getReceivedBytes(), true /* is replay */);
+ attachment.getReceivedBytes(),
attachment.getTaskExecutionTimeMs(), true /* is replay */);
} catch (UserException e) {
LOG.error("should not happen", e);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
index 80800bc0738..40930edd052 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -137,6 +137,11 @@ public final class MetricRepo {
public static LongCounterMetric COUNTER_ROUTINE_LOAD_ROWS;
public static LongCounterMetric COUNTER_ROUTINE_LOAD_RECEIVED_BYTES;
public static LongCounterMetric COUNTER_ROUTINE_LOAD_ERROR_ROWS;
+ public static LongCounterMetric COUNTER_ROUTINE_LOAD_GET_META_LANTENCY;
+ public static LongCounterMetric COUNTER_ROUTINE_LOAD_GET_META_COUNT;
+ public static LongCounterMetric COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT;
+ public static LongCounterMetric COUNTER_ROUTINE_LOAD_TASK_EXECUTE_TIME;
+ public static LongCounterMetric COUNTER_ROUTINE_LOAD_TASK_EXECUTE_COUNT;
public static LongCounterMetric COUNTER_HIT_SQL_BLOCK_RULE;
public static AutoMappedMetric<LongCounterMetric> THRIFT_COUNTER_RPC_ALL;
@@ -535,6 +540,21 @@ public final class MetricRepo {
COUNTER_ROUTINE_LOAD_ERROR_ROWS = new
LongCounterMetric("routine_load_error_rows", MetricUnit.ROWS,
"total error rows of routine load");
DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_ERROR_ROWS);
+ COUNTER_ROUTINE_LOAD_GET_META_LANTENCY = new
LongCounterMetric("routine_load_get_meta_latency",
MetricUnit.MILLISECONDS, "get meta latency of routine load");
+
DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_GET_META_LANTENCY);
+ COUNTER_ROUTINE_LOAD_GET_META_COUNT = new
LongCounterMetric("routine_load_get_meta_count", MetricUnit.NOUNIT,
+ "get meta count of routine load");
+ DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_GET_META_COUNT);
+ COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT = new
LongCounterMetric("routine_load_get_meta_fail_count",
+ MetricUnit.NOUNIT, "get meta fail count of routine load");
+
DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT);
+ COUNTER_ROUTINE_LOAD_TASK_EXECUTE_TIME = new
LongCounterMetric("routine_load_task_execute_time",
+ MetricUnit.MILLISECONDS, "task execute time of routine load");
+
DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_TASK_EXECUTE_TIME);
+ COUNTER_ROUTINE_LOAD_TASK_EXECUTE_COUNT = new
LongCounterMetric("routine_load_task_execute_count",
MetricUnit.NOUNIT, "task execute count of routine load");
+
DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_TASK_EXECUTE_COUNT);
COUNTER_HIT_SQL_BLOCK_RULE = new
LongCounterMetric("counter_hit_sql_block_rule", MetricUnit.ROWS,
"total hit sql block rule query");
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
index 2d9efd895cb..a4124acea62 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
@@ -301,7 +301,7 @@ public class RoutineLoadJobTest {
RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob();
Deencapsulation.setField(routineLoadJob, "maxErrorNum", 0);
Deencapsulation.setField(routineLoadJob, "maxBatchRows", 0);
- Deencapsulation.invoke(routineLoadJob, "updateNumOfData", 1L, 1L, 0L,
1L, false);
+ Deencapsulation.invoke(routineLoadJob, "updateNumOfData", 1L, 1L, 0L,
1L, 1L, false);
Assert.assertEquals(RoutineLoadJob.JobState.PAUSED,
Deencapsulation.getField(routineLoadJob, "state"));
@@ -316,7 +316,7 @@ public class RoutineLoadJobTest {
RoutineLoadStatistic jobStatistic =
Deencapsulation.getField(routineLoadJob, "jobStatistic");
Deencapsulation.setField(jobStatistic, "currentErrorRows", 1);
Deencapsulation.setField(jobStatistic, "currentTotalRows", 99);
- Deencapsulation.invoke(routineLoadJob, "updateNumOfData", 2L, 0L, 0L,
1L, false);
+ Deencapsulation.invoke(routineLoadJob, "updateNumOfData", 2L, 0L, 0L,
1L, 1L, false);
Assert.assertEquals(RoutineLoadJob.JobState.RUNNING,
Deencapsulation.getField(routineLoadJob, "state"));
Assert.assertEquals(new Long(0),
Deencapsulation.getField(jobStatistic, "currentErrorRows"));
diff --git a/regression-test/suites/load_p0/routine_load/data/test_metrics.csv
b/regression-test/suites/load_p0/routine_load/data/test_metrics.csv
new file mode 100644
index 00000000000..b58285ed575
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_metrics.csv
@@ -0,0 +1,20 @@
+57|2023-08-19|TRUE|2|-25462|-74112029|6458082754318544493|-7910671781690629051|-15205.859375|-306870797.484914|759730669.0|-628556336.0|2023-07-10
18:39:10|2023-02-12|2023-01-27
07:26:06|y||Xi9nDVrLv8m6AwEpUxmtzFAuK48sQ|{"name": "John", "age": 25, "city":
"New York"}
+49|2023-08-08|FALSE|\N|16275|-2144851675|-2303421957908954634|-46526938720058765|-13141.142578|-686632233.230200|229942298.0|-152553823.0|2022-09-01
00:16:01|2023-03-25|2022-09-07 14:59:03|s||yvuILR2iNxfe8RRml|{"student": true,
"name": "Alice", "grade": 9, "subjects": ["math", "science", "history"]}
+66|2023-08-15|TRUE|-91|28378|609923317|4872185586197131212|1207709464099378591|\N|-1863683325.985123|-783792012.0|-708986976.0|2022-09-24
10:39:23|2022-09-24|2022-10-16
18:36:43|Y|z|AI1BSPQdKiHJiQH1kguyLSWsDXkC7zwy7PwgWnyGSaa9tBKRex8vHBdxg2QSKZKL2mV2lHz7iI1PnsTd4MXDcIKhqiHyPuQPt2tEtgt0UgF6|{"book":
{"title": "The Great Gatsby", "author": "F. Scott Fitzgerald"}, "year": 1925}
+91|2023-08-27|TRUE|90|2465|702240964|6373830997821598984|305860046137409400|15991.356445|1599972327.386147|-165530947.0|\N|2023-04-26
19:31:10|2023-07-21|\N|2||B7YKYBYT8w0YC926bZ8Yz1VzyiWw2NWDAiTlEoPVyz9AXGti2Npg1FxWqWk4hEaALw0ZBSuiAIPj41lq36g5QRpPmAjNPK|{"fruit":
"apple", "color": "red", "qty": 5, "price": 2.5}
+80|2023-08-18|FALSE|-18|-8971|679027874|6535956962935330265|3960889045799757165|-13219.759766|1187161924.505394|-526615878.0|-947410627.0|2023-03-11
07:40:00|2022-11-29|2023-01-14
07:24:07|\N|D|3Nhx6xX1qdwaq7lxwLRSKMtJFbC03swWv12mpySSVysH3igGZTiGPuKMsYW7HAkf6CWc7c0nzqDsjuH3FYVMNCWRmfxMrmY8rykQCC4Ve|{"car":
"BMW", "model": "X5", "year": 2020, "color": "black"}
+85|2023-08-11|TRUE|-7|24304|-2043877415|-2024144417867729183|\N|5363.024414|-578615669.042831|-378574346.0|-810302932.0|2023-07-15
01:07:41|2023-08-13|2023-01-20 11:57:48|i||WQ9dh9ajPu0y|{"country": "France",
"capital": "Paris", "population": 67081000}
+31|2023-08-27|FALSE|17|-18849|1728109133|3266501886640700374|527195452623418935|-24062.328125|-1514348021.262435|-322205854.0|-278237157.0|2022-10-07
03:24:23|2022-09-25|\N|0|8|yKMiAntORoRa8svnMfcxlOPwwND1m5s2fdS26Xu6cfs6HK5SAibqIp9h8sZcpjHy4|{"team":
"Manchester United", "players": ["Ronaldo", "Rooney", "Giggs"], "coach": "Ole
Gunnar Solskjaer"}
+20|2023-08-17|FALSE|-5|18158|784479801|1485484354598941738|-6632681928222776815|9708.430664|-330432620.706069|-816424174.0|571112646.0|2022-09-15
21:40:55|2023-02-23|2023-08-13
21:31:54|O|X|2pYmX2vAhfEEHZZYPsgAmda1G7otnwx5TmUC879FPhDeIjvWI79ksBZpfFG2gp7jhCSbpZiecKGklB5SvG8tm31i5SUqe1xrWgLt4HSq7lMJWp75tx2kxD7pRIOpn|{"name":
"Sarah", "age": 30, "city": "London", "isMarried": false}
+90|2023-08-27|TRUE|22|16456|-1476824962|-3279894870153540825|8990195191470116763|26651.906250|206860148.942546|-580959198.0|-210329147.0|2022-10-07
03:11:03|2023-03-18|2023-04-15
00:38:33|T|L|QW0GQ3GoMtHgxPQOWGfVaveynahNpsNs09siMFA1OtO6QEDBQTdivmGyq7bFzejAqwbbVQQpREAmeLjcFSXLnQuou2KbwYD|{"company":
"Apple", "products": [{"name": "iPhone", "price": 1000}, {"name": "MacBook",
"price": 1500}]}
+8|2023-08-14|TRUE|109|-31573|-1362465190|3990845741226497177|2732763251146840270|-25698.552734|1312831962.567818|771983879.0|173937916.0|2023-03-07
14:13:19|2022-10-18|2023-07-16
05:03:13|D||PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme|{"animal":
"lion", "weight": 200, "habitat": ["savannah", "grassland"]}
+65|2023-08-09|FALSE|94|31514|814994517|-297697460695940343|734910652450318597|-13061.891602|62750847.041706|-9808654.0|\N|2023-08-14
22:01:27|2023-05-19|2022-11-13
13:44:28|V||aGeMsI24O12chGlP5ak0AHghAz7bu5MargJBStHnt0yMnChH0JnfYhsfH1u59XIHkJKMsHYktBqORkGlovu8V47E74KeFpaqxn5yLyXfDbhhzUKf|{"language":
"Python", "version": 3.9, "frameworks": ["Django", "Flask"]}
+62|2023-08-21|FALSE|81|20302|-200761532|6365479976421007608|\N|-29916.533203|1709141750.828478|549873536.0|-119205359.0|2023-05-04
01:14:51|2022-09-17|2022-12-04
19:30:09|d|v|BKWy9dTNg1aZW7ancEJAmEDOPK5TwFsNSHbI78emu9gymeIlx5NoLmyii0QAqdzRvSQPZKiqKkwInGCTIBnK1yYkK7zD|{"username":
"user123", "password": "pass123", "email": "[email protected]"}
+50|2023-08-06|TRUE|109|-6330|1479023892|-8630800697573159428|-1645095773540208759|17880.960938|-1453844792.013949|-158871820.0|-862940384.0|2022-09-22
02:03:21|2023-05-14|2023-03-25
02:18:34|m||JKnIgXvGVidGiWl9YRSi3mFI7wHKt1sBpWSadKF8VX3LAuElm4sdc9gtxREaUr57oikSYlU8We8h1MWqQlYNiJObl|{"city":
"Tokyo", "temperature": 20.5, "humidity": 75}
+58|2023-08-22|\N|0|-18231|1832867360|6997858407575297145|2480714305422728023|-5450.488770|1475901032.138386|-893480655.0|-607891858.0|2023-02-02
05:13:24|2022-09-18|2023-04-23
10:51:15|k||LdFXF7Kmfzgmnn2R6zLsXdmi3A2cLBLq4G4WDVNDhxvH7dYH8Kga2WA47uSIxp6NSrwPSdw0ssB1TS8RFJTDJAB0Uba3e05NL2Aiw0ja|{"restaurant":
"Pizza Hut", "menu": ["pizza", "pasta", "salad"]}
+60|2023-08-27|FALSE|-52|-2338|-757056972|1047567408607120856|6541476642780646552|6614.089355|-1204448798.517855|236657733.0|731515433.0|2022-12-29
14:47:30|2022-09-24|2023-08-01
12:41:59|O|F|RM4F1Ke7lkcnuxF2nK0j9VBW3MDcgyHR4pseBjtFnqS6GUkVFuzF6u3Cp9Nv7ab0O6UYrpP4DhU|{"game":
"Chess", "players": 2, "time": "1 hour"}
+68|2023-08-23|TRUE|-73|20117|1737338128|795638676048937749|-5551546237562433901|-30627.039062|68589475.684545|585022347.0|513722420.0|2022-12-28
20:26:51|2022-10-04|2023-07-30
00:20:06|y||keZ3JlWWpdnPBejf0cuiCQCVBBTd5gjvO08NVdcAFewqL7nRT4N9lnvSU6pWmletA5VbPQCeQapJdcnQCHfZUDCf4ulCnczyqr7SGrbGRT0XYcd7iktKM|{"country":
"Brazil", "continent": "South America", "population": 211049527}
+50|2023-08-24|TRUE|15|14403|\N|-6418906115745394180|9205303779366462513|-4331.548828|-615112179.557648|367305015.0|-551652958.0|2022-12-29
02:27:20|2023-06-01|2023-08-12
04:50:04|a||eCl38sztIvBQvGvGKyYZmyMXy9vIJx197iu3JwP9doJGcrYUl9Uova0rz4iCCgrjlAiZU18Fs9YtCq830nhM|{"band":
"The Beatles", "members": ["John Lennon", "Paul McCartney", "George Harrison",
"Ringo Starr"]}
+81|2023-08-23|FALSE|106|11492|-667795397|4480250461471356146|-5346660566234294101|9082.750000|385167225.902608|-717553011.0|649146853.0|2023-03-20
03:33:16|2022-11-24|2023-02-16
18:29:41|G|9|Lk3eNVQNjucbekD1rZmUlGPiXS5JvcWr2LQzRU8GSGIbSag|{"flower": "rose",
"color": "red", "fragrance": true}
+41|2023-08-27|TRUE|-104|22750|\N|8527773271030840740|5554497317268279215|-5296.828125|-1715646888.013040|-306075962.0|897769189.0|2022-12-02
17:56:44|2022-10-12|2023-02-19
07:02:54|V|\N|E9GzQdTwX1ITUQz27IVznAs6Ca4WwprKk6Odjs6SH75D2F1089QiY3HQ52LXRD1V6xAWjhLE2hWgW3EdHuAOnUDVrb5V|{"food":
"Sushi", "price": 10, "restaurant": "Sushi King"}
+21|2023-08-18|FALSE|63|-27847|-35409596|8638201997392767650|4919963231735304178|-23382.541016|-1803403621.426313|-22009767.0|661750756.0|2023-03-31
10:56:14|2023-01-20|2023-02-18
13:37:52|N|T|PSiFwUEx3eVFNtjlnQ70YkgZNvKrGmQ2DN5K9yYHiSdFWeEDB1UpL3Frt8z1kEAIWRDWqXZuyi|{"city":
"Sydney", "population": 5312000, "area": 2058.7}
\ No newline at end of file
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_metrics.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_metrics.groovy
new file mode 100644
index 00000000000..bb1afb6dd34
--- /dev/null
+++
b/regression-test/suites/load_p0/routine_load/test_routine_load_metrics.groovy
@@ -0,0 +1,254 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+import groovy.json.JsonSlurper
+
+suite("test_routine_load_metrics","p0") {
+ def kafkaCsvTpoics = [
+ "test_metrics",
+ ]
+
+ String enabled = context.config.otherConfigs.get("enableKafkaTest")
+ String kafka_port = context.config.otherConfigs.get("kafka_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ def kafka_broker = "${externalEnvIp}:${kafka_port}"
+
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ // define kafka
+ def props = new Properties()
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"${kafka_broker}".toString())
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
+ // Create kafka producer
+ def producer = new KafkaProducer<>(props)
+
+ for (String kafkaCsvTopic in kafkaCsvTpoics) {
+ def txt = new
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+ def lines = txt.readLines()
+ lines.each { line ->
+ logger.info("=====${line}========")
+ def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+ producer.send(record)
+ }
+ }
+ }
+
+ def jobName = "test_metrics"
+ def tableName = "test_metrics"
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ try {
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName}
+ (
+ k00 INT NOT NULL,
+ k01 DATE NOT NULL,
+ k02 BOOLEAN NULL,
+ k03 TINYINT NULL,
+ k04 SMALLINT NULL,
+ k05 INT NULL,
+ k06 BIGINT NULL,
+ k07 LARGEINT NULL,
+ k08 FLOAT NULL,
+ k09 DOUBLE NULL,
+ k10 DECIMAL(9,1) NULL,
+ k11 DECIMALV3(9,1) NULL,
+ k12 DATETIME NULL,
+ k13 DATEV2 NULL,
+ k14 DATETIMEV2 NULL,
+ k15 CHAR NULL,
+ k16 VARCHAR NULL,
+ k17 STRING NULL,
+ k18 JSON NULL,
+ kd01 BOOLEAN NOT NULL DEFAULT "TRUE",
+ kd02 TINYINT NOT NULL DEFAULT "1",
+ kd03 SMALLINT NOT NULL DEFAULT "2",
+ kd04 INT NOT NULL DEFAULT "3",
+ kd05 BIGINT NOT NULL DEFAULT "4",
+ kd06 LARGEINT NOT NULL DEFAULT "5",
+ kd07 FLOAT NOT NULL DEFAULT "6.0",
+ kd08 DOUBLE NOT NULL DEFAULT "7.0",
+ kd09 DECIMAL NOT NULL DEFAULT "888888888",
+ kd10 DECIMALV3 NOT NULL DEFAULT "999999999",
+ kd11 DATE NOT NULL DEFAULT "2023-08-24",
+ kd12 DATETIME NOT NULL DEFAULT "2023-08-24 12:00:00",
+ kd13 DATEV2 NOT NULL DEFAULT "2023-08-24",
+ kd14 DATETIMEV2 NOT NULL DEFAULT "2023-08-24 12:00:00",
+ kd15 CHAR(255) NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+ kd16 VARCHAR(300) NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+ kd17 STRING NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+ kd18 JSON NULL,
+
+ INDEX idx_inverted_k104 (`k05`) USING INVERTED,
+ INDEX idx_inverted_k110 (`k11`) USING INVERTED,
+ INDEX idx_inverted_k113 (`k13`) USING INVERTED,
+ INDEX idx_inverted_k114 (`k14`) USING INVERTED,
+ INDEX idx_inverted_k117 (`k17`) USING INVERTED
PROPERTIES("parser" = "english"),
+ INDEX idx_ngrambf_k115 (`k15`) USING NGRAM_BF
PROPERTIES("gram_size"="3", "bf_size"="256"),
+ INDEX idx_ngrambf_k116 (`k16`) USING NGRAM_BF
PROPERTIES("gram_size"="3", "bf_size"="256"),
+ INDEX idx_ngrambf_k117 (`k17`) USING NGRAM_BF
PROPERTIES("gram_size"="3", "bf_size"="256"),
+ INDEX idx_bitmap_k104 (`k02`) USING BITMAP,
+ INDEX idx_bitmap_k110 (`kd01`) USING BITMAP
+
+ )
+ DUPLICATE KEY(k00)
+ PARTITION BY RANGE(k01)
+ (
+ PARTITION p1 VALUES [('2023-08-01'), ('2023-08-11')),
+ PARTITION p2 VALUES [('2023-08-11'), ('2023-08-21')),
+ PARTITION p3 VALUES [('2023-08-21'), ('2023-09-01'))
+ )
+ DISTRIBUTED BY HASH(k00) BUCKETS 32
+ PROPERTIES (
+ "bloom_filter_columns"="k05",
+ "replication_num" = "1"
+ );
+ """
+ sql "sync"
+
+ sql """
+ CREATE ROUTINE LOAD ${jobName} on ${tableName}
+
COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18),
+ COLUMNS TERMINATED BY "|"
+ PROPERTIES
+ (
+ "max_batch_interval" = "5",
+ "max_batch_rows" = "300000",
+ "max_batch_size" = "209715200"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "${kafkaCsvTpoics[0]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ sql "sync"
+
+ // load data
+ def count = 0
+ while (true) {
+ def res = sql "select count(*) from ${tableName}"
+ def state = sql "show routine load for ${jobName}"
+ log.info("routine load state:
${state[0][8].toString()}".toString())
+ log.info("routine load statistic:
${state[0][14].toString()}".toString())
+ log.info("reason of state changed:
${state[0][17].toString()}".toString())
+ if (res[0][0] > 0) {
+ break
+ }
+ if (count >= 120) {
+ log.error("routine load can not visible for long time")
+ assertEquals(20, res[0][0])
+ break
+ }
+ sleep(1000)
+ count++
+ }
+
+ // check metrics
+ count = 0
+ def metricCount = 0
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP,
backendId_to_backendHttpPort);
+ def beHttpAddress
=backendId_to_backendIP.entrySet()[0].getValue()+":"+backendId_to_backendHttpPort.entrySet()[0].getValue()
+ log.info("beHttpAddress: ${beHttpAddress}")
+ while (true) {
+ metricCount = 0
+ httpTest {
+ endpoint context.config.feHttpAddress
+ uri "/metrics?type=json"
+ op "get"
+ check { code, body ->
+ logger.debug("code:${code} body:${body}");
+ if
(body.contains("doris_fe_routine_load_task_execute_time")){
+ log.info("contain
doris_fe_routine_load_task_execute_time")
+ metricCount++
+ }
+ if
(body.contains("doris_fe_routine_load_task_execute_count")){
+ log.info("contain
doris_fe_routine_load_task_execute_count")
+ metricCount++
+ }
+ if
(body.contains("doris_fe_routine_load_get_meta_latency")){
+ log.info("contain
doris_fe_routine_load_get_meta_latency")
+ metricCount++
+ }
+ if
(body.contains("doris_fe_routine_load_get_meta_count")){
+ log.info("contain
doris_fe_routine_load_get_meta_count")
+ metricCount++
+ }
+ if
(body.contains("doris_fe_routine_load_get_meta_fail_count")){
+ log.info("contain
doris_fe_routine_load_get_meta_fail_count")
+ metricCount++
+ }
+ if
(body.contains("doris_fe_routine_load_receive_bytes")){
+ log.info("contain
doris_fe_routine_load_receive_bytes")
+ metricCount++
+ }
+ if (body.contains("doris_fe_routine_load_rows")){
+ log.info("contain doris_fe_routine_load_rows")
+ metricCount++
+ }
+ }
+ }
+
+ httpTest {
+ endpoint beHttpAddress
+ uri "/metrics?type=json"
+ op "get"
+ check { code, body ->
+ logger.debug("code:${code} body:${body}");
+ if (body.contains("routine_load_get_msg_latency")){
+ log.info("contain routine_load_get_msg_latency")
+ metricCount++
+ }
+ if (body.contains("routine_load_get_msg_count")){
+ log.info("contain routine_load_get_msg_count")
+ metricCount++
+ }
+ if (body.contains("routine_load_consume_bytes")){
+ log.info("contain routine_load_consume_bytes")
+ metricCount++
+ }
+ if (body.contains("routine_load_consume_rows")){
+ log.info("contain routine_load_consume_rows")
+ metricCount++
+ }
+ }
+ }
+
+ log.info("metricCount: ${metricCount}".toString())
+ if (metricCount == 11){
+ break;
+ }
+
+ count++
+ sleep(1000)
+ if (count > 60) {
+ assertEquals(1, 2)
+ }
+ }
+ } finally {
+ sql "stop routine load for ${jobName}"
+ sql "DROP TABLE IF EXISTS ${tableName}"
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]