This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b1d63402d6c Add QUEUE_START_TIME/QUEUE_END_TIME/QUERY_STATUS column
for active_queries (#32259)
b1d63402d6c is described below
commit b1d63402d6c179dfaea25c244c48e3f761099fa1
Author: wangbo <[email protected]>
AuthorDate: Fri Mar 15 22:06:47 2024 +0800
Add QUEUE_START_TIME/QUEUE_END_TIME/QUERY_STATUS column for active_queries
(#32259)
---
.../schema_active_queries_scanner.cpp | 8 +-
.../java/org/apache/doris/catalog/SchemaTable.java | 5 +-
.../main/java/org/apache/doris/qe/Coordinator.java | 4 +
.../java/org/apache/doris/qe/QeProcessorImpl.java | 29 ++++++-
.../org/apache/doris/qe/QueryStatisticsItem.java | 6 +-
.../doris/resource/workloadgroup/QueryQueue.java | 6 +-
.../doris/resource/workloadgroup/QueueToken.java | 33 +++++++-
.../doris/tablefunction/MetadataGenerator.java | 90 +++++++++-------------
.../schema_table/test_active_queries.groovy | 6 +-
9 files changed, 123 insertions(+), 64 deletions(-)
diff --git a/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp b/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp
index f16326dc8f5..36cb145e3f5 100644
--- a/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp
@@ -29,11 +29,14 @@ namespace doris {
std::vector<SchemaScanner::ColumnDesc>
SchemaActiveQueriesScanner::_s_tbls_columns = {
// name, type, size
{"QUERY_ID", TYPE_VARCHAR, sizeof(StringRef), true},
- {"START_TIME", TYPE_VARCHAR, sizeof(StringRef), true},
+ {"QUERY_START_TIME", TYPE_VARCHAR, sizeof(StringRef), true},
{"QUERY_TIME_MS", TYPE_BIGINT, sizeof(int64_t), true},
{"WORKLOAD_GROUP_ID", TYPE_BIGINT, sizeof(int64_t), true},
{"DATABASE", TYPE_VARCHAR, sizeof(StringRef), true},
{"FRONTEND_INSTANCE", TYPE_VARCHAR, sizeof(StringRef), true},
+ {"QUEUE_START_TIME", TYPE_VARCHAR, sizeof(StringRef), true},
+ {"QUEUE_END_TIME", TYPE_VARCHAR, sizeof(StringRef), true},
+ {"QUERY_STATUS", TYPE_VARCHAR, sizeof(StringRef), true},
{"SQL", TYPE_STRING, sizeof(StringRef), true}};
SchemaActiveQueriesScanner::SchemaActiveQueriesScanner()
@@ -127,6 +130,9 @@ Status
SchemaActiveQueriesScanner::_get_active_queries_block_from_fe() {
insert_string_value(4, row.column_value[4].stringVal,
_active_query_block.get());
insert_string_value(5, row.column_value[5].stringVal,
_active_query_block.get());
insert_string_value(6, row.column_value[6].stringVal,
_active_query_block.get());
+ insert_string_value(7, row.column_value[7].stringVal,
_active_query_block.get());
+ insert_string_value(8, row.column_value[8].stringVal,
_active_query_block.get());
+ insert_string_value(9, row.column_value[9].stringVal,
_active_query_block.get());
}
return Status::OK();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
index 736df74f1e6..b89b7e1ea6b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
@@ -460,11 +460,14 @@ public class SchemaTable extends Table {
.build()))
.put("active_queries", new
SchemaTable(SystemIdGenerator.getNextId(), "active_queries", TableType.SCHEMA,
builder().column("QUERY_ID", ScalarType.createVarchar(256))
- .column("START_TIME",
ScalarType.createVarchar(256))
+ .column("QUERY_START_TIME",
ScalarType.createVarchar(256))
.column("QUERY_TIME_MS",
ScalarType.createType(PrimitiveType.BIGINT))
.column("WORKLOAD_GROUP_ID",
ScalarType.createType(PrimitiveType.BIGINT))
.column("DATABASE", ScalarType.createVarchar(256))
.column("FRONTEND_INSTANCE",
ScalarType.createVarchar(256))
+ .column("QUEUE_START_TIME",
ScalarType.createVarchar(256))
+ .column("QUEUE_END_TIME",
ScalarType.createVarchar(256))
+ .column("QUERY_STATUS",
ScalarType.createVarchar(256))
.column("SQL", ScalarType.createStringType())
.build()))
.put("workload_groups", new
SchemaTable(SystemIdGenerator.getNextId(), "workload_groups", TableType.SCHEMA,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 2671228fc3c..7a213e47862 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -3906,6 +3906,10 @@ public class Coordinator implements CoordInterface {
}
}
+ public QueueToken getQueueToken() {
+ return queueToken;
+ }
+
// fragment instance exec param, it is used to assemble
// the per-instance TPlanFragmentExecParams, as a member of
// FragmentExecParams
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index 55536639bfd..03144fc797c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -24,6 +24,7 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.ExecutionProfile;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.resource.workloadgroup.QueueToken.TokenState;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TReportExecStatusParams;
@@ -265,7 +266,6 @@ public final class QeProcessorImpl implements QeProcessor {
private final ConnectContext connectContext;
private final Coordinator coord;
private final String sql;
- private final long startExecTime;
// from Export, Pull load, Insert
public QueryInfo(Coordinator coord) {
@@ -277,7 +277,6 @@ public final class QeProcessorImpl implements QeProcessor {
this.connectContext = connectContext;
this.coord = coord;
this.sql = sql;
- this.startExecTime = System.currentTimeMillis();
}
public ConnectContext getConnectContext() {
@@ -293,7 +292,31 @@ public final class QeProcessorImpl implements QeProcessor {
}
public long getStartExecTime() {
- return startExecTime;
+ if (coord.getQueueToken() != null) {
+ return coord.getQueueToken().getQueueEndTime();
+ }
+ return -1;
+ }
+
+ public long getQueueStartTime() {
+ if (coord.getQueueToken() != null) {
+ return coord.getQueueToken().getQueueStartTime();
+ }
+ return -1;
+ }
+
+ public long getQueueEndTime() {
+ if (coord.getQueueToken() != null) {
+ return coord.getQueueToken().getQueueEndTime();
+ }
+ return -1;
+ }
+
+ public TokenState getQueueStatus() {
+ if (coord.getQueueToken() != null) {
+ return coord.getQueueToken().getTokenState();
+ }
+ return null;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java
index 79c3e083113..76b528464d6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java
@@ -74,7 +74,11 @@ public final class QueryStatisticsItem {
public String getQueryExecTime() {
final long currentTime = System.currentTimeMillis();
- return String.valueOf(currentTime - queryStartTime);
+ if (queryStartTime <= 0) {
+ return String.valueOf(-1);
+ } else {
+ return String.valueOf(currentTime - queryStartTime);
+ }
}
public String getQueryId() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
index 7ba6353e746..5953edbf66e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
@@ -100,7 +100,6 @@ public class QueryQueue {
}
public QueueToken getToken() throws UserException {
-
queueLock.lock();
try {
if (LOG.isDebugEnabled()) {
@@ -108,13 +107,16 @@ public class QueryQueue {
}
if (currentRunningQueryNum < maxConcurrency) {
currentRunningQueryNum++;
- return new QueueToken(TokenState.READY_TO_RUN, queueTimeout,
"offer success");
+ QueueToken retToken = new QueueToken(TokenState.READY_TO_RUN,
queueTimeout, "offer success");
+ retToken.setQueueTimeWhenOfferSuccess();
+ return retToken;
}
if (priorityTokenQueue.size() >= maxQueueSize) {
throw new UserException("query waiting queue is full, queue
length=" + maxQueueSize);
}
QueueToken newQueryToken = new
QueueToken(TokenState.ENQUEUE_SUCCESS, queueTimeout,
"query wait timeout " + queueTimeout + " ms");
+ newQueryToken.setQueueTimeWhenQueueSuccess();
this.priorityTokenQueue.offer(newQueryToken);
return newQueryToken;
} finally {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
index 189ba77e8de..6bf44c78828 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
@@ -36,7 +36,7 @@ public class QueueToken implements Comparable<QueueToken> {
return Long.compare(this.tokenId, other.getTokenId());
}
- enum TokenState {
+ public enum TokenState {
ENQUEUE_SUCCESS,
READY_TO_RUN
}
@@ -56,6 +56,9 @@ public class QueueToken implements Comparable<QueueToken> {
private final ReentrantLock tokenLock = new ReentrantLock();
private final Condition tokenCond = tokenLock.newCondition();
+ private long queueStartTime = -1;
+ private long queueEndTime = -1;
+
public QueueToken(TokenState tokenState, long queueWaitTimeout,
String offerResultDetail) {
this.tokenId = tokenIdGenerator.addAndGet(1);
@@ -94,6 +97,7 @@ public class QueueToken implements Comparable<QueueToken> {
return false;
} finally {
this.tokenLock.unlock();
+ this.setQueueTimeWhenQueueEnd();
}
}
@@ -126,6 +130,33 @@ public class QueueToken implements Comparable<QueueToken> {
return this.tokenState == TokenState.READY_TO_RUN;
}
+ public void setQueueTimeWhenOfferSuccess() {
+ long currentTime = System.currentTimeMillis();
+ this.queueStartTime = currentTime;
+ this.queueEndTime = currentTime;
+ }
+
+ public void setQueueTimeWhenQueueSuccess() {
+ long currentTime = System.currentTimeMillis();
+ this.queueStartTime = currentTime;
+ }
+
+ public void setQueueTimeWhenQueueEnd() {
+ this.queueEndTime = System.currentTimeMillis();
+ }
+
+ public long getQueueStartTime() {
+ return queueStartTime;
+ }
+
+ public long getQueueEndTime() {
+ return queueEndTime;
+ }
+
+ public TokenState getTokenState() {
+ return tokenState;
+ }
+
@Override
public boolean equals(Object obj) {
if (this == obj) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index 8f4dbf95616..adf3a9b3ed2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -42,6 +42,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
+import org.apache.doris.resource.workloadgroup.QueueToken.TokenState;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.FrontendService;
@@ -57,7 +58,6 @@ import org.apache.doris.thrift.TMetadataTableRequestParams;
import org.apache.doris.thrift.TMetadataType;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
-import org.apache.doris.thrift.TQueryStatistics;
import org.apache.doris.thrift.TRow;
import org.apache.doris.thrift.TSchemaTableRequestParams;
import org.apache.doris.thrift.TStatus;
@@ -91,11 +91,14 @@ public class MetadataGenerator {
private static final ImmutableList<Column> ACTIVE_QUERIES_SCHEMA =
ImmutableList.of(
new Column("QUERY_ID", ScalarType.createStringType()),
- new Column("START_TIME", ScalarType.createStringType()),
+ new Column("QUERY_START_TIME", ScalarType.createStringType()),
new Column("QUERY_TIME_MS", PrimitiveType.BIGINT),
new Column("WORKLOAD_GROUP_ID", PrimitiveType.BIGINT),
new Column("DATABASE", ScalarType.createStringType()),
new Column("FRONTEND_INSTANCE", ScalarType.createStringType()),
+ new Column("QUEUE_START_TIME", ScalarType.createStringType()),
+ new Column("QUEUE_END_TIME", ScalarType.createStringType()),
+ new Column("QUERY_STATUS", ScalarType.createStringType()),
new Column("SQL", ScalarType.createStringType()));
private static final ImmutableMap<String, Integer>
ACTIVE_QUERIES_COLUMN_TO_INDEX;
@@ -490,53 +493,6 @@ public class MetadataGenerator {
return result;
}
- private static TRow makeQueryStatisticsTRow(SimpleDateFormat sdf, String
queryId, Backend be,
- String selfNode, QueryInfo queryInfo, TQueryStatistics qs) {
- TRow trow = new TRow();
- if (be != null) {
- trow.addToColumnValue(new TCell().setStringVal(be.getHost()));
- trow.addToColumnValue(new TCell().setLongVal(be.getBePort()));
- } else {
- trow.addToColumnValue(new TCell().setStringVal("invalid host"));
- trow.addToColumnValue(new TCell().setLongVal(-1));
- }
- trow.addToColumnValue(new TCell().setStringVal(queryId));
-
- String strDate = sdf.format(new Date(queryInfo.getStartExecTime()));
- trow.addToColumnValue(new TCell().setStringVal(strDate));
- trow.addToColumnValue(new
TCell().setLongVal(System.currentTimeMillis() - queryInfo.getStartExecTime()));
-
- if (qs != null) {
- trow.addToColumnValue(new
TCell().setLongVal(qs.workload_group_id));
- trow.addToColumnValue(new TCell().setLongVal(qs.cpu_ms));
- trow.addToColumnValue(new TCell().setLongVal(qs.scan_rows));
- trow.addToColumnValue(new TCell().setLongVal(qs.scan_bytes));
- trow.addToColumnValue(new
TCell().setLongVal(qs.max_peak_memory_bytes));
- trow.addToColumnValue(new
TCell().setLongVal(qs.current_used_memory_bytes));
- trow.addToColumnValue(new
TCell().setLongVal(qs.shuffle_send_bytes));
- trow.addToColumnValue(new
TCell().setLongVal(qs.shuffle_send_rows));
- } else {
- trow.addToColumnValue(new TCell().setLongVal(0L));
- trow.addToColumnValue(new TCell().setLongVal(0L));
- trow.addToColumnValue(new TCell().setLongVal(0L));
- trow.addToColumnValue(new TCell().setLongVal(0L));
- trow.addToColumnValue(new TCell().setLongVal(0L));
- trow.addToColumnValue(new TCell().setLongVal(0L));
- trow.addToColumnValue(new TCell().setLongVal(0L));
- trow.addToColumnValue(new TCell().setLongVal(0L));
- }
-
- if (queryInfo.getConnectContext() != null) {
- trow.addToColumnValue(new
TCell().setStringVal(queryInfo.getConnectContext().getDatabase()));
- } else {
- trow.addToColumnValue(new TCell().setStringVal(""));
- }
- trow.addToColumnValue(new TCell().setStringVal(selfNode));
- trow.addToColumnValue(new TCell().setStringVal(queryInfo.getSql()));
-
- return trow;
- }
-
private static TFetchSchemaTableDataResult
queriesMetadataResult(TSchemaTableRequestParams tSchemaTableParams,
TFetchSchemaTableDataRequest parentRequest) {
TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
@@ -557,9 +513,15 @@ public class MetadataGenerator {
TRow trow = new TRow();
trow.addToColumnValue(new TCell().setStringVal(queryId));
- String strDate = sdf.format(new
Date(queryInfo.getStartExecTime()));
- trow.addToColumnValue(new TCell().setStringVal(strDate));
- trow.addToColumnValue(new
TCell().setLongVal(System.currentTimeMillis() - queryInfo.getStartExecTime()));
+ long queryStartTime = queryInfo.getStartExecTime();
+ if (queryStartTime > 0) {
+ trow.addToColumnValue(new TCell().setStringVal(sdf.format(new
Date(queryStartTime))));
+ trow.addToColumnValue(
+ new TCell().setLongVal(System.currentTimeMillis() -
queryInfo.getStartExecTime()));
+ } else {
+ trow.addToColumnValue(new TCell());
+ trow.addToColumnValue(new TCell().setLongVal(-1));
+ }
List<TPipelineWorkloadGroup> tgroupList =
queryInfo.getCoord().gettWorkloadGroups();
if (tgroupList != null && tgroupList.size() == 1) {
@@ -574,6 +536,30 @@ public class MetadataGenerator {
trow.addToColumnValue(new TCell().setStringVal(""));
}
trow.addToColumnValue(new TCell().setStringVal(selfNode));
+
+ long queueStartTime = queryInfo.getQueueStartTime();
+ if (queueStartTime > 0) {
+ trow.addToColumnValue(new TCell().setStringVal(sdf.format(new
Date(queueStartTime))));
+ } else {
+ trow.addToColumnValue(new TCell());
+ }
+
+ long queueEndTime = queryInfo.getQueueEndTime();
+ if (queueEndTime > 0) {
+ trow.addToColumnValue(new TCell().setStringVal(sdf.format(new
Date(queueEndTime))));
+ } else {
+ trow.addToColumnValue(new TCell());
+ }
+
+ TokenState tokenState = queryInfo.getQueueStatus();
+ if (tokenState == null) {
+ trow.addToColumnValue(new TCell());
+ } else if (tokenState == TokenState.READY_TO_RUN) {
+ trow.addToColumnValue(new TCell().setStringVal("RUNNING"));
+ } else {
+ trow.addToColumnValue(new TCell().setStringVal("QUEUED"));
+ }
+
trow.addToColumnValue(new
TCell().setStringVal(queryInfo.getSql()));
dataBatch.add(trow);
}
diff --git a/regression-test/suites/query_p0/schema_table/test_active_queries.groovy b/regression-test/suites/query_p0/schema_table/test_active_queries.groovy
index eecba3d063b..64c344deec7 100644
--- a/regression-test/suites/query_p0/schema_table/test_active_queries.groovy
+++ b/regression-test/suites/query_p0/schema_table/test_active_queries.groovy
@@ -22,19 +22,19 @@ suite("test_active_queries") {
sql "set experimental_enable_pipeline_engine=false"
sql "set experimental_enable_pipeline_x_engine=false"
sql "select * from information_schema.active_queries"
- sql "select
QUERY_ID,START_TIME,QUERY_TIME_MS,WORKLOAD_GROUP_ID,SQL from
information_schema.active_queries"
+ sql "select
QUERY_ID,QUERY_START_TIME,QUERY_TIME_MS,WORKLOAD_GROUP_ID,SQL,QUERY_STATUS from
information_schema.active_queries"
// pipeline
sql "set experimental_enable_pipeline_engine=true"
sql "set experimental_enable_pipeline_x_engine=false"
sql "select * from information_schema.active_queries"
- sql "select
QUERY_ID,START_TIME,QUERY_TIME_MS,WORKLOAD_GROUP_ID,SQL from
information_schema.active_queries"
+ sql "select
QUERY_ID,QUERY_START_TIME,QUERY_TIME_MS,WORKLOAD_GROUP_ID,SQL,QUERY_STATUS from
information_schema.active_queries"
// pipelinex
sql "set experimental_enable_pipeline_engine=true"
sql "set experimental_enable_pipeline_x_engine=true"
sql "select * from information_schema.active_queries"
- sql "select
QUERY_ID,START_TIME,QUERY_TIME_MS,WORKLOAD_GROUP_ID,SQL from
information_schema.active_queries"
+ sql "select
QUERY_ID,QUERY_START_TIME,QUERY_TIME_MS,WORKLOAD_GROUP_ID,SQL,QUERY_STATUS from
information_schema.active_queries"
Thread.sleep(1000)
}
})
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]