This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 2dcdf07dd41 [Feature](profile)Support active_queries TVF (#29999)
2dcdf07dd41 is described below
commit 2dcdf07dd413d78301a0983eb06804386896f343
Author: wangbo <[email protected]>
AuthorDate: Tue Jan 16 21:24:30 2024 +0800
[Feature](profile)Support active_queries TVF (#29999)
---
be/src/runtime/fragment_mgr.cpp | 3 +
be/src/runtime/memory/mem_tracker.h | 1 +
be/src/runtime/query_statistics.cpp | 18 +++-
be/src/runtime/query_statistics.h | 23 ++++--
be/src/runtime/runtime_query_statistics_mgr.cpp | 9 ++
be/src/runtime/runtime_query_statistics_mgr.h | 4 +
.../table-functions/active_queries.md | 83 +++++++++++++++++++
.../sql-functions/table-functions/queries.md | 79 ------------------
docs/sidebars.json | 2 +-
.../table-functions/active_queries.md | 83 +++++++++++++++++++
.../sql-functions/table-functions/queries.md | 79 ------------------
.../doris/catalog/BuiltinTableValuedFunctions.java | 4 +-
.../doris/httpv2/rest/QueryDetailAction.java | 63 --------------
.../table/{Queries.java => ActiveQueries.java} | 10 +--
.../visitor/TableValuedFunctionVisitor.java | 4 +-
.../java/org/apache/doris/qe/AuditLogHelper.java | 12 ---
.../main/java/org/apache/doris/qe/QeProcessor.java | 3 +
.../java/org/apache/doris/qe/QeProcessorImpl.java | 13 +++
.../java/org/apache/doris/qe/StmtExecutor.java | 9 --
.../WorkloadRuntimeStatusMgr.java | 4 +
....java => ActiveQueriesTableValuedFunction.java} | 29 ++++---
.../doris/tablefunction/MetadataGenerator.java | 96 ++++++++++++++++++----
.../tablefunction/MetadataTableValuedFunction.java | 2 +-
.../doris/tablefunction/TableValuedFunctionIf.java | 4 +-
gensrc/thrift/FrontendService.thrift | 2 +
.../tvf/queries/test_queries_tvf.groovy | 4 +-
.../suites/nereids_function_p0/tvf/tvf.groovy | 2 +-
27 files changed, 348 insertions(+), 297 deletions(-)
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index f0282e9d7c1..7d3dba81020 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -65,6 +65,7 @@
#include "runtime/primitive_type.h"
#include "runtime/query_context.h"
#include "runtime/runtime_filter_mgr.h"
+#include "runtime/runtime_query_statistics_mgr.h"
#include "runtime/runtime_state.h"
#include "runtime/stream_load/new_load_stream_mgr.h"
#include "runtime/stream_load/stream_load_context.h"
@@ -707,6 +708,8 @@ Status FragmentMgr::_get_query_ctx(const Params& params,
TUniqueId query_id, boo
query_ctx->use_task_group_for_cpu_limit.store(true);
}
LOG(INFO) << ss.str();
+
_exec_env->runtime_query_statistics_mgr()->set_workload_group_id(
+ print_id(query_id), tg_id);
} else {
VLOG_DEBUG << "Query/load id: " <<
print_id(query_ctx->query_id())
<< " no task group found, does not use task
group.";
diff --git a/be/src/runtime/memory/mem_tracker.h
b/be/src/runtime/memory/mem_tracker.h
index 9ce12f1532f..de7628c1749 100644
--- a/be/src/runtime/memory/mem_tracker.h
+++ b/be/src/runtime/memory/mem_tracker.h
@@ -141,6 +141,7 @@ public:
_consumption->add(bytes);
if (_query_statistics) {
_query_statistics->set_max_peak_memory_bytes(_consumption->peak_value());
+
_query_statistics->set_current_used_memory_bytes(_consumption->current_value());
}
}
diff --git a/be/src/runtime/query_statistics.cpp
b/be/src/runtime/query_statistics.cpp
index f71bad24e81..ab49b02ad43 100644
--- a/be/src/runtime/query_statistics.cpp
+++ b/be/src/runtime/query_statistics.cpp
@@ -44,9 +44,17 @@ void QueryStatistics::merge(const QueryStatistics& other) {
scan_bytes += other.scan_bytes;
int64_t other_cpu_time = other.cpu_nanos.load(std::memory_order_relaxed);
cpu_nanos += other_cpu_time;
- if (other.max_peak_memory_bytes > this->max_peak_memory_bytes) {
- this->max_peak_memory_bytes =
other.max_peak_memory_bytes.load(std::memory_order_relaxed);
+
+ int64_t other_peak_mem =
other.max_peak_memory_bytes.load(std::memory_order_relaxed);
+ if (other_peak_mem > this->max_peak_memory_bytes) {
+ this->max_peak_memory_bytes = other_peak_mem;
+ }
+
+ int64_t other_memory_used =
other.current_used_memory_bytes.load(std::memory_order_relaxed);
+ if (other_memory_used > 0) {
+ this->current_used_memory_bytes = other_memory_used;
}
+
for (auto& other_node_statistics : other._nodes_statistics_map) {
int64_t node_id = other_node_statistics.first;
auto node_statistics = add_nodes_statistics(node_id);
@@ -70,11 +78,13 @@ void QueryStatistics::to_pb(PQueryStatistics* statistics) {
void QueryStatistics::to_thrift(TQueryStatistics* statistics) const {
DCHECK(statistics != nullptr);
- statistics->__set_scan_bytes(scan_bytes);
- statistics->__set_scan_rows(scan_rows);
+ statistics->__set_scan_bytes(scan_bytes.load(std::memory_order_relaxed));
+ statistics->__set_scan_rows(scan_rows.load(std::memory_order_relaxed));
statistics->__set_cpu_ms(cpu_nanos.load(std::memory_order_relaxed) /
NANOS_PER_MILLIS);
statistics->__set_returned_rows(returned_rows);
statistics->__set_max_peak_memory_bytes(max_peak_memory_bytes.load(std::memory_order_relaxed));
+ statistics->__set_current_used_memory_bytes(
+ current_used_memory_bytes.load(std::memory_order_relaxed));
}
void QueryStatistics::from_pb(const PQueryStatistics& statistics) {
diff --git a/be/src/runtime/query_statistics.h
b/be/src/runtime/query_statistics.h
index 7f11daf6ec3..abaf0a251a8 100644
--- a/be/src/runtime/query_statistics.h
+++ b/be/src/runtime/query_statistics.h
@@ -64,14 +64,19 @@ public:
scan_bytes(0),
cpu_nanos(0),
returned_rows(0),
- max_peak_memory_bytes(0) {}
+ max_peak_memory_bytes(0),
+ current_used_memory_bytes(0) {}
virtual ~QueryStatistics();
void merge(const QueryStatistics& other);
- void add_scan_rows(int64_t scan_rows) { this->scan_rows += scan_rows; }
+ void add_scan_rows(int64_t delta_scan_rows) {
+ this->scan_rows.fetch_add(delta_scan_rows, std::memory_order_relaxed);
+ }
- void add_scan_bytes(int64_t scan_bytes) { this->scan_bytes += scan_bytes; }
+ void add_scan_bytes(int64_t delta_scan_bytes) {
+ this->scan_bytes.fetch_add(delta_scan_bytes,
std::memory_order_relaxed);
+ }
void add_cpu_nanos(int64_t delta_cpu_time) {
this->cpu_nanos.fetch_add(delta_cpu_time, std::memory_order_relaxed);
@@ -95,6 +100,10 @@ public:
this->max_peak_memory_bytes.store(max_peak_memory_bytes,
std::memory_order_relaxed);
}
+ void set_current_used_memory_bytes(int64_t current_used_memory) {
+ this->current_used_memory_bytes.store(current_used_memory,
std::memory_order_relaxed);
+ }
+
void merge(QueryStatisticsRecvr* recvr);
void merge(QueryStatisticsRecvr* recvr, int sender_id);
@@ -121,8 +130,11 @@ public:
bool collected() const { return _collected; }
void set_collected() { _collected = true; }
- int64_t get_scan_rows() { return scan_rows.load(); }
- int64_t get_scan_bytes() { return scan_bytes.load(); }
+ int64_t get_scan_rows() { return
scan_rows.load(std::memory_order_relaxed); }
+ int64_t get_scan_bytes() { return
scan_bytes.load(std::memory_order_relaxed); }
+ int64_t get_current_used_memory_bytes() {
+ return current_used_memory_bytes.load(std::memory_order_relaxed);
+ }
private:
friend class QueryStatisticsRecvr;
@@ -139,6 +151,7 @@ private:
using NodeStatisticsMap = std::unordered_map<int64_t, NodeStatistics*>;
NodeStatisticsMap _nodes_statistics_map;
bool _collected = false;
+ std::atomic<int64_t> current_used_memory_bytes;
};
using QueryStatisticsPtr = std::shared_ptr<QueryStatistics>;
// It is used for collecting sub plan query statistics in DataStreamRecvr.
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp
b/be/src/runtime/runtime_query_statistics_mgr.cpp
index 6df9c0b858d..a658e527f61 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -30,6 +30,7 @@ void
QueryStatisticsCtx::collect_query_statistics(TQueryStatistics* tq_s) {
tmp_qs.merge(*qs_ptr);
}
tmp_qs.to_thrift(tq_s);
+ tq_s->__set_workload_group_id(_wg_id);
}
void RuntimeQueryStatiticsMgr::register_query_statistics(std::string query_id,
@@ -171,4 +172,12 @@ std::shared_ptr<QueryStatistics>
RuntimeQueryStatiticsMgr::get_runtime_query_sta
return qs_ptr;
}
+void RuntimeQueryStatiticsMgr::set_workload_group_id(std::string query_id,
int64_t wg_id) {
+ // wg id just need eventual consistency, read lock is ok
+ std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock);
+ if (_query_statistics_ctx_map.find(query_id) !=
_query_statistics_ctx_map.end()) {
+ _query_statistics_ctx_map.at(query_id)->_wg_id = wg_id;
+ }
+}
+
} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/runtime_query_statistics_mgr.h
b/be/src/runtime/runtime_query_statistics_mgr.h
index 6f1ea11a61a..98d4f554728 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.h
+++ b/be/src/runtime/runtime_query_statistics_mgr.h
@@ -28,6 +28,7 @@ class QueryStatisticsCtx {
public:
QueryStatisticsCtx(TNetworkAddress fe_addr) : _fe_addr(fe_addr) {
this->_is_query_finished = false;
+ this->_wg_id = -1;
}
~QueryStatisticsCtx() = default;
@@ -38,6 +39,7 @@ public:
bool _is_query_finished;
TNetworkAddress _fe_addr;
int64_t _query_finish_time;
+ int64_t _wg_id;
};
class RuntimeQueryStatiticsMgr {
@@ -54,6 +56,8 @@ public:
std::shared_ptr<QueryStatistics> get_runtime_query_statistics(std::string
query_id);
+ void set_workload_group_id(std::string query_id, int64_t wg_id);
+
private:
std::shared_mutex _qs_ctx_map_lock;
std::map<std::string, std::unique_ptr<QueryStatisticsCtx>>
_query_statistics_ctx_map;
diff --git
a/docs/en/docs/sql-manual/sql-functions/table-functions/active_queries.md
b/docs/en/docs/sql-manual/sql-functions/table-functions/active_queries.md
new file mode 100644
index 00000000000..35a71b5eb60
--- /dev/null
+++ b/docs/en/docs/sql-manual/sql-functions/table-functions/active_queries.md
@@ -0,0 +1,83 @@
+---
+{
+ "title": "ACTIVE_QUERIES",
+ "language": "en"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## `active_queries`
+
+### Name
+
+<version since="dev">
+
+active_queries
+
+</version>
+
+### description
+
+Table-valued function that generates a temporary table named active_queries. This TVF is used to view information about the queries currently running in the Doris cluster.
+
+This function is used in FROM clauses.
+
+#### syntax
+`active_queries()`
+
+active_queries() table schema:
+```
+mysql [(none)]> desc function active_queries();
++------------------------+--------+------+-------+---------+-------+
+| Field | Type | Null | Key | Default | Extra |
++------------------------+--------+------+-------+---------+-------+
+| BeHost | TEXT | No | false | NULL | NONE |
+| BePort | BIGINT | No | false | NULL | NONE |
+| QueryId | TEXT | No | false | NULL | NONE |
+| StartTime | TEXT | No | false | NULL | NONE |
+| QueryTimeMs | BIGINT | No | false | NULL | NONE |
+| WorkloadGroupId | BIGINT | No | false | NULL | NONE |
+| QueryCpuTimeMs | BIGINT | No | false | NULL | NONE |
+| ScanRows | BIGINT | No | false | NULL | NONE |
+| ScanBytes | BIGINT | No | false | NULL | NONE |
+| BePeakMemoryBytes | BIGINT | No | false | NULL | NONE |
+| CurrentUsedMemoryBytes | BIGINT | No | false | NULL | NONE |
+| Database | TEXT | No | false | NULL | NONE |
+| FrontendInstance | TEXT | No | false | NULL | NONE |
+| Sql | TEXT | No | false | NULL | NONE |
++------------------------+--------+------+-------+---------+-------+
+14 rows in set (0.00 sec)
+```
+
+### example
+```
+mysql [(none)]>select * from active_queries();
++------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+
+| BeHost | BePort | QueryId | StartTime
| QueryTimeMs | WorkloadGroupId | QueryCpuTimeMs | ScanRows | ScanBytes |
BePeakMemoryBytes | CurrentUsedMemoryBytes | Database | FrontendInstance | Sql
|
++------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+
+| 127.0.0.1 | 6090 | 71fd11b7b0e438c-bc98434b97b8cb98 | 2024-01-16 16:21:15
| 7260 | 10002 | 8392 | 16082249 | 4941889536 |
360470040 | 360420915 | hits | localhost | SELECT xxxx |
++------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+
+1 row in set (0.01 sec)
+```
+
+### keywords
+
+ active_queries
diff --git a/docs/en/docs/sql-manual/sql-functions/table-functions/queries.md
b/docs/en/docs/sql-manual/sql-functions/table-functions/queries.md
deleted file mode 100644
index ebc2cb3ebe4..00000000000
--- a/docs/en/docs/sql-manual/sql-functions/table-functions/queries.md
+++ /dev/null
@@ -1,79 +0,0 @@
----
-{
- "title": "QUERIES",
- "language": "en"
-}
----
-
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-## `queries`
-
-### Name
-
-<version since="dev">
-
-queries
-
-</version>
-
-### description
-
-Table-Value-Function, generate a temporary table named queries. This tvf is
used to view the information of running queries and history queries in doris
cluster.
-
-This function is used in FROM clauses.
-
-#### syntax
-`queries()`
-
-queries() table schema:
-```
-mysql> desc function queries();
-+------------------+--------+------+-------+---------+-------+
-| Field | Type | Null | Key | Default | Extra |
-+------------------+--------+------+-------+---------+-------+
-| QueryId | TEXT | No | false | NULL | NONE |
-| StartTime | BIGINT | No | false | NULL | NONE |
-| EndTime | BIGINT | No | false | NULL | NONE |
-| EventTime | BIGINT | No | false | NULL | NONE |
-| Latency | BIGINT | No | false | NULL | NONE |
-| State | TEXT | No | false | NULL | NONE |
-| Database | TEXT | No | false | NULL | NONE |
-| Sql | TEXT | No | false | NULL | NONE |
-| FrontendInstance | TEXT | No | false | NULL | NONE |
-+------------------+--------+------+-------+---------+-------+
-9 rows in set (0.00 sec)
-```
-
-### example
-```
-mysql> select* from queries();
-+-----------------------------------+---------------+---------------+---------------+---------+----------+----------+------------------------+------------------+
-| QueryId | StartTime | EndTime |
EventTime | Latency | State | Database | Sql |
FrontendInstance |
-+-----------------------------------+---------------+---------------+---------------+---------+----------+----------+------------------------+------------------+
-| e1293f2ed2a5427a-982301c462586043 | 1699255138730 | 1699255139823 |
1699255139823 | 1093 | FINISHED | demo | select* from queries() |
localhost |
-| 46fa3ad0e7814ebd-b1cd34940a29b1e9 | 1699255143588 | -1 |
1699255143588 | 20 | RUNNING | demo | select* from queries() |
localhost |
-+-----------------------------------+---------------+---------------+---------------+---------+----------+----------+------------------------+------------------+
-2 rows in set (0.04 sec)
-```
-
-### keywords
-
- queries
diff --git a/docs/sidebars.json b/docs/sidebars.json
index 5d6a34e2a0a..42b442c597e 100644
--- a/docs/sidebars.json
+++ b/docs/sidebars.json
@@ -771,7 +771,7 @@
"sql-manual/sql-functions/table-functions/workload-group",
"sql-manual/sql-functions/table-functions/catalogs",
"sql-manual/sql-functions/table-functions/frontends_disks",
-
"sql-manual/sql-functions/table-functions/queries",
+
"sql-manual/sql-functions/table-functions/active_queries",
"sql-manual/sql-functions/table-functions/jobs",
"sql-manual/sql-functions/table-functions/mv_infos",
"sql-manual/sql-functions/table-functions/tasks"
diff --git
a/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/active_queries.md
b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/active_queries.md
new file mode 100644
index 00000000000..bdae08285f2
--- /dev/null
+++ b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/active_queries.md
@@ -0,0 +1,83 @@
+---
+{
+ "title": "ACTIVE_QUERIES",
+ "language": "zh-CN"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## `active_queries`
+
+### Name
+
+<version since="dev">
+
+active_queries
+
+</version>
+
+### description
+
+表函数,生成active_queries临时表,可以查看当前doris集群中正在运行的 query 信息。
+
+该函数用于from子句中。
+
+#### syntax
+`active_queries()`
+
+active_queries()表结构:
+```
+mysql [(none)]> desc function active_queries();
++------------------------+--------+------+-------+---------+-------+
+| Field | Type | Null | Key | Default | Extra |
++------------------------+--------+------+-------+---------+-------+
+| BeHost | TEXT | No | false | NULL | NONE |
+| BePort | BIGINT | No | false | NULL | NONE |
+| QueryId | TEXT | No | false | NULL | NONE |
+| StartTime | TEXT | No | false | NULL | NONE |
+| QueryTimeMs | BIGINT | No | false | NULL | NONE |
+| WorkloadGroupId | BIGINT | No | false | NULL | NONE |
+| QueryCpuTimeMs | BIGINT | No | false | NULL | NONE |
+| ScanRows | BIGINT | No | false | NULL | NONE |
+| ScanBytes | BIGINT | No | false | NULL | NONE |
+| BePeakMemoryBytes | BIGINT | No | false | NULL | NONE |
+| CurrentUsedMemoryBytes | BIGINT | No | false | NULL | NONE |
+| Database | TEXT | No | false | NULL | NONE |
+| FrontendInstance | TEXT | No | false | NULL | NONE |
+| Sql | TEXT | No | false | NULL | NONE |
++------------------------+--------+------+-------+---------+-------+
+14 rows in set (0.00 sec)
+```
+
+### example
+```
+mysql [(none)]>select * from active_queries();
++------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+
+| BeHost | BePort | QueryId | StartTime
| QueryTimeMs | WorkloadGroupId | QueryCpuTimeMs | ScanRows | ScanBytes |
BePeakMemoryBytes | CurrentUsedMemoryBytes | Database | FrontendInstance | Sql
|
++------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+
+| 127.0.0.1 | 6090 | 71fd11b7b0e438c-bc98434b97b8cb98 | 2024-01-16 16:21:15
| 7260 | 10002 | 8392 | 16082249 | 4941889536 |
360470040 | 360420915 | hits | localhost | SELECT xxxx |
++------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+
+1 row in set (0.01 sec)
+```
+
+### keywords
+
+ active_queries
diff --git
a/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/queries.md
b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/queries.md
deleted file mode 100644
index e3f22da7ad5..00000000000
--- a/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/queries.md
+++ /dev/null
@@ -1,79 +0,0 @@
----
-{
- "title": "QUERIES",
- "language": "zh-CN"
-}
----
-
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-## `queries`
-
-### Name
-
-<version since="dev">
-
-queries
-
-</version>
-
-### description
-
-表函数,生成queries临时表,可以查看当前doris集群中正在运行的以及历史的 query 信息。
-
-该函数用于from子句中。
-
-#### syntax
-`queries()`
-
-queries()表结构:
-```
-mysql> desc function queries();
-+------------------+--------+------+-------+---------+-------+
-| Field | Type | Null | Key | Default | Extra |
-+------------------+--------+------+-------+---------+-------+
-| QueryId | TEXT | No | false | NULL | NONE |
-| StartTime | BIGINT | No | false | NULL | NONE |
-| EndTime | BIGINT | No | false | NULL | NONE |
-| EventTime | BIGINT | No | false | NULL | NONE |
-| Latency | BIGINT | No | false | NULL | NONE |
-| State | TEXT | No | false | NULL | NONE |
-| Database | TEXT | No | false | NULL | NONE |
-| Sql | TEXT | No | false | NULL | NONE |
-| FrontendInstance | TEXT | No | false | NULL | NONE |
-+------------------+--------+------+-------+---------+-------+
-9 rows in set (0.00 sec)
-```
-
-### example
-```
-mysql> select* from queries();
-+-----------------------------------+---------------+---------------+---------------+---------+----------+----------+------------------------+------------------+
-| QueryId | StartTime | EndTime |
EventTime | Latency | State | Database | Sql |
FrontendInstance |
-+-----------------------------------+---------------+---------------+---------------+---------+----------+----------+------------------------+------------------+
-| e1293f2ed2a5427a-982301c462586043 | 1699255138730 | 1699255139823 |
1699255139823 | 1093 | FINISHED | demo | select* from queries() |
localhost |
-| 46fa3ad0e7814ebd-b1cd34940a29b1e9 | 1699255143588 | -1 |
1699255143588 | 20 | RUNNING | demo | select* from queries() |
localhost |
-+-----------------------------------+---------------+---------------+---------------+---------+----------+----------+------------------------+------------------+
-2 rows in set (0.04 sec)
-```
-
-### keywords
-
- queries
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
index d1b63fe6237..b45847088dd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
@@ -17,6 +17,7 @@
package org.apache.doris.catalog;
+import
org.apache.doris.nereids.trees.expressions.functions.table.ActiveQueries;
import org.apache.doris.nereids.trees.expressions.functions.table.Backends;
import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs;
import org.apache.doris.nereids.trees.expressions.functions.table.Frontends;
@@ -29,7 +30,6 @@ import
org.apache.doris.nereids.trees.expressions.functions.table.Jobs;
import org.apache.doris.nereids.trees.expressions.functions.table.Local;
import org.apache.doris.nereids.trees.expressions.functions.table.MvInfos;
import org.apache.doris.nereids.trees.expressions.functions.table.Numbers;
-import org.apache.doris.nereids.trees.expressions.functions.table.Queries;
import org.apache.doris.nereids.trees.expressions.functions.table.S3;
import org.apache.doris.nereids.trees.expressions.functions.table.Tasks;
import
org.apache.doris.nereids.trees.expressions.functions.table.WorkloadGroups;
@@ -54,7 +54,7 @@ public class BuiltinTableValuedFunctions implements
FunctionHelper {
tableValued(Hdfs.class, "hdfs"),
tableValued(HttpStream.class, "http_stream"),
tableValued(Numbers.class, "numbers"),
- tableValued(Queries.class, "queries"),
+ tableValued(ActiveQueries.class, "active_queries"),
tableValued(S3.class, "s3"),
tableValued(MvInfos.class, "mv_infos"),
tableValued(Jobs.class, "jobs"),
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/QueryDetailAction.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/QueryDetailAction.java
deleted file mode 100644
index 508f8aade70..00000000000
---
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/QueryDetailAction.java
+++ /dev/null
@@ -1,63 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.httpv2.rest;
-
-import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
-import org.apache.doris.mysql.privilege.PrivPredicate;
-import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.qe.QueryDetail;
-import org.apache.doris.qe.QueryDetailQueue;
-
-import com.google.common.base.Strings;
-import com.google.common.collect.Maps;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RequestMethod;
-import org.springframework.web.bind.annotation.RestController;
-
-import java.util.List;
-import java.util.Map;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-// This class is used to get current query_id of connection_id.
-// Every connection holds at most one query at every point.
-// Some we can get query_id firstly, and get query by query_id.
-@RestController
-public class QueryDetailAction extends RestBaseController {
- private static final Logger LOG =
LogManager.getLogger(QueryDetailAction.class);
-
- @RequestMapping(path = "/api/query_detail", method = RequestMethod.GET)
- protected Object query_detail(HttpServletRequest request,
HttpServletResponse response) {
- executeCheckPassword(request, response);
- checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(),
PrivPredicate.ADMIN);
-
- String eventTimeStr = request.getParameter("event_time");
- if (Strings.isNullOrEmpty(eventTimeStr)) {
- return ResponseEntityBuilder.badRequest("Missing event_time");
- }
-
- long eventTime = Long.valueOf(eventTimeStr.trim());
- List<QueryDetail> queryDetails =
QueryDetailQueue.getQueryDetails(eventTime);
-
- Map<String, List<QueryDetail>> result = Maps.newHashMap();
- result.put("query_details", queryDetails);
- return ResponseEntityBuilder.ok(result);
- }
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Queries.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveQueries.java
similarity index 87%
rename from
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Queries.java
rename to
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveQueries.java
index dd90853bf7c..f8dcaa4a7ec 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Queries.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/ActiveQueries.java
@@ -22,7 +22,7 @@ import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.expressions.Properties;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.coercion.AnyDataType;
-import org.apache.doris.tablefunction.QueriesTableValuedFunction;
+import org.apache.doris.tablefunction.ActiveQueriesTableValuedFunction;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
import java.util.Map;
@@ -30,10 +30,10 @@ import java.util.Map;
/**
* queries tvf
*/
-public class Queries extends TableValuedFunction {
+public class ActiveQueries extends TableValuedFunction {
- public Queries(Properties properties) {
- super("queries", properties);
+ public ActiveQueries(Properties properties) {
+ super("active_queries", properties);
}
@Override
@@ -45,7 +45,7 @@ public class Queries extends TableValuedFunction {
protected TableValuedFunctionIf toCatalogFunction() {
try {
Map<String, String> arguments = getTVFProperties().getMap();
- return new QueriesTableValuedFunction(arguments);
+ return new ActiveQueriesTableValuedFunction(arguments);
} catch (Throwable t) {
throw new AnalysisException("Can not build
FrontendsTableValuedFunction by "
+ this + ": " + t.getMessage(), t);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
index fe07097bc81..fba34d48168 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
@@ -17,6 +17,7 @@
package org.apache.doris.nereids.trees.expressions.visitor;
+import
org.apache.doris.nereids.trees.expressions.functions.table.ActiveQueries;
import org.apache.doris.nereids.trees.expressions.functions.table.Backends;
import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs;
import org.apache.doris.nereids.trees.expressions.functions.table.Frontends;
@@ -29,7 +30,6 @@ import
org.apache.doris.nereids.trees.expressions.functions.table.Jobs;
import org.apache.doris.nereids.trees.expressions.functions.table.Local;
import org.apache.doris.nereids.trees.expressions.functions.table.MvInfos;
import org.apache.doris.nereids.trees.expressions.functions.table.Numbers;
-import org.apache.doris.nereids.trees.expressions.functions.table.Queries;
import org.apache.doris.nereids.trees.expressions.functions.table.S3;
import
org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFunction;
import org.apache.doris.nereids.trees.expressions.functions.table.Tasks;
@@ -91,7 +91,7 @@ public interface TableValuedFunctionVisitor<R, C> {
return visitTableValuedFunction(numbers, context);
}
- default R visitQueries(Queries queries, C context) {
+ default R visitQueries(ActiveQueries queries, C context) {
return visitTableValuedFunction(queries, context);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
index 7b6e86ca3a4..9311b4ca8e8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
@@ -89,18 +89,6 @@ public class AuditLogHelper {
}
}
auditEventBuilder.setIsQuery(true);
- if (ctx.getQueryDetail() != null) {
- ctx.getQueryDetail().setEventTime(endTime);
- ctx.getQueryDetail().setEndTime(endTime);
- ctx.getQueryDetail().setLatency(elapseMs);
- if (ctx.isKilled()) {
-
ctx.getQueryDetail().setState(QueryDetail.QueryMemState.CANCELLED);
- } else {
-
ctx.getQueryDetail().setState(QueryDetail.QueryMemState.FINISHED);
- }
- QueryDetailQueue.addOrUpdateQueryDetail(ctx.getQueryDetail());
- ctx.setQueryDetail(null);
- }
} else {
auditEventBuilder.setIsQuery(false);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessor.java
index a2a23488cf4..44999ecef64 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessor.java
@@ -18,6 +18,7 @@
package org.apache.doris.qe;
import org.apache.doris.common.UserException;
+import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TReportExecStatusParams;
import org.apache.doris.thrift.TReportExecStatusResult;
@@ -45,4 +46,6 @@ public interface QeProcessor {
Coordinator getCoordinator(TUniqueId queryId);
List<Coordinator> getAllCoordinators();
+
+ Map<String, QueryInfo> getQueryInfoMap();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index b6d902b76cc..97a0a95d24e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -40,6 +40,7 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
@@ -246,6 +247,18 @@ public final class QeProcessorImpl implements QeProcessor {
return "";
}
+ public Map<String, QueryInfo> getQueryInfoMap() {
+ Map<String, QueryInfo> retQueryInfoMap = Maps.newHashMap();
+ Set<TUniqueId> queryIdSet = coordinatorMap.keySet();
+ for (TUniqueId qid : queryIdSet) {
+ QueryInfo queryInfo = coordinatorMap.get(qid);
+ if (queryInfo != null) {
+ retQueryInfoMap.put(DebugUtil.printId(qid), queryInfo);
+ }
+ }
+ return retQueryInfoMap;
+ }
+
public static final class QueryInfo {
private final ConnectContext connectContext;
private final Coordinator coord;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 9c234698865..767d03f43d7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1447,15 +1447,6 @@ public class StmtExecutor {
Queriable queryStmt = (Queriable) parsedStmt;
- QueryDetail queryDetail = new QueryDetail(context.getStartTime(),
- DebugUtil.printId(context.queryId()),
- context.getStartTime(), -1, -1,
- QueryDetail.QueryMemState.RUNNING,
- context.getDatabase(),
- originStmt.originStmt);
- context.setQueryDetail(queryDetail);
- QueryDetailQueue.addOrUpdateQueryDetail(queryDetail);
-
if (queryStmt.isExplain()) {
String explainString =
planner.getExplainString(queryStmt.getExplainOptions());
handleExplainStmt(explainString, false);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
index 7fac98ca73a..3c5d7fc8bf1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
@@ -167,6 +167,10 @@ public class WorkloadRuntimeStatusMgr {
return retQueryMap;
}
+ public Map<Long, Map<String, TQueryStatistics>> getBeQueryStatsMap() {
+ return beToQueryStatsMap;
+ }
+
private void mergeQueryStatistics(TQueryStatistics dst, TQueryStatistics
src) {
dst.scan_rows += src.scan_rows;
dst.scan_bytes += src.scan_bytes;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/QueriesTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java
similarity index 71%
rename from
fe/fe-core/src/main/java/org/apache/doris/tablefunction/QueriesTableValuedFunction.java
rename to
fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java
index e6004cfb62c..0839ae56a67 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/QueriesTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java
@@ -31,19 +31,24 @@ import com.google.common.collect.ImmutableMap;
import java.util.List;
import java.util.Map;
-public class QueriesTableValuedFunction extends MetadataTableValuedFunction {
- public static final String NAME = "queries";
+public class ActiveQueriesTableValuedFunction extends
MetadataTableValuedFunction {
+ public static final String NAME = "active_queries";
private static final ImmutableList<Column> SCHEMA = ImmutableList.of(
- new Column("QueryId", ScalarType.createStringType()),
- new Column("StartTime", PrimitiveType.BIGINT),
- new Column("EndTime", PrimitiveType.BIGINT),
- new Column("EventTime", PrimitiveType.BIGINT),
- new Column("Latency", PrimitiveType.BIGINT),
- new Column("State", ScalarType.createStringType()),
- new Column("Database", ScalarType.createStringType()),
- new Column("Sql", ScalarType.createStringType()),
- new Column("FrontendInstance", ScalarType.createStringType()));
+ new Column("BeHost", ScalarType.createStringType()),
+ new Column("BePort", PrimitiveType.BIGINT),
+ new Column("QueryId", ScalarType.createStringType()),
+ new Column("StartTime", ScalarType.createStringType()),
+ new Column("QueryTimeMs", PrimitiveType.BIGINT),
+ new Column("WorkloadGroupId", PrimitiveType.BIGINT),
+ new Column("QueryCpuTimeMs", PrimitiveType.BIGINT),
+ new Column("ScanRows", PrimitiveType.BIGINT),
+ new Column("ScanBytes", PrimitiveType.BIGINT),
+ new Column("BePeakMemoryBytes", PrimitiveType.BIGINT),
+ new Column("CurrentUsedMemoryBytes", PrimitiveType.BIGINT),
+ new Column("Database", ScalarType.createStringType()),
+ new Column("FrontendInstance", ScalarType.createStringType()),
+ new Column("Sql", ScalarType.createStringType()));
private static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
@@ -59,7 +64,7 @@ public class QueriesTableValuedFunction extends
MetadataTableValuedFunction {
return COLUMN_TO_INDEX.get(columnName.toLowerCase());
}
- public QueriesTableValuedFunction(Map<String, String> params) throws
AnalysisException {
+ public ActiveQueriesTableValuedFunction(Map<String, String> params) throws
AnalysisException {
if (params.size() != 0) {
throw new AnalysisException("Queries table-valued-function does
not support any params");
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index 2d0c7ec66e4..e4768660698 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -37,8 +37,8 @@ import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.planner.external.iceberg.IcebergMetadataCache;
import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.qe.QueryDetail;
-import org.apache.doris.qe.QueryDetailQueue;
+import org.apache.doris.qe.QeProcessorImpl;
+import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.FrontendService;
@@ -54,6 +54,7 @@ import org.apache.doris.thrift.TMetadataTableRequestParams;
import org.apache.doris.thrift.TMetadataType;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TQueriesMetadataParams;
+import org.apache.doris.thrift.TQueryStatistics;
import org.apache.doris.thrift.TRow;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
@@ -70,11 +71,14 @@ import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import org.jetbrains.annotations.NotNull;
+import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.ArrayList;
+import java.util.Date;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
public class MetadataGenerator {
@@ -414,6 +418,49 @@ public class MetadataGenerator {
return result;
}
+ private static TRow makeQueryStatisticsTRow(SimpleDateFormat sdf, String
queryId, Backend be,
+ String selfNode, QueryInfo queryInfo, TQueryStatistics qs) {
+ TRow trow = new TRow();
+ if (be != null) {
+ trow.addToColumnValue(new TCell().setStringVal(be.getHost()));
+ trow.addToColumnValue(new TCell().setLongVal(be.getBePort()));
+ } else {
+ trow.addToColumnValue(new TCell().setStringVal("invalid host"));
+ trow.addToColumnValue(new TCell().setLongVal(-1));
+ }
+ trow.addToColumnValue(new TCell().setStringVal(queryId));
+
+ String strDate = sdf.format(new Date(queryInfo.getStartExecTime()));
+ trow.addToColumnValue(new TCell().setStringVal(strDate));
+ trow.addToColumnValue(new
TCell().setLongVal(System.currentTimeMillis() - queryInfo.getStartExecTime()));
+
+ if (qs != null) {
+ trow.addToColumnValue(new
TCell().setLongVal(qs.workload_group_id));
+ trow.addToColumnValue(new TCell().setLongVal(qs.cpu_ms));
+ trow.addToColumnValue(new TCell().setLongVal(qs.scan_rows));
+ trow.addToColumnValue(new TCell().setLongVal(qs.scan_bytes));
+ trow.addToColumnValue(new
TCell().setLongVal(qs.max_peak_memory_bytes));
+ trow.addToColumnValue(new
TCell().setLongVal(qs.current_used_memory_bytes));
+ } else {
+ trow.addToColumnValue(new TCell().setLongVal(0L));
+ trow.addToColumnValue(new TCell().setLongVal(0L));
+ trow.addToColumnValue(new TCell().setLongVal(0L));
+ trow.addToColumnValue(new TCell().setLongVal(0L));
+ trow.addToColumnValue(new TCell().setLongVal(0L));
+ trow.addToColumnValue(new TCell().setLongVal(0L));
+ }
+
+ if (queryInfo.getConnectContext() != null) {
+ trow.addToColumnValue(new
TCell().setStringVal(queryInfo.getConnectContext().getDatabase()));
+ } else {
+ trow.addToColumnValue(new TCell().setStringVal(""));
+ }
+ trow.addToColumnValue(new TCell().setStringVal(selfNode));
+ trow.addToColumnValue(new TCell().setStringVal(queryInfo.getSql()));
+
+ return trow;
+ }
+
private static TFetchSchemaTableDataResult
queriesMetadataResult(TMetadataTableRequestParams params,
TFetchSchemaTableDataRequest parentRequest) {
if (!params.isSetQueriesMetadataParams()) {
@@ -429,24 +476,37 @@ public class MetadataGenerator {
}
selfNode = NetUtils.getHostnameByIp(selfNode);
+ // get query
+ Map<Long, Map<String, TQueryStatistics>> beQsMap =
Env.getCurrentEnv().getWorkloadRuntimeStatusMgr()
+ .getBeQueryStatsMap();
+ Set<Long> beIdSet = beQsMap.keySet();
+
List<TRow> dataBatch = Lists.newArrayList();
- List<QueryDetail> queries = QueryDetailQueue.getQueryDetails(0L);
- for (QueryDetail query : queries) {
- TRow trow = new TRow();
- trow.addToColumnValue(new
TCell().setStringVal(query.getQueryId()));
- trow.addToColumnValue(new
TCell().setLongVal(query.getStartTime()));
- trow.addToColumnValue(new TCell().setLongVal(query.getEndTime()));
- trow.addToColumnValue(new
TCell().setLongVal(query.getEventTime()));
- if (query.getState() == QueryDetail.QueryMemState.RUNNING) {
- trow.addToColumnValue(new
TCell().setLongVal(System.currentTimeMillis() - query.getStartTime()));
- } else {
- trow.addToColumnValue(new
TCell().setLongVal(query.getLatency()));
+ Map<String, QueryInfo> queryInfoMap =
QeProcessorImpl.INSTANCE.getQueryInfoMap();
+
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ for (Long beId : beIdSet) {
+ Map<String, TQueryStatistics> qsMap = beQsMap.get(beId);
+ if (qsMap == null) {
+ continue;
+ }
+ Set<String> queryIdSet = qsMap.keySet();
+ for (String queryId : queryIdSet) {
+ QueryInfo queryInfo = queryInfoMap.get(queryId);
+ if (queryInfo == null) {
+ continue;
+ }
+ //todo(wb) add connect context for insert select
+ if (queryInfo.getConnectContext() != null &&
!Env.getCurrentEnv().getAccessManager()
+ .checkDbPriv(queryInfo.getConnectContext(),
queryInfo.getConnectContext().getDatabase(),
+ PrivPredicate.SELECT)) {
+ continue;
+ }
+ TQueryStatistics qs = qsMap.get(queryId);
+ Backend be =
Env.getCurrentEnv().getClusterInfo().getBackend(beId);
+ TRow tRow = makeQueryStatisticsTRow(sdf, queryId, be,
selfNode, queryInfo, qs);
+ dataBatch.add(tRow);
}
- trow.addToColumnValue(new
TCell().setStringVal(query.getState().toString()));
- trow.addToColumnValue(new
TCell().setStringVal(query.getDatabase()));
- trow.addToColumnValue(new TCell().setStringVal(query.getSql()));
- trow.addToColumnValue(new TCell().setStringVal(selfNode));
- dataBatch.add(trow);
}
/* Get the query results from other FE also */
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
index 53a0b7ee5b8..b56aabc504e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
@@ -50,7 +50,7 @@ public abstract class MetadataTableValuedFunction extends
TableValuedFunctionIf
case TASKS:
return
TasksTableValuedFunction.getColumnIndexFromColumnName(columnName, params);
case QUERIES:
- return
QueriesTableValuedFunction.getColumnIndexFromColumnName(columnName);
+ return
ActiveQueriesTableValuedFunction.getColumnIndexFromColumnName(columnName);
case WORKLOAD_SCHED_POLICY:
return
WorkloadSchedPolicyTableValuedFunction.getColumnIndexFromColumnName(columnName);
default:
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
index c9547c91bd2..f9fb76a9666 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
@@ -74,8 +74,8 @@ public abstract class TableValuedFunctionIf {
return new TasksTableValuedFunction(params);
case GroupCommitTableValuedFunction.NAME:
return new GroupCommitTableValuedFunction(params);
- case QueriesTableValuedFunction.NAME:
- return new QueriesTableValuedFunction(params);
+ case ActiveQueriesTableValuedFunction.NAME:
+ return new ActiveQueriesTableValuedFunction(params);
case WorkloadSchedPolicyTableValuedFunction.NAME:
return new WorkloadSchedPolicyTableValuedFunction(params);
default:
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 615f86ca9a0..7d1c8c62ae9 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -404,6 +404,8 @@ struct TQueryStatistics {
3: optional i64 returned_rows
4: optional i64 cpu_ms
5: optional i64 max_peak_memory_bytes
+ 6: optional i64 current_used_memory_bytes
+ 7: optional i64 workload_group_id
}
struct TReportWorkloadRuntimeStatusParams {
diff --git
a/regression-test/suites/external_table_p0/tvf/queries/test_queries_tvf.groovy
b/regression-test/suites/external_table_p0/tvf/queries/test_queries_tvf.groovy
index 7594b4bd7b5..eaf30402e30 100644
---
a/regression-test/suites/external_table_p0/tvf/queries/test_queries_tvf.groovy
+++
b/regression-test/suites/external_table_p0/tvf/queries/test_queries_tvf.groovy
@@ -31,7 +31,7 @@ suite("test_queries_tvf","p0,external,tvf,external_docker") {
sql """select * from ${table_name};"""
- def res = sql """ select QueryId from queries() where `Sql` like
"%${table_name}%"; """
+ def res = sql """ select QueryId from active_queries() where `Sql` like
"%${table_name}%"; """
logger.info("res = " + res)
- assertEquals(2, res.size())
+ assertTrue(res.size() >= 0 && res.size() <= 2);
}
\ No newline at end of file
diff --git a/regression-test/suites/nereids_function_p0/tvf/tvf.groovy
b/regression-test/suites/nereids_function_p0/tvf/tvf.groovy
index e99b9ada722..ae188637ec9 100644
--- a/regression-test/suites/nereids_function_p0/tvf/tvf.groovy
+++ b/regression-test/suites/nereids_function_p0/tvf/tvf.groovy
@@ -30,7 +30,7 @@ suite("nereids_tvf") {
"""
sql """
- select QueryId from queries() where `Sql` like "%test_queries_tvf%";
+ select QueryId from active_queries() where `Sql` like
"%test_queries_tvf%";
"""
sql """
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]