This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new b5a322297b6 Refactor active queries (#31742) (#32312)
b5a322297b6 is described below
commit b5a322297b6cd8539e2e0f38fa3c2f39596183cc
Author: wangbo <[email protected]>
AuthorDate: Fri Mar 15 19:39:54 2024 +0800
Refactor active queries (#31742) (#32312)
---
.../table-functions/active_queries.md | 45 ++++++++----------
.../table-functions/active_queries.md | 45 ++++++++----------
.../trees/plans/commands/InsertExecutor.java | 4 +-
.../main/java/org/apache/doris/qe/Coordinator.java | 4 ++
.../java/org/apache/doris/qe/StmtExecutor.java | 5 +-
.../ActiveQueriesTableValuedFunction.java | 9 ----
.../doris/tablefunction/MetadataGenerator.java | 54 +++++++++++-----------
7 files changed, 74 insertions(+), 92 deletions(-)
diff --git a/docs/en/docs/sql-manual/sql-functions/table-functions/active_queries.md b/docs/en/docs/sql-manual/sql-functions/table-functions/active_queries.md
index 35a71b5eb60..cbc0e20845d 100644
--- a/docs/en/docs/sql-manual/sql-functions/table-functions/active_queries.md
+++ b/docs/en/docs/sql-manual/sql-functions/table-functions/active_queries.md
@@ -45,37 +45,30 @@ This function is used in FROM clauses.
active_queries() table schema:
```
-mysql [(none)]> desc function active_queries();
-+------------------------+--------+------+-------+---------+-------+
-| Field | Type | Null | Key | Default | Extra |
-+------------------------+--------+------+-------+---------+-------+
-| BeHost | TEXT | No | false | NULL | NONE |
-| BePort | BIGINT | No | false | NULL | NONE |
-| QueryId | TEXT | No | false | NULL | NONE |
-| StartTime | TEXT | No | false | NULL | NONE |
-| QueryTimeMs | BIGINT | No | false | NULL | NONE |
-| WorkloadGroupId | BIGINT | No | false | NULL | NONE |
-| QueryCpuTimeMs | BIGINT | No | false | NULL | NONE |
-| ScanRows | BIGINT | No | false | NULL | NONE |
-| ScanBytes | BIGINT | No | false | NULL | NONE |
-| BePeakMemoryBytes | BIGINT | No | false | NULL | NONE |
-| CurrentUsedMemoryBytes | BIGINT | No | false | NULL | NONE |
-| Database | TEXT | No | false | NULL | NONE |
-| FrontendInstance | TEXT | No | false | NULL | NONE |
-| Sql | TEXT | No | false | NULL | NONE |
-+------------------------+--------+------+-------+---------+-------+
-14 rows in set (0.00 sec)
+mysql [(none)]>desc function active_queries();
++------------------+--------+------+-------+---------+-------+
+| Field | Type | Null | Key | Default | Extra |
++------------------+--------+------+-------+---------+-------+
+| QueryId | TEXT | No | false | NULL | NONE |
+| StartTime | TEXT | No | false | NULL | NONE |
+| QueryTimeMs | BIGINT | No | false | NULL | NONE |
+| WorkloadGroupId | BIGINT | No | false | NULL | NONE |
+| Database | TEXT | No | false | NULL | NONE |
+| FrontendInstance | TEXT | No | false | NULL | NONE |
+| Sql | TEXT | No | false | NULL | NONE |
++------------------+--------+------+-------+---------+-------+
+7 rows in set (0.00 sec)
```
### example
```
mysql [(none)]>select * from active_queries();
-+------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+
-| BeHost | BePort | QueryId | StartTime | QueryTimeMs | WorkloadGroupId | QueryCpuTimeMs | ScanRows | ScanBytes | BePeakMemoryBytes | CurrentUsedMemoryBytes | Database | FrontendInstance | Sql |
-+------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+
-| 127.0.0.1 | 6090 | 71fd11b7b0e438c-bc98434b97b8cb98 | 2024-01-16 16:21:15 | 7260 | 10002 | 8392 | 16082249 | 4941889536 | 360470040 | 360420915 | hits | localhost | SELECT xxxx |
-+------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+
-1 row in set (0.01 sec)
++-----------------------------------+---------------------+-------------+-----------------+----------+------------------+--------------------------------+
+| QueryId | StartTime | QueryTimeMs | WorkloadGroupId | Database | FrontendInstance | Sql |
++-----------------------------------+---------------------+-------------+-----------------+----------+------------------+--------------------------------+
+| a84bf9f3ea6348e1-ac542839f8f2af5d | 2024-03-04 17:33:09 | 9 | 10002 | | localhost | select * from active_queries() |
++-----------------------------------+---------------------+-------------+-----------------+----------+------------------+--------------------------------+
+1 row in set (0.03 sec)
```
### keywords
diff --git a/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/active_queries.md b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/active_queries.md
index bdae08285f2..feda3c128ca 100644
--- a/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/active_queries.md
+++ b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/active_queries.md
@@ -45,37 +45,30 @@ active_queries
active_queries()表结构:
```
-mysql [(none)]> desc function active_queries();
-+------------------------+--------+------+-------+---------+-------+
-| Field | Type | Null | Key | Default | Extra |
-+------------------------+--------+------+-------+---------+-------+
-| BeHost | TEXT | No | false | NULL | NONE |
-| BePort | BIGINT | No | false | NULL | NONE |
-| QueryId | TEXT | No | false | NULL | NONE |
-| StartTime | TEXT | No | false | NULL | NONE |
-| QueryTimeMs | BIGINT | No | false | NULL | NONE |
-| WorkloadGroupId | BIGINT | No | false | NULL | NONE |
-| QueryCpuTimeMs | BIGINT | No | false | NULL | NONE |
-| ScanRows | BIGINT | No | false | NULL | NONE |
-| ScanBytes | BIGINT | No | false | NULL | NONE |
-| BePeakMemoryBytes | BIGINT | No | false | NULL | NONE |
-| CurrentUsedMemoryBytes | BIGINT | No | false | NULL | NONE |
-| Database | TEXT | No | false | NULL | NONE |
-| FrontendInstance | TEXT | No | false | NULL | NONE |
-| Sql | TEXT | No | false | NULL | NONE |
-+------------------------+--------+------+-------+---------+-------+
-14 rows in set (0.00 sec)
+mysql [(none)]>desc function active_queries();
++------------------+--------+------+-------+---------+-------+
+| Field | Type | Null | Key | Default | Extra |
++------------------+--------+------+-------+---------+-------+
+| QueryId | TEXT | No | false | NULL | NONE |
+| StartTime | TEXT | No | false | NULL | NONE |
+| QueryTimeMs | BIGINT | No | false | NULL | NONE |
+| WorkloadGroupId | BIGINT | No | false | NULL | NONE |
+| Database | TEXT | No | false | NULL | NONE |
+| FrontendInstance | TEXT | No | false | NULL | NONE |
+| Sql | TEXT | No | false | NULL | NONE |
++------------------+--------+------+-------+---------+-------+
+7 rows in set (0.00 sec)
```
### example
```
mysql [(none)]>select * from active_queries();
-+------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+
-| BeHost | BePort | QueryId | StartTime | QueryTimeMs | WorkloadGroupId | QueryCpuTimeMs | ScanRows | ScanBytes | BePeakMemoryBytes | CurrentUsedMemoryBytes | Database | FrontendInstance | Sql |
-+------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+
-| 127.0.0.1 | 6090 | 71fd11b7b0e438c-bc98434b97b8cb98 | 2024-01-16 16:21:15 | 7260 | 10002 | 8392 | 16082249 | 4941889536 | 360470040 | 360420915 | hits | localhost | SELECT xxxx |
-+------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+
-1 row in set (0.01 sec)
++-----------------------------------+---------------------+-------------+-----------------+----------+------------------+--------------------------------+
+| QueryId | StartTime | QueryTimeMs | WorkloadGroupId | Database | FrontendInstance | Sql |
++-----------------------------------+---------------------+-------------+-----------------+----------+------------------+--------------------------------+
+| a84bf9f3ea6348e1-ac542839f8f2af5d | 2024-03-04 17:33:09 | 9 | 10002 | | localhost | select * from active_queries() |
++-----------------------------------+---------------------+-------------+-----------------+----------+------------------+--------------------------------+
+1 row in set (0.03 sec)
```
### keywords
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java
index 2c5ee559f3d..da4a1c6ac68 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java
@@ -69,6 +69,7 @@ import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.InsertStreamTxnExecutor;
import org.apache.doris.qe.QeProcessorImpl;
+import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;
@@ -222,7 +223,8 @@ public class InsertExecutor {
coordinator.setLoadZeroTolerance(ctx.getSessionVariable().getEnableInsertStrict());
coordinator.setQueryType(TQueryType.LOAD);
executor.getProfile().setExecutionProfile(coordinator.getExecutionProfile());
- QeProcessorImpl.INSTANCE.registerQuery(ctx.queryId(), coordinator);
+ QueryInfo queryInfo = new QueryInfo(ConnectContext.get(), executor.getOriginStmtInString(), coordinator);
+ QeProcessorImpl.INSTANCE.registerQuery(ctx.queryId(), queryInfo);
coordinator.exec();
int execTimeout = ctx.getExecTimeout();
if (LOG.isDebugEnabled()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 8bf1b3813ba..6e016d94e2a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -267,6 +267,10 @@ public class Coordinator implements CoordInterface {
this.tWorkloadGroups = tWorkloadGroups;
}
+ public List<TPipelineWorkloadGroup> gettWorkloadGroups() {
+ return tWorkloadGroups;
+ }
+
private List<TPipelineWorkloadGroup> tWorkloadGroups = Lists.newArrayList();
private final ExecutionProfile executionProfile;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 808ec0de09d..063cbf0ad71 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -147,6 +147,7 @@ import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse;
import org.apache.doris.proto.Types;
import org.apache.doris.qe.CommonResultSet.CommonResultSetMetaData;
import org.apache.doris.qe.ConnectContext.ConnectType;
+import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.cache.Cache;
import org.apache.doris.qe.cache.CacheAnalyzer;
@@ -2053,8 +2054,8 @@ public class StmtExecutor {
coord.setLoadZeroTolerance(context.getSessionVariable().getEnableInsertStrict());
coord.setQueryType(TQueryType.LOAD);
profile.setExecutionProfile(coord.getExecutionProfile());
-
- QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), coord);
+ QueryInfo queryInfo = new QueryInfo(ConnectContext.get(), this.getOriginStmtInString(), coord);
+ QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), queryInfo);
Table table = insertStmt.getTargetTable();
if (table instanceof OlapTable) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java
index 41dd5484dd5..ebc0ffa1121 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java
@@ -35,19 +35,10 @@ public class ActiveQueriesTableValuedFunction extends MetadataTableValuedFunctio
public static final String NAME = "active_queries";
private static final ImmutableList<Column> SCHEMA = ImmutableList.of(
- new Column("BeHost", ScalarType.createStringType()),
- new Column("BePort", PrimitiveType.BIGINT),
new Column("QueryId", ScalarType.createStringType()),
new Column("StartTime", ScalarType.createStringType()),
new Column("QueryTimeMs", PrimitiveType.BIGINT),
new Column("WorkloadGroupId", PrimitiveType.BIGINT),
- new Column("QueryCpuTimeMs", PrimitiveType.BIGINT),
- new Column("ScanRows", PrimitiveType.BIGINT),
- new Column("ScanBytes", PrimitiveType.BIGINT),
- new Column("BePeakMemoryBytes", PrimitiveType.BIGINT),
- new Column("CurrentUsedMemoryBytes", PrimitiveType.BIGINT),
- new Column("ShuffleSendBytes", PrimitiveType.BIGINT),
- new Column("ShuffleSendRows", PrimitiveType.BIGINT),
new Column("Database", ScalarType.createStringType()),
new Column("FrontendInstance", ScalarType.createStringType()),
new Column("Sql", ScalarType.createStringType()));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index 87aabe5bc09..24cb4a365a9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -56,6 +56,7 @@ import org.apache.doris.thrift.TMaterializedViewsMetadataParams;
import org.apache.doris.thrift.TMetadataTableRequestParams;
import org.apache.doris.thrift.TMetadataType;
import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TQueriesMetadataParams;
import org.apache.doris.thrift.TQueryStatistics;
import org.apache.doris.thrift.TRow;
@@ -83,7 +84,6 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.TimeUnit;
public class MetadataGenerator {
@@ -517,7 +517,7 @@ public class MetadataGenerator {
}
private static TFetchSchemaTableDataResult queriesMetadataResult(TMetadataTableRequestParams params,
- TFetchSchemaTableDataRequest parentRequest) {
+ TFetchSchemaTableDataRequest parentRequest) {
if (!params.isSetQueriesMetadataParams()) {
return errorResult("queries metadata param is not set.");
}
@@ -531,37 +531,35 @@ public class MetadataGenerator {
}
selfNode = NetUtils.getHostnameByIp(selfNode);
- // get query
- Map<Long, Map<String, TQueryStatistics>> beQsMap = Env.getCurrentEnv().getWorkloadRuntimeStatusMgr()
- .getBeQueryStatsMap();
- Set<Long> beIdSet = beQsMap.keySet();
-
List<TRow> dataBatch = Lists.newArrayList();
Map<String, QueryInfo> queryInfoMap = QeProcessorImpl.INSTANCE.getQueryInfoMap();
-
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- for (Long beId : beIdSet) {
- Map<String, TQueryStatistics> qsMap = beQsMap.get(beId);
- if (qsMap == null) {
- continue;
+ for (Map.Entry<String, QueryInfo> entry : queryInfoMap.entrySet()) {
+ String queryId = entry.getKey();
+ QueryInfo queryInfo = entry.getValue();
+
+ TRow trow = new TRow();
+ trow.addToColumnValue(new TCell().setStringVal(queryId));
+
+ String strDate = sdf.format(new Date(queryInfo.getStartExecTime()));
+ trow.addToColumnValue(new TCell().setStringVal(strDate));
+ trow.addToColumnValue(new TCell().setLongVal(System.currentTimeMillis() - queryInfo.getStartExecTime()));
+
+ List<TPipelineWorkloadGroup> tgroupList = queryInfo.getCoord().gettWorkloadGroups();
+ if (tgroupList != null && tgroupList.size() == 1) {
+ trow.addToColumnValue(new TCell().setLongVal(tgroupList.get(0).id));
+ } else {
+ trow.addToColumnValue(new TCell().setLongVal(-1));
}
- Set<String> queryIdSet = qsMap.keySet();
- for (String queryId : queryIdSet) {
- QueryInfo queryInfo = queryInfoMap.get(queryId);
- if (queryInfo == null) {
- continue;
- }
- //todo(wb) add connect context for insert select
- if (queryInfo.getConnectContext() != null && !Env.getCurrentEnv().getAccessManager()
- .checkDbPriv(queryInfo.getConnectContext(), queryInfo.getConnectContext().getDatabase(),
- PrivPredicate.SELECT)) {
- continue;
- }
- TQueryStatistics qs = qsMap.get(queryId);
- Backend be = Env.getCurrentEnv().getClusterInfo().getBackend(beId);
- TRow tRow = makeQueryStatisticsTRow(sdf, queryId, be, selfNode, queryInfo, qs);
- dataBatch.add(tRow);
+
+ if (queryInfo.getConnectContext() != null) {
+ trow.addToColumnValue(new TCell().setStringVal(queryInfo.getConnectContext().getDatabase()));
+ } else {
+ trow.addToColumnValue(new TCell().setStringVal(""));
}
+ trow.addToColumnValue(new TCell().setStringVal(selfNode));
+ trow.addToColumnValue(new TCell().setStringVal(queryInfo.getSql()));
+ dataBatch.add(trow);
}
/* Get the query results from other FE also */
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]