This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 2a5147b1262 branch-4.1:[improvement](executor) unify current query
runtime statistics and expose task progress (#60567) (#63130)
2a5147b1262 is described below
commit 2a5147b1262c60b79fb32facd4b78a92379c3577
Author: Wen Zhenghu <[email protected]>
AuthorDate: Mon May 11 20:58:23 2026 +0800
branch-4.1:[improvement](executor) unify current query runtime statistics
and expose task progress (#60567) (#63130)
Cherry-picked from https://github.com/apache/doris/pull/60567
**PR Summary**
- This PR unifies current-query runtime statistics onto the `BE -> FE`
reporting pipeline, replacing the previous ad-hoc `RuntimeProfile`
traversal path, and enriches `current_queries` with task-level progress
plus broader resource metrics.
- The goal is to make current-query visibility more real-time and
consistent with audit statistics while simplifying and consolidating FE
proc/REST surfaces.
**What It Solves**
- Unifies statistics source: `QeProcessorImpl` now reads aggregated
`TQueryStatistics` from `WorkloadRuntimeStatusMgr` instead of relying on
the legacy `CurrentQueryInfoProvider` path.
- Improves progress observability: introduces `process_rows`,
`total_tasks_num`, and `finished_tasks_num`, and exposes computed
`Progress`.
- Expands runtime metrics coverage: `current_queries` now includes
richer scan/cpu/memory/shuffle/spill/cache counters.
- Consolidates query views: `/current_queries` and
`/current_query_stmts` now share the same statistics view; legacy
per-query/per-fragment proc drill-down implementation is removed.
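How `Progress` is derived from the two task counters is not visible in this part of the message. The sketch below is one plausible reading, not the FE implementation: a truncated whole-number percentage of `finished_tasks_num` over `total_tasks_num`. The helper names `progress_percent` and `format_progress` are hypothetical, and both the zero-total case and the clamp to 100 are assumptions. Truncation does agree with the sample rows in this message (51/74 gives 68%, 49/138 gives 35%, 65/138 gives 47%).

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical helper, not taken from the Doris sources: derives a
// whole-number percentage from the two task counters in TQueryStatistics.
int64_t progress_percent(int64_t finished_tasks, int64_t total_tasks) {
    assert(finished_tasks >= 0 && total_tasks >= 0);
    if (total_tasks == 0) {
        // Assumption: report 0 until at least one task has been registered.
        return 0;
    }
    // Assumption: clamp so the displayed value never exceeds 100.
    const int64_t finished = std::min(finished_tasks, total_tasks);
    // Integer division truncates, so 51 of 74 tasks yields 68, not 69.
    return finished * 100 / total_tasks;
}

// Formats the percentage the way the sample rows show it, e.g. "68%".
std::string format_progress(int64_t finished_tasks, int64_t total_tasks) {
    return std::to_string(progress_percent(finished_tasks, total_tasks)) + "%";
}
```

Because the counters are reported per BE and merged in FE, a reader of `Progress` should probably treat it as a coarse indicator of closed pipeline tasks rather than of remaining wall-clock time.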
**Implementation Details**
- Protocol layer:
- Extends `TQueryStatistics` with `process_rows`, `finished_tasks_num`,
and `total_tasks_num`.
- BE collection/reporting:
- Accumulates `process_rows` in the execution path.
- Records `total_tasks_num` at pipeline task graph initialization and
increments `finished_tasks_num` in real time when tasks close.
- Mirrors task-progress counters into `QueryTaskController` so counters
remain available even after `QueryContext` teardown.
- Exports new fields in `ResourceContext::to_thrift_query_statistics`.
- FE aggregation/retention:
- `WorkloadRuntimeStatusMgr` merges additional fields (including task
progress) and refines timeout cleanup: remove query stats only when they
are timed out and the query no longer exists in FE.
- `QueryStatisticsItem` now carries `TQueryStatistics` as the unified
data carrier for proc/REST.
- Presentation layer:
- `CurrentQueryStatisticsProcDir` adds expanded columns and computes
`Progress`.
- `/rest/v2/manager/query/current_queries` in `QueryProfileAction` now
serves the same unified stats view.
- Removes legacy classes: `CurrentQueryInfoProvider`,
`CurrentQuerySqlProcDir`, `CurrentQueryFragmentProcNode`, and
`CurrentQueryStatementsProcNode`.
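The BE-side idea of mirroring task-progress counters into a controller so that they stay readable after the query context is torn down can be sketched as follows. This is a simplified illustration under stated assumptions, not the Doris code: `TaskProgress` and `QueryCtx` are hypothetical stand-ins for `QueryTaskController` and `QueryContext`, and shared ownership through `std::shared_ptr` is an assumption made to keep the example self-contained.

```cpp
#include <atomic>
#include <cassert>
#include <memory>
#include <utility>

// Hypothetical stand-in for QueryTaskController: it owns the counters.
class TaskProgress {
public:
    // Relaxed ordering is enough here because each counter is an independent
    // tally; no other data is published through these updates.
    void add_total(int delta) { _total.fetch_add(delta, std::memory_order_relaxed); }
    void inc_finished() { _finished.fetch_add(1, std::memory_order_relaxed); }
    int total() const { return _total.load(std::memory_order_relaxed); }
    int finished() const { return _finished.load(std::memory_order_relaxed); }

private:
    std::atomic<int> _total {0};
    std::atomic<int> _finished {0};
};

// Hypothetical stand-in for QueryContext: it only forwards updates.
class QueryCtx {
public:
    explicit QueryCtx(std::shared_ptr<TaskProgress> progress)
            : _progress(std::move(progress)) {}
    void add_total_task_num(int delta) { _progress->add_total(delta); }
    void inc_finished_task_num() { _progress->inc_finished(); }

private:
    std::shared_ptr<TaskProgress> _progress;
};

// Registers `total` tasks, finishes `finished` of them, destroys the context,
// and returns the progress object that is still alive afterwards.
std::shared_ptr<TaskProgress> snapshot_after_teardown(int total, int finished) {
    auto progress = std::make_shared<TaskProgress>();
    auto ctx = std::make_unique<QueryCtx>(progress);
    ctx->add_total_task_num(total);
    for (int i = 0; i < finished; ++i) {
        ctx->inc_finished_task_num();
    }
    ctx.reset();  // Context teardown; the counters remain readable.
    return progress;
}
```

Keeping the counters outside the context means a final statistics report can still carry the last known task counts even when it is built after the context has gone away.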
```
*************************** 1. row ***************************
QueryId: e00b00b1155d4042-98862b60016a768a
ConnectionId: 394
Catalog: internal
Database: wzhtest
User: root
ExecTime: 20717
SqlHash: cf263b08302d8be436c97dd5e6f0d283
Statement: INSERT INTO test_query_progress_tb SELECT
DISTINCT k, CONCAT(v, CAST(k AS STRING)) FROM test_query_progress_tb WHERE
k % 2 = 0
ScanRows: 45400000 Rows
ScanBytes: 2.70 GB
ProcessRows: 75598123 Rows
CpuMs: 178336
MaxPeakMemoryBytes: 13.03 GB
CurrentUsedMemoryBytes: 8.69 GB
WorkloadGroupId: 1777125330381
ShuffleSendBytes: 0.00
ShuffleSendRows: 0 Rows
ScanBytesFromLocalStorage: 31.48 MB
ScanBytesFromRemoteStorage: 0.00
SpillWriteBytesToLocalStorage: 0.00
SpillReadBytesFromLocalStorage: 0.00
BytesWriteIntoCache: 0.00
TotalTasks: 74
FinishedTasks: 51
Progress: 68%
------------------------
-- first snapshot (query e2b8c99658a94743-9ebbf0d036d83295) --
QueryId: e2b8c99658a94743-9ebbf0d036d83295
ConnectionId: 9
Catalog: hive_test
Database: tpch100_parquet
User: root
ExecTime: 6093
SqlHash: f8a30e4182d72cce3eff6cb385005b1f
Statement: select ... from supplier, lineitem l1, orders, nation ...
limit 100
ScanRows: 621466194 Rows
ScanBytes: 5.37 GB
ProcessRows: 79079742 Rows
CpuMs: 31655
MaxPeakMemoryBytes: 2.32 GB
CurrentUsedMemoryBytes: 2.18 GB
WorkloadGroupId: 1777253545394
ShuffleSendBytes: 0.00
ShuffleSendRows: 0 Rows
ScanBytesFromLocalStorage: 0.00
ScanBytesFromRemoteStorage: 5.37 GB
SpillWriteBytesToLocalStorage: 0.00
SpillReadBytesFromLocalStorage: 0.00
BytesWriteIntoCache: 0.00
TotalTasks: 138
FinishedTasks: 49
Progress: 35%
-- second snapshot (query e2b8c99658a94743-9ebbf0d036d83295, taken later) --
QueryId: e2b8c99658a94743-9ebbf0d036d83295
ConnectionId: 9
Catalog: hive_test
Database: tpch100_parquet
User: root
ExecTime: 10807
SqlHash: f8a30e4182d72cce3eff6cb385005b1f
Statement: select ... from supplier, lineitem l1, orders, nation ...
limit 100
ScanRows: 1102562592 Rows
ScanBytes: 9.20 GB
ProcessRows: 112176670 Rows
CpuMs: 53808
MaxPeakMemoryBytes: 3.13 GB
CurrentUsedMemoryBytes: 2.50 GB
WorkloadGroupId: 1777253545394
ShuffleSendBytes: 0.00
ShuffleSendRows: 0 Rows
ScanBytesFromLocalStorage: 0.00
ScanBytesFromRemoteStorage: 9.20 GB
SpillWriteBytesToLocalStorage: 0.00
SpillReadBytesFromLocalStorage: 0.00
BytesWriteIntoCache: 0.00
TotalTasks: 138
FinishedTasks: 65
Progress: 47%
```
Co-authored-by: yiguolei <[email protected]>
Co-authored-by: xuchenhao <[email protected]>
---
be/src/exec/operator/operator.cpp | 1 +
be/src/exec/pipeline/pipeline_fragment_context.cpp | 4 +
be/src/exec/pipeline/pipeline_fragment_context.h | 5 +-
be/src/runtime/query_context.cpp | 12 +
be/src/runtime/query_context.h | 6 +
be/src/runtime/workload_management/io_context.h | 4 +
.../workload_management/query_task_controller.cpp | 18 ++
.../workload_management/query_task_controller.h | 10 +
.../workload_management/resource_context.cpp | 8 +
be/test/exec/pipeline/pipeline_test.cpp | 125 +++++++++
.../common/proc/CurrentQueryFragmentProcNode.java | 89 -------
.../common/proc/CurrentQueryInfoProvider.java | 200 ---------------
.../doris/common/proc/CurrentQuerySqlProcDir.java | 70 -----
.../proc/CurrentQueryStatementsProcNode.java | 71 -----
.../common/proc/CurrentQueryStatisticsProcDir.java | 80 +++---
.../org/apache/doris/common/proc/ProcService.java | 2 +-
.../doris/common/profile/RuntimeProfile.java | 5 -
.../httpv2/rest/manager/QueryProfileAction.java | 10 +-
.../java/org/apache/doris/qe/QeProcessorImpl.java | 5 +
.../org/apache/doris/qe/QueryStatisticsItem.java | 18 ++
.../WorkloadRuntimeStatusMgr.java | 112 ++++++--
.../proc/CurrentQueryStatisticsProcDirTest.java | 89 +++++++
.../WorkloadRuntimeStatusMgrTest.java | 285 +++++++++++++++++++++
gensrc/thrift/FrontendService.thrift | 3 +
24 files changed, 739 insertions(+), 493 deletions(-)
diff --git a/be/src/exec/operator/operator.cpp
b/be/src/exec/operator/operator.cpp
index 3b330550faf..dde91a51039 100644
--- a/be/src/exec/operator/operator.cpp
+++ b/be/src/exec/operator/operator.cpp
@@ -420,6 +420,7 @@ void PipelineXLocalStateBase::reached_limit(Block* block,
bool* eos) {
if (auto rows = block->rows()) {
_num_rows_returned += rows;
+ _state->get_query_ctx()->resource_ctx()->io_context()->update_process_rows(rows);
}
}
diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp
b/be/src/exec/pipeline/pipeline_fragment_context.cpp
index e04745a853e..c8f83ad0781 100644
--- a/be/src/exec/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp
@@ -602,6 +602,8 @@ Status
PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
}
_pipeline_parent_map.clear();
_op_id_to_shared_state.clear();
+ // Record task cardinality once when this fragment context finishes task initialization.
+ _query_ctx->add_total_task_num(_total_tasks.load(std::memory_order_relaxed));
return Status::OK();
}
@@ -1934,6 +1936,8 @@ void
PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
{
std::lock_guard<std::mutex> l(_task_mutex);
++_closed_tasks;
+ // Update query-level finished task progress in real time.
+ _query_ctx->inc_finished_task_num();
if (_closed_tasks >= _total_tasks) {
need_remove = _close_fragment_instance();
}
diff --git a/be/src/exec/pipeline/pipeline_fragment_context.h
b/be/src/exec/pipeline/pipeline_fragment_context.h
index 01306799aa4..c220ea386f6 100644
--- a/be/src/exec/pipeline/pipeline_fragment_context.h
+++ b/be/src/exec/pipeline/pipeline_fragment_context.h
@@ -39,6 +39,7 @@
#include "runtime/runtime_state.h"
#include "runtime/task_execution_context.h"
#include "util/stopwatch.hpp"
+#include "util/uid_util.h"
namespace doris {
struct ReportStatusRequest;
@@ -88,11 +89,11 @@ public:
[[nodiscard]] int get_fragment_id() const { return _fragment_id; }
+ void decrement_running_task(PipelineId pipeline_id);
+
uint32_t rec_cte_stage() const { return _rec_cte_stage; }
void set_rec_cte_stage(uint32_t stage) { _rec_cte_stage = stage; }
- void decrement_running_task(PipelineId pipeline_id);
-
Status send_report(bool);
void trigger_report_if_necessary();
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index e00a37aadb1..6fb8e41d824 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -555,4 +555,16 @@ Status QueryContext::reset_global_rf(const
google::protobuf::RepeatedField<int32
return Status::OK();
}
+void QueryContext::add_total_task_num(int delta) {
+ if (auto* qtc = dynamic_cast<QueryTaskController*>(_resource_ctx->task_controller())) {
+ qtc->add_total_task_num(delta);
+ }
+}
+
+void QueryContext::inc_finished_task_num() {
+ if (auto* qtc = dynamic_cast<QueryTaskController*>(_resource_ctx->task_controller())) {
+ qtc->inc_finished_task_num();
+ }
+}
+
} // namespace doris
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index d5e559890c0..5e2b6babe87 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -47,6 +47,7 @@ namespace doris {
class PipelineFragmentContext;
class PipelineTask;
+class QueryTaskController;
class Dependency;
class RecCTEScanLocalState;
@@ -203,6 +204,10 @@ public:
TUniqueId query_id() const { return _query_id; }
+ // Expose task-level query progress counters for runtime statistics reporting.
+ void add_total_task_num(int delta);
+ void inc_finished_task_num();
+
ScannerScheduler* get_scan_scheduler() { return _scan_task_scheduler; }
ScannerScheduler* get_remote_scan_scheduler() { return
_remote_scan_task_scheduler; }
@@ -311,6 +316,7 @@ public:
Status reset_global_rf(const google::protobuf::RepeatedField<int32_t>&
filter_ids);
private:
+ // Task-level progress counters for current query.
friend class QueryTaskController;
int _timeout_second;
diff --git a/be/src/runtime/workload_management/io_context.h
b/be/src/runtime/workload_management/io_context.h
index 3c0dd4343fd..1032c148a63 100644
--- a/be/src/runtime/workload_management/io_context.h
+++ b/be/src/runtime/workload_management/io_context.h
@@ -45,6 +45,7 @@ public:
// number rows returned by query.
// only set once by result sink when closing.
RuntimeProfile::Counter* returned_rows_counter_;
+ RuntimeProfile::Counter* process_rows_counter_;
RuntimeProfile::Counter* shuffle_send_bytes_counter_;
RuntimeProfile::Counter* shuffle_send_rows_counter_;
@@ -63,6 +64,7 @@ public:
bytes_write_into_cache_counter_ =
ADD_COUNTER(profile_, "BytesWriteIntoCache", TUnit::BYTES);
returned_rows_counter_ = ADD_COUNTER(profile_, "ReturnedRows",
TUnit::UNIT);
+ process_rows_counter_ = ADD_COUNTER(profile_, "ProcessRows",
TUnit::UNIT);
shuffle_send_bytes_counter_ = ADD_COUNTER(profile_,
"ShuffleSendBytes", TUnit::BYTES);
shuffle_send_rows_counter_ =
ADD_COUNTER(profile_, "ShuffleSendRowsCounter_",
TUnit::UNIT);
@@ -94,6 +96,7 @@ public:
return stats_.bytes_write_into_cache_counter_->value();
}
int64_t returned_rows() const { return
stats_.returned_rows_counter_->value(); }
+ int64_t process_rows() const { return
stats_.process_rows_counter_->value(); }
int64_t shuffle_send_bytes() const { return
stats_.shuffle_send_bytes_counter_->value(); }
int64_t shuffle_send_rows() const { return
stats_.shuffle_send_rows_counter_->value(); }
@@ -117,6 +120,7 @@ public:
stats_.bytes_write_into_cache_counter_->update(delta);
}
void update_returned_rows(int64_t delta) const {
stats_.returned_rows_counter_->update(delta); }
+ void update_process_rows(int64_t delta) const {
stats_.process_rows_counter_->update(delta); }
void update_shuffle_send_bytes(int64_t delta) const {
stats_.shuffle_send_bytes_counter_->update(delta);
}
diff --git a/be/src/runtime/workload_management/query_task_controller.cpp
b/be/src/runtime/workload_management/query_task_controller.cpp
index 47d6c7c05cd..16e950fa56c 100644
--- a/be/src/runtime/workload_management/query_task_controller.cpp
+++ b/be/src/runtime/workload_management/query_task_controller.cpp
@@ -227,5 +227,23 @@ std::vector<PipelineTask*>
QueryTaskController::get_revocable_tasks() {
return tasks;
}
+void QueryTaskController::add_total_task_num(int delta) {
+ _total_task_num.fetch_add(delta, std::memory_order_relaxed);
+}
+
+void QueryTaskController::inc_finished_task_num() {
+ _finished_task_num.fetch_add(1, std::memory_order_relaxed);
+}
+
+int QueryTaskController::get_total_task_num() const {
+ // Read from controller-owned counters to avoid lifecycle dependency on QueryContext.
+ return _total_task_num.load(std::memory_order_relaxed);
+}
+
+int QueryTaskController::get_finished_task_num() const {
+ // Read from controller-owned counters to avoid lifecycle dependency on QueryContext.
+ return _finished_task_num.load(std::memory_order_relaxed);
+}
+
#include "common/compile_check_end.h"
} // namespace doris
diff --git a/be/src/runtime/workload_management/query_task_controller.h
b/be/src/runtime/workload_management/query_task_controller.h
index f10a846c538..0d46196c150 100644
--- a/be/src/runtime/workload_management/query_task_controller.h
+++ b/be/src/runtime/workload_management/query_task_controller.h
@@ -17,6 +17,8 @@
#pragma once
+#include <atomic>
+
#include "common/factory_creator.h"
#include "runtime/workload_management/task_controller.h"
@@ -46,11 +48,19 @@ public:
size_t get_revocable_size() override;
Status revoke_memory() override;
std::vector<PipelineTask*> get_revocable_tasks() override;
+ // Expose task progress counters without leaking full QueryContext.
+ void add_total_task_num(int delta);
+ void inc_finished_task_num();
+ int get_total_task_num() const;
+ int get_finished_task_num() const;
protected:
QueryTaskController(const std::shared_ptr<QueryContext>& query_ctx) :
query_ctx_(query_ctx) {}
const std::weak_ptr<QueryContext> query_ctx_;
+ // Keep task progress counters in controller so they outlive QueryContext if needed.
+ std::atomic<int> _total_task_num {0};
+ std::atomic<int> _finished_task_num {0};
};
#include "common/compile_check_end.h"
diff --git a/be/src/runtime/workload_management/resource_context.cpp
b/be/src/runtime/workload_management/resource_context.cpp
index c1b0fd2b744..d7a729aa69c 100644
--- a/be/src/runtime/workload_management/resource_context.cpp
+++ b/be/src/runtime/workload_management/resource_context.cpp
@@ -20,6 +20,7 @@
#include <gen_cpp/data.pb.h>
#include <glog/logging.h>
+#include "runtime/workload_management/query_task_controller.h"
#include "util/time.h"
namespace doris {
@@ -31,6 +32,7 @@ void
ResourceContext::to_thrift_query_statistics(TQueryStatistics* statistics) c
statistics->__set_scan_bytes(io_context()->scan_bytes());
statistics->__set_cpu_ms(cpu_context()->cpu_cost_ms() / NANOS_PER_MILLIS);
statistics->__set_returned_rows(io_context()->returned_rows());
+ statistics->__set_process_rows(io_context()->process_rows());
statistics->__set_max_peak_memory_bytes(memory_context()->max_peak_memory_bytes());
statistics->__set_current_used_memory_bytes(memory_context()->current_memory_bytes());
statistics->__set_shuffle_send_bytes(io_context()->shuffle_send_bytes());
@@ -50,6 +52,12 @@ void
ResourceContext::to_thrift_query_statistics(TQueryStatistics* statistics) c
io_context_->spill_write_bytes_to_local_storage());
statistics->__set_spill_read_bytes_from_local_storage(
io_context_->spill_read_bytes_from_local_storage());
+
+ if (auto* query_task_controller = dynamic_cast<QueryTaskController*>(task_controller())) {
+ // Fill query task-level progress directly from task controller.
+ statistics->__set_total_tasks_num(query_task_controller->get_total_task_num());
+ statistics->__set_finished_tasks_num(query_task_controller->get_finished_task_num());
+ }
}
#include "common/compile_check_end.h"
diff --git a/be/test/exec/pipeline/pipeline_test.cpp
b/be/test/exec/pipeline/pipeline_test.cpp
index 4150e281a35..d5d0b5028d6 100644
--- a/be/test/exec/pipeline/pipeline_test.cpp
+++ b/be/test/exec/pipeline/pipeline_test.cpp
@@ -17,6 +17,7 @@
#include "exec/pipeline/pipeline.h"
+#include <gen_cpp/FrontendService_types.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
@@ -40,6 +41,8 @@
#include "runtime/descriptor_helper.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
+#include "runtime/workload_management/query_task_controller.h"
+#include "runtime/workload_management/resource_context.h"
namespace doris {
@@ -467,6 +470,36 @@ TEST_F(PipelineTest, HAPPY_PATH) {
downstream_recvr->close();
}
+TEST_F(PipelineTest, QueryTaskProgressCounters) {
+ // Verify task-level counters are updated via QueryContext and exposed by
QueryTaskController.
+ _query_ctx->add_total_task_num(7);
+ _query_ctx->inc_finished_task_num();
+ _query_ctx->inc_finished_task_num();
+ _query_ctx->inc_finished_task_num();
+
+ auto* query_task_controller =
+
dynamic_cast<QueryTaskController*>(_query_ctx->resource_ctx()->task_controller());
+ ASSERT_NE(query_task_controller, nullptr);
+ EXPECT_EQ(query_task_controller->get_total_task_num(), 7);
+ EXPECT_EQ(query_task_controller->get_finished_task_num(), 3);
+}
+
+TEST_F(PipelineTest, QueryTaskProgressCountersOutliveQueryContext) {
+ // Verify controller-owned counters still work after QueryContext is
destroyed.
+ auto resource_ctx = _query_ctx->resource_ctx();
+ auto* query_task_controller =
+
dynamic_cast<QueryTaskController*>(resource_ctx->task_controller());
+ ASSERT_NE(query_task_controller, nullptr);
+
+ _query_ctx->add_total_task_num(5);
+ _query_ctx->inc_finished_task_num();
+ _query_ctx->inc_finished_task_num();
+
+ _query_ctx.reset();
+ EXPECT_EQ(query_task_controller->get_total_task_num(), 5);
+ EXPECT_EQ(query_task_controller->get_finished_task_num(), 2);
+}
+
TEST_F(PipelineTest, PLAN_LOCAL_EXCHANGE) {
_reset();
// Pipeline(ExchangeOperator(id=0, HASH_PARTITIONED) ->
ExchangeSinkOperatorX(id=1, UNPARTITIONED))
@@ -1163,4 +1196,96 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
downstream_recvr->close();
}
+TEST_F(PipelineTest, QueryTaskProgressConcurrentUpdates) {
+ // Verify counters are thread-safe under concurrent updates from multiple
threads.
+ auto resource_ctx = _query_ctx->resource_ctx();
+ auto* ctrl =
dynamic_cast<QueryTaskController*>(resource_ctx->task_controller());
+ ASSERT_NE(ctrl, nullptr);
+
+ _query_ctx->add_total_task_num(400);
+
+ std::vector<std::thread> threads;
+ for (int i = 0; i < 8; i++) {
+ threads.emplace_back([this]() {
+ for (int j = 0; j < 50; j++) {
+ _query_ctx->inc_finished_task_num();
+ }
+ });
+ }
+ for (auto& t : threads) {
+ t.join();
+ }
+
+ EXPECT_EQ(ctrl->get_total_task_num(), 400);
+ EXPECT_EQ(ctrl->get_finished_task_num(), 400);
+}
+
+TEST_F(PipelineTest, QueryTaskProgressThriftSerialization) {
+ // Verify progress counters are correctly serialized to Thrift struct.
+ _query_ctx->add_total_task_num(10);
+ _query_ctx->inc_finished_task_num();
+ _query_ctx->inc_finished_task_num();
+ _query_ctx->inc_finished_task_num();
+ _query_ctx->inc_finished_task_num();
+
+ TQueryStatistics tqs;
+ _query_ctx->resource_ctx()->to_thrift_query_statistics(&tqs);
+
+ EXPECT_TRUE(tqs.__isset.total_tasks_num);
+ EXPECT_EQ(tqs.total_tasks_num, 10);
+ EXPECT_TRUE(tqs.__isset.finished_tasks_num);
+ EXPECT_EQ(tqs.finished_tasks_num, 4);
+}
+
+TEST_F(PipelineTest, QueryTaskProgressBoundaryZeroTotal) {
+ // Verify behavior when no tasks have been registered (total = 0).
+ auto* ctrl =
dynamic_cast<QueryTaskController*>(_query_ctx->resource_ctx()->task_controller());
+ ASSERT_NE(ctrl, nullptr);
+
+ // total = 0, finished = 0
+ EXPECT_EQ(ctrl->get_total_task_num(), 0);
+ EXPECT_EQ(ctrl->get_finished_task_num(), 0);
+
+ // inc_finished with no total should still work without crash
+ _query_ctx->inc_finished_task_num();
+ EXPECT_EQ(ctrl->get_finished_task_num(), 1);
+}
+
+TEST_F(PipelineTest, QueryTaskProgressAllFinished) {
+ // Verify 100% progress when all tasks finish.
+ _query_ctx->add_total_task_num(8);
+ for (int i = 0; i < 8; i++) {
+ _query_ctx->inc_finished_task_num();
+ }
+
+ auto* ctrl =
dynamic_cast<QueryTaskController*>(_query_ctx->resource_ctx()->task_controller());
+ ASSERT_NE(ctrl, nullptr);
+ EXPECT_EQ(ctrl->get_total_task_num(), 8);
+ EXPECT_EQ(ctrl->get_finished_task_num(), 8);
+
+ // Verify thrift serialization
+ TQueryStatistics tqs;
+ _query_ctx->resource_ctx()->to_thrift_query_statistics(&tqs);
+ EXPECT_EQ(tqs.total_tasks_num, 8);
+ EXPECT_EQ(tqs.finished_tasks_num, 8);
+}
+
+TEST_F(PipelineTest, QueryTaskProgressCountersSurviveReset) {
+ // Verify that after calling _reset(), fresh counters are initialized to
zero.
+ auto* ctrl1 =
dynamic_cast<QueryTaskController*>(_query_ctx->resource_ctx()->task_controller());
+ ASSERT_NE(ctrl1, nullptr);
+ _query_ctx->add_total_task_num(10);
+ _query_ctx->inc_finished_task_num();
+ _query_ctx->inc_finished_task_num();
+ EXPECT_EQ(ctrl1->get_total_task_num(), 10);
+ EXPECT_EQ(ctrl1->get_finished_task_num(), 2);
+
+ // Reset creates a new QueryContext
+ _reset();
+ auto* ctrl2 =
dynamic_cast<QueryTaskController*>(_query_ctx->resource_ctx()->task_controller());
+ ASSERT_NE(ctrl2, nullptr);
+ EXPECT_EQ(ctrl2->get_total_task_num(), 0);
+ EXPECT_EQ(ctrl2->get_finished_task_num(), 0);
+}
+
} // namespace doris
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java
deleted file mode 100644
index 27b7e673d38..00000000000
---
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java
+++ /dev/null
@@ -1,89 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.common.proc;
-
-import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.util.QueryStatisticsFormatter;
-import org.apache.doris.qe.QueryStatisticsItem;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.List;
-
-/*
- * show proc "/current_queries/{query_id}/fragments"
- * set variable "set is_report_success = true" to enable "ScanBytes" and
"ProcessRows".
- */
-public class CurrentQueryFragmentProcNode implements ProcNodeInterface {
- private static final Logger LOG =
LogManager.getLogger(CurrentQueryFragmentProcNode.class);
- public static final ImmutableList<String> TITLE_NAMES = new
ImmutableList.Builder<String>()
- .add("FragmentId").add("InstanceId").add("Host")
- .add("ScanBytes").add("ProcessRows").build();
- private QueryStatisticsItem item;
-
- public CurrentQueryFragmentProcNode(QueryStatisticsItem item) {
- this.item = item;
- }
-
- @Override
- public ProcResult fetchResult() throws AnalysisException {
- return requestFragmentExecInfos();
- }
-
- private ProcResult requestFragmentExecInfos() throws AnalysisException {
- final CurrentQueryInfoProvider provider = new
CurrentQueryInfoProvider();
- final Collection<CurrentQueryInfoProvider.InstanceStatistics>
instanceStatisticsCollection
- = provider.getInstanceStatistics(item);
- final List<List<String>> sortedRowData = Lists.newArrayList();
- for (CurrentQueryInfoProvider.InstanceStatistics instanceStatistics :
- instanceStatisticsCollection) {
- final List<String> rowData = Lists.newArrayList();
- rowData.add(instanceStatistics.getFragmentId());
- rowData.add(instanceStatistics.getInstanceId().toString());
- rowData.add(instanceStatistics.getAddress().toString());
- if (item.getIsReportSucc()) {
- rowData.add(QueryStatisticsFormatter.getScanBytes(
- instanceStatistics.getScanBytes()));
- rowData.add(QueryStatisticsFormatter.getRowsReturned(
- instanceStatistics.getRowsReturned()));
- } else {
- rowData.add("N/A");
- rowData.add("N/A");
- }
- sortedRowData.add(rowData);
- }
-
- // sort according to explain's fragment index
- sortedRowData.sort(new Comparator<List<String>>() {
- @Override
- public int compare(List<String> l1, List<String> l2) {
- return l1.get(0).compareTo(l2.get(0));
- }
- });
- final BaseProcResult result = new BaseProcResult();
- result.setNames(TITLE_NAMES.asList());
- result.setRows(sortedRowData);
- return result;
- }
-
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java
deleted file mode 100644
index de7247ab3ab..00000000000
---
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java
+++ /dev/null
@@ -1,200 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.common.proc;
-
-import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.profile.Counter;
-import org.apache.doris.common.profile.RuntimeProfile;
-import org.apache.doris.common.util.DebugUtil;
-import org.apache.doris.qe.QueryStatisticsItem;
-import org.apache.doris.thrift.TNetworkAddress;
-import org.apache.doris.thrift.TUniqueId;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Provide running query's statistics.
- */
-public class CurrentQueryInfoProvider {
- private static final Logger LOG =
LogManager.getLogger(CurrentQueryInfoProvider.class);
-
- public CurrentQueryInfoProvider() {
- }
-
- /**
- * get Counters from Coordinator's RuntimeProfile and return query's
statistics.
- *
- * @param item
- * @return
- * @throws AnalysisException
- */
- public QueryStatistics getQueryStatistics(QueryStatisticsItem item) throws
AnalysisException {
- return new QueryStatistics(item.getQueryProfile());
- }
-
- /**
- *
- * @param items
- * @return
- * @throws AnalysisException
- */
- public Map<String, QueryStatistics>
getQueryStatistics(Collection<QueryStatisticsItem> items) {
- final Map<String, QueryStatistics> queryStatisticsMap =
Maps.newHashMap();
- for (QueryStatisticsItem item : items) {
- queryStatisticsMap.put(item.getQueryId(), new
QueryStatistics(item.getQueryProfile()));
- }
- return queryStatisticsMap;
- }
-
- /**
- * Return query's instances statistics.
- *
- * @param item
- * @return
- * @throws AnalysisException
- */
- public Collection<InstanceStatistics>
getInstanceStatistics(QueryStatisticsItem item) throws AnalysisException {
- final Map<String, RuntimeProfile> instanceProfiles =
collectInstanceProfile(item.getQueryProfile());
- final List<InstanceStatistics> instanceStatisticsList =
Lists.newArrayList();
- for (QueryStatisticsItem.FragmentInstanceInfo instanceInfo :
item.getFragmentInstanceInfos()) {
- final RuntimeProfile instanceProfile
- =
instanceProfiles.get(DebugUtil.printId(instanceInfo.getInstanceId()));
- Preconditions.checkNotNull(instanceProfile);
- final InstanceStatistics Statistics =
- new InstanceStatistics(
- instanceInfo.getFragmentId(),
- instanceInfo.getInstanceId(),
- instanceInfo.getAddress(),
- instanceProfile);
- instanceStatisticsList.add(Statistics);
- }
- return instanceStatisticsList;
- }
-
- /**
- * Profile trees is query profile -> fragment profile -> instance profile
....
- * @param queryProfile
- * @return instanceProfiles
- */
- private Map<String, RuntimeProfile> collectInstanceProfile(RuntimeProfile
queryProfile) {
- final Map<String, RuntimeProfile> instanceProfiles = Maps.newHashMap();
- for (RuntimeProfile fragmentProfile :
queryProfile.getChildMap().values()) {
- for (Map.Entry<String, RuntimeProfile> entry :
fragmentProfile.getChildMap().entrySet()) {
- Preconditions.checkState(instanceProfiles.put(
- parseInstanceId(entry.getKey()), entry.getValue()) ==
null);
- }
- }
- return instanceProfiles;
- }
-
- /**
- * Instance profile key is "Instance ${instance_id} (host=$host $port)"
- * @param str
- * @return
- */
- private String parseInstanceId(String str) {
- final String[] elements = str.split(" ");
- if (elements.length == 4) {
- return elements[1];
- } else {
- Preconditions.checkState(false);
- return "";
- }
- }
-
- public static class QueryStatistics {
- final List<Map<String, Counter>> counterMaps;
-
- public QueryStatistics(RuntimeProfile profile) {
- counterMaps = Lists.newArrayList();
- collectCounters(profile, counterMaps);
- }
-
- private void collectCounters(RuntimeProfile profile,
- List<Map<String, Counter>>
counterMaps) {
- for (Map.Entry<String, RuntimeProfile> entry :
profile.getChildMap().entrySet()) {
- counterMaps.add(entry.getValue().getCounterMap());
- collectCounters(entry.getValue(), counterMaps);
- }
- }
-
- public long getScanBytes() {
- long scanBytes = 0;
- for (Map<String, Counter> counters : counterMaps) {
- final Counter counter = counters.get("CompressedBytesRead");
- scanBytes += counter == null ? 0 : counter.getValue();
- }
- return scanBytes;
- }
-
- public long getRowsReturned() {
- long rowsReturned = 0;
- for (Map<String, Counter> counters : counterMaps) {
- final Counter counter = counters.get("RowsReturned");
- rowsReturned += counter == null ? 0 : counter.getValue();
- }
- return rowsReturned;
- }
- }
-
- public static class InstanceStatistics {
- private final String fragmentId;
- private final TUniqueId instanceId;
- private final TNetworkAddress address;
- private final QueryStatistics statistics;
-
- public InstanceStatistics(
- String fragmentId,
- TUniqueId instanceId,
- TNetworkAddress address,
- RuntimeProfile profile) {
- this.fragmentId = fragmentId;
- this.instanceId = instanceId;
- this.address = address;
- this.statistics = new QueryStatistics(profile);
- }
-
- public String getFragmentId() {
- return fragmentId;
- }
-
- public TUniqueId getInstanceId() {
- return instanceId;
- }
-
- public TNetworkAddress getAddress() {
- return address;
- }
-
- public long getRowsReturned() {
- return statistics.getRowsReturned();
- }
-
- public long getScanBytes() {
- return statistics.getScanBytes();
- }
- }
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQuerySqlProcDir.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQuerySqlProcDir.java
deleted file mode 100644
index 2d21b656ef3..00000000000
---
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQuerySqlProcDir.java
+++ /dev/null
@@ -1,70 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.common.proc;
-
-import org.apache.doris.common.AnalysisException;
-import org.apache.doris.qe.QueryStatisticsItem;
-
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-import java.util.List;
-
-/*
- * show proc "/current_queries/{query_id}"
- */
-public class CurrentQuerySqlProcDir implements ProcDirInterface {
-
- public static final ImmutableList<String> TITLE_NAMES = new
ImmutableList.Builder<String>()
- .add("Sql").build();
-
- private final QueryStatisticsItem item;
-
- public CurrentQuerySqlProcDir(QueryStatisticsItem item) {
- this.item = item;
- }
-
- @Override
- public boolean register(String name, ProcNodeInterface node) {
- return false;
- }
-
- @Override
- public ProcNodeInterface lookup(String name) throws AnalysisException {
- if (Strings.isNullOrEmpty(name)) {
- return null;
- }
-
- if (!name.equals("fragments")) {
- throw new AnalysisException(name + " doesn't exist.");
- }
-
- return new CurrentQueryFragmentProcNode(item);
- }
-
- @Override
- public ProcResult fetchResult() throws AnalysisException {
- final BaseProcResult result = new BaseProcResult();
- result.setNames(TITLE_NAMES.asList());
- final List<String> values = Lists.newArrayList();
- values.add(item.getSql());
- result.addRow(values);
- return result;
- }
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryStatementsProcNode.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryStatementsProcNode.java
deleted file mode 100644
index 746726c9051..00000000000
---
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryStatementsProcNode.java
+++ /dev/null
@@ -1,71 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.common.proc;
-
-import org.apache.doris.common.AnalysisException;
-import org.apache.doris.qe.QeProcessorImpl;
-import org.apache.doris.qe.QueryStatisticsItem;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import org.apache.commons.codec.digest.DigestUtils;
-
-import java.util.List;
-import java.util.Map;
-
-/*
- * show proc "/current_query_stmts"
- */
-public class CurrentQueryStatementsProcNode implements ProcNodeInterface {
- public static final ImmutableList<String> TITLE_NAMES = new
ImmutableList.Builder<String>()
-
.add("QueryId").add("ConnectionId").add("Catalog").add("Database").add("User")
- .add("ExecTime").add("SqlHash").add("Statement").build();
-
- private static final int EXEC_TIME_INDEX = 5;
-
- @Override
- public ProcResult fetchResult() throws AnalysisException {
- final BaseProcResult result = new BaseProcResult();
- final Map<String, QueryStatisticsItem> statistic =
- QeProcessorImpl.INSTANCE.getQueryStatistics();
- result.setNames(TITLE_NAMES.asList());
- final List<List<String>> sortedRowData = Lists.newArrayList();
-
- for (QueryStatisticsItem item : statistic.values()) {
- final List<String> values = Lists.newArrayList();
- values.add(item.getQueryId());
- values.add(item.getConnId());
- values.add(item.getCatalog());
- values.add(item.getDb());
- values.add(item.getUser());
- values.add(item.getQueryExecTime());
- values.add(DigestUtils.md5Hex(item.getSql()));
- values.add(item.getSql());
- sortedRowData.add(values);
- }
-
- // sort according to ExecTime
- sortedRowData.sort((l1, l2) -> {
- final long execTime1 = Long.parseLong(l1.get(EXEC_TIME_INDEX));
- final long execTime2 = Long.parseLong(l2.get(EXEC_TIME_INDEX));
- return Long.compare(execTime2, execTime1);
- });
- result.setRows(sortedRowData);
- return result;
- }
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java
index 0df36c0040f..88a68964f82 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java
@@ -21,24 +21,33 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.QueryStatisticsFormatter;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.QueryStatisticsItem;
+import org.apache.doris.thrift.TQueryStatistics;
-import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import org.apache.commons.codec.digest.DigestUtils;
import java.util.List;
import java.util.Map;
/*
* show proc "/current_queries"
- * only set variable "set is_report_success = true" to enable "ScanBytes" and
"ProcessRows".
+ * The statistics are the same as the data in the audit log.
*/
public class CurrentQueryStatisticsProcDir implements ProcDirInterface {
+ // ProcessRows is kept temporarily for Doris Manager compatibility; it will be
implemented in the future.
public static final ImmutableList<String> TITLE_NAMES = new
ImmutableList.Builder<String>()
.add("QueryId").add("ConnectionId").add("Catalog").add("Database").add("User")
- .add("ScanBytes").add("ProcessRows").add("ExecTime").build();
+ .add("ExecTime").add("SqlHash").add("Statement")
+ .add("ScanRows").add("ScanBytes").add("ProcessRows").add("CpuMs")
+
.add("MaxPeakMemoryBytes").add("CurrentUsedMemoryBytes").add("WorkloadGroupId")
+ .add("ShuffleSendBytes").add("ShuffleSendRows")
+ .add("ScanBytesFromLocalStorage").add("ScanBytesFromRemoteStorage")
+
.add("SpillWriteBytesToLocalStorage").add("SpillReadBytesFromLocalStorage")
+ .add("BytesWriteIntoCache")
+ .add("TotalTasks").add("FinishedTasks").add("Progress").build();
- private static final int EXEC_TIME_INDEX = 7;
+ private static final int EXEC_TIME_INDEX = 5;
@Override
public boolean register(String name, ProcNodeInterface node) {
@@ -47,15 +56,7 @@ public class CurrentQueryStatisticsProcDir implements
ProcDirInterface {
@Override
public ProcNodeInterface lookup(String name) throws AnalysisException {
- if (Strings.isNullOrEmpty(name)) {
- return null;
- }
- final Map<String, QueryStatisticsItem> statistic =
QeProcessorImpl.INSTANCE.getQueryStatistics();
- final QueryStatisticsItem item = statistic.get(name);
- if (item == null) {
- throw new AnalysisException(name + " doesn't exist.");
- }
- return new CurrentQuerySqlProcDir(item);
+ throw new AnalysisException("operation doesn't support.");
}
@Override
@@ -65,32 +66,41 @@ public class CurrentQueryStatisticsProcDir implements
ProcDirInterface {
QeProcessorImpl.INSTANCE.getQueryStatistics();
result.setNames(TITLE_NAMES.asList());
final List<List<String>> sortedRowData = Lists.newArrayList();
-
- final CurrentQueryInfoProvider provider = new
CurrentQueryInfoProvider();
- final Map<String, CurrentQueryInfoProvider.QueryStatistics>
statisticsMap
- = provider.getQueryStatistics(statistic.values());
for (QueryStatisticsItem item : statistic.values()) {
final List<String> values = Lists.newArrayList();
+ final TQueryStatistics queryStatistics = item.getQueryStatistics();
values.add(item.getQueryId());
values.add(item.getConnId());
values.add(item.getCatalog());
values.add(item.getDb());
values.add(item.getUser());
- if (item.getIsReportSucc()) {
- final CurrentQueryInfoProvider.QueryStatistics statistics
- = statisticsMap.get(item.getQueryId());
- values.add(QueryStatisticsFormatter.getScanBytes(
- statistics.getScanBytes()));
- values.add(QueryStatisticsFormatter.getRowsReturned(
- statistics.getRowsReturned()));
- } else {
- values.add("N/A");
- values.add("N/A");
- }
values.add(item.getQueryExecTime());
+ values.add(DigestUtils.md5Hex(item.getSql()));
+ values.add(item.getSql());
+
values.add(QueryStatisticsFormatter.getRowsReturned(queryStatistics.getScanRows()));
+
values.add(QueryStatisticsFormatter.getScanBytes(queryStatistics.getScanBytes()));
+
values.add(QueryStatisticsFormatter.getRowsReturned(queryStatistics.getProcessRows()));
+ values.add(String.valueOf(queryStatistics.getCpuMs()));
+
values.add(QueryStatisticsFormatter.getScanBytes(queryStatistics.getMaxPeakMemoryBytes()));
+
values.add(QueryStatisticsFormatter.getScanBytes(queryStatistics.getCurrentUsedMemoryBytes()));
+ values.add(String.valueOf(queryStatistics.getWorkloadGroupId()));
+
values.add(QueryStatisticsFormatter.getScanBytes(queryStatistics.getShuffleSendBytes()));
+
values.add(QueryStatisticsFormatter.getRowsReturned(queryStatistics.getShuffleSendRows()));
+
values.add(QueryStatisticsFormatter.getScanBytes(queryStatistics.getScanBytesFromLocalStorage()));
+
values.add(QueryStatisticsFormatter.getScanBytes(queryStatistics.getScanBytesFromRemoteStorage()));
+
values.add(QueryStatisticsFormatter.getScanBytes(queryStatistics.getSpillWriteBytesToLocalStorage()));
+
values.add(QueryStatisticsFormatter.getScanBytes(queryStatistics.getSpillReadBytesFromLocalStorage()));
+
values.add(QueryStatisticsFormatter.getScanBytes(queryStatistics.getBytesWriteIntoCache()));
+
+ long total = queryStatistics.isSetTotalTasksNum() ?
queryStatistics.getTotalTasksNum() : 0;
+ long finished = queryStatistics.isSetFinishedTasksNum() ?
queryStatistics.getFinishedTasksNum() : 0;
+ values.add(String.valueOf(total));
+ values.add(String.valueOf(finished));
+ values.add(formatProgress(total, finished));
+
sortedRowData.add(values);
}
- // sort according to ExecTime
+
sortedRowData.sort((l1, l2) -> {
final long execTime1 = Long.parseLong(l1.get(EXEC_TIME_INDEX));
final long execTime2 = Long.parseLong(l2.get(EXEC_TIME_INDEX));
@@ -99,4 +109,16 @@ public class CurrentQueryStatisticsProcDir implements
ProcDirInterface {
result.setRows(sortedRowData);
return result;
}
+
+ /**
+ * Format task progress as a percentage string with one decimal place.
+ * Visible for testing.
+ */
+ static String formatProgress(long total, long finished) {
+ if (total > 0) {
+ double pct = (double) finished * 100 / total;
+ return String.format("%.1f%%", pct);
+ }
+ return "0.0%";
+ }
}
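Note on the Progress column: the arithmetic in formatProgress above can be reproduced outside the FE with a few lines of plain Java. The sketch below is a standalone illustration, not part of the commit. The class name ProgressSketch is hypothetical, and passing Locale.ROOT is an assumption made here so the decimal separator does not depend on the JVM default locale; the committed method calls String.format without a locale.

```java
import java.util.Locale;

/** Hypothetical standalone sketch of the progress formatting; not part of the commit. */
final class ProgressSketch {
    private ProgressSketch() {
    }

    // Same rule as formatProgress in the diff: a non-positive total yields "0.0%",
    // otherwise finished / total is rendered as a percentage with one decimal place.
    // Locale.ROOT is an assumption of this sketch, used to pin the decimal separator.
    static String formatProgress(long total, long finished) {
        if (total <= 0) {
            return "0.0%";
        }
        double pct = (double) finished * 100 / total;
        return String.format(Locale.ROOT, "%.1f%%", pct);
    }

    public static void main(String[] args) {
        System.out.println(formatProgress(20, 7)); // 35.0%
        System.out.println(formatProgress(3, 2));  // 66.7%
        System.out.println(formatProgress(0, 0));  // 0.0%
    }
}
```

As in the committed method, the value is not clamped, so a finished count larger than the total produces a percentage above 100.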
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java
index 42010ccbd20..a1f54901bde 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java
@@ -49,7 +49,7 @@ public final class ProcService {
root.register("trash", new TrashProcDir());
root.register("monitor", new MonitorProcDir());
root.register("current_queries", new CurrentQueryStatisticsProcDir());
- root.register("current_query_stmts", new
CurrentQueryStatementsProcNode());
+ root.register("current_query_stmts", new
CurrentQueryStatisticsProcDir());
root.register("current_backend_instances", new
CurrentQueryBackendInstanceProcDir());
root.register("cluster_balance", new ClusterBalanceProcDir());
root.register("cluster_health", new ClusterHealthProcDir());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java
index 2f499a8dd4c..cb55f350f07 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java
@@ -55,11 +55,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-/**
- * It is accessed by two kinds of thread, one is to create this RuntimeProfile
- * , named 'query thread', the other is to call
- * {@link org.apache.doris.common.proc.CurrentQueryInfoProvider}.
- */
public class RuntimeProfile {
// TODO: maintainability here is too poor
// OperatorXBase::init on the BE contains the naming rules for Operators
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
index 5480858efbf..e619a81326e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
@@ -23,7 +23,7 @@ import org.apache.doris.common.AuthenticationException;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.Status;
-import org.apache.doris.common.proc.CurrentQueryStatementsProcNode;
+import org.apache.doris.common.proc.CurrentQueryStatisticsProcDir;
import org.apache.doris.common.proc.ProcResult;
import org.apache.doris.common.profile.ProfileManager;
import org.apache.doris.common.profile.ProfileManager.ProfileElement;
@@ -450,7 +450,7 @@ public class QueryProfileAction extends RestBaseController {
}
/**
- * return the result of CurrentQueryStatementsProcNode.
+ * return the result of CurrentQueryStatisticsProcDir.
*
* @param request
* @param response
@@ -480,15 +480,15 @@ public class QueryProfileAction extends
RestBaseController {
LOG.warn("parse query info error: {}", data, e);
}
}
- List<String> titles =
Lists.newArrayList(CurrentQueryStatementsProcNode.TITLE_NAMES);
+ List<String> titles =
Lists.newArrayList(CurrentQueryStatisticsProcDir.TITLE_NAMES);
titles.add(0, FRONTEND);
return ResponseEntityBuilder.ok(new NodeAction.NodeInfo(titles,
queries));
} else {
try {
- CurrentQueryStatementsProcNode node = new
CurrentQueryStatementsProcNode();
+ CurrentQueryStatisticsProcDir node = new
CurrentQueryStatisticsProcDir();
ProcResult result = node.fetchResult();
// add frontend info at first column.
- List<String> titles =
Lists.newArrayList(CurrentQueryStatementsProcNode.TITLE_NAMES);
+ List<String> titles =
Lists.newArrayList(CurrentQueryStatisticsProcDir.TITLE_NAMES);
titles.add(0, FRONTEND);
List<List<String>> rows = result.getRows();
String feIp = FrontendOptions.getLocalHostAddress();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index 275dc234706..ff023aeb939 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -29,6 +29,7 @@ import org.apache.doris.metric.MetricRepo;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TQueryProfile;
+import org.apache.doris.thrift.TQueryStatistics;
import org.apache.doris.thrift.TReportExecStatusParams;
import org.apache.doris.thrift.TReportExecStatusResult;
import org.apache.doris.thrift.TStatus;
@@ -212,6 +213,8 @@ public final class QeProcessorImpl implements QeProcessor {
@Override
public Map<String, QueryStatisticsItem> getQueryStatistics() {
final Map<String, QueryStatisticsItem> querySet = Maps.newHashMap();
+ final Map<String, TQueryStatistics> queryStatisticsMap =
+
Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().getQueryStatisticsMap();
for (Map.Entry<TUniqueId, QueryInfo> entry :
coordinatorMap.entrySet()) {
final QueryInfo info = entry.getValue();
final ConnectContext context = info.getConnectContext();
@@ -219,12 +222,14 @@ public final class QeProcessorImpl implements QeProcessor
{
continue;
}
final String queryIdStr =
DebugUtil.printId(info.getConnectContext().queryId());
+ final TQueryStatistics queryStatistics =
queryStatisticsMap.get(queryIdStr);
final QueryStatisticsItem item = new
QueryStatisticsItem.Builder().queryId(queryIdStr)
.queryStartTime(info.getStartExecTime()).sql(info.getSql()).user(context.getQualifiedUser())
.connId(String.valueOf(context.getConnectionId())).db(context.getDatabase())
.catalog(context.getDefaultCatalog())
.fragmentInstanceInfos(info.getCoord().getFragmentInstanceInfos())
.profile(info.getCoord().getExecutionProfile().getRoot())
+ .queryStatistics(queryStatistics)
.isReportSucc(context.getSessionVariable().enableProfile()).build();
querySet.put(queryIdStr, item);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java
index c51ff24ca14..f879903e65a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java
@@ -19,6 +19,7 @@ package org.apache.doris.qe;
import org.apache.doris.common.profile.RuntimeProfile;
import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TQueryStatistics;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.collect.Lists;
@@ -38,6 +39,8 @@ public final class QueryStatisticsItem {
// root query profile
private final RuntimeProfile queryProfile;
private final boolean isReportSucc;
+ // query statistics same as statistics in audit log
+ private final TQueryStatistics queryStatistics;
private QueryStatisticsItem(Builder builder) {
this.queryId = builder.queryId;
@@ -50,6 +53,7 @@ public final class QueryStatisticsItem {
this.fragmentInstanceInfos = builder.fragmentInstanceInfos;
this.queryProfile = builder.queryProfile;
this.isReportSucc = builder.isReportSucc;
+ this.queryStatistics = builder.queryStatistics;
}
public String getDb() {
@@ -97,6 +101,10 @@ public final class QueryStatisticsItem {
return isReportSucc;
}
+ public TQueryStatistics getQueryStatistics() {
+ return queryStatistics;
+ }
+
public static final class Builder {
private String queryId;
private String catalog;
@@ -108,6 +116,7 @@ public final class QueryStatisticsItem {
private List<FragmentInstanceInfo> fragmentInstanceInfos;
private RuntimeProfile queryProfile;
private boolean isReportSucc;
+ private TQueryStatistics queryStatistics;
public Builder() {
fragmentInstanceInfos = Lists.newArrayList();
@@ -163,6 +172,11 @@ public final class QueryStatisticsItem {
return this;
}
+ public Builder queryStatistics(TQueryStatistics queryStatistics) {
+ this.queryStatistics = queryStatistics;
+ return this;
+ }
+
public QueryStatisticsItem build() {
initDefaultValue(this);
return new QueryStatisticsItem(this);
@@ -192,6 +206,10 @@ public final class QueryStatisticsItem {
if (queryProfile == null) {
queryProfile = new RuntimeProfile("");
}
+
+ if (queryStatistics == null) {
+ queryStatistics = new TQueryStatistics();
+ }
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
index cc31e7858e0..7920b1e59a2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
@@ -20,12 +20,15 @@ package org.apache.doris.resource.workloadschedpolicy;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.plugin.AuditEvent;
+import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.thrift.TQueryStatistics;
import org.apache.doris.thrift.TQueryStatisticsResult;
import org.apache.doris.thrift.TReportWorkloadRuntimeStatusParams;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
@@ -48,7 +51,10 @@ import java.util.concurrent.locks.ReentrantLock;
public class WorkloadRuntimeStatusMgr extends MasterDaemon {
private static final Logger LOG =
LogManager.getLogger(WorkloadRuntimeStatusMgr.class);
+ // backend id --> {query id --> (query last report time, query stats)}
private Map<Long, BeReportInfo> beToQueryStatsMap =
Maps.newConcurrentMap();
+ // Publish an immutable snapshot for synchronous proc/REST readers.
+ private volatile Map<String, TQueryStatistics> queryStatisticsSnapshot =
ImmutableMap.of();
private final ReentrantLock queryAuditEventLock = new ReentrantLock();
private List<AuditEvent> queryAuditEventList = Lists.newLinkedList();
private volatile long lastWarnTime;
@@ -60,6 +66,7 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
this.beLastReportTime = beLastReportTime;
}
+ // query id --> (query last report time, query stats)
Map<String, Pair<Long, TQueryStatisticsResult>> queryStatsMap =
Maps.newConcurrentMap();
}
@@ -69,10 +76,12 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
@Override
protected void runAfterCatalogReady() {
- // 1 merge be query statistics
+ // 1 rebuild and publish query statistics snapshot
+ rebuildQueryStatisticsSnapshot();
+ // 2 read the latest immutable snapshot for downstream processing
Map<String, TQueryStatistics> queryStatisticsMap =
getQueryStatisticsMap();
- // 2 log query audit
+ // 3 log query audit
try {
List<AuditEvent> auditEventList = getQueryNeedAudit();
int missedLogCount = 0;
@@ -106,10 +115,17 @@ public class WorkloadRuntimeStatusMgr extends
MasterDaemon {
LOG.warn("exception happens when handleAuditEvent, ", t);
}
- // 3 clear beToQueryStatsMap when be report timeout
+ // 4 clear beToQueryStatsMap when be report timeout
clearReportTimeoutBeStatistics();
}
+ // After a query or insert finishes, FE does not audit it immediately; it puts an audit
+ // event into this queue, and the worker thread later takes the event and reads its
+ // statistics from queryStatisticsMap. If the queue is full, the event is logged directly
+ // and may miss some statistics. The audit statistics may therefore be inaccurate, but this
+ // avoids an FE OOM caused by too many queued audit events when QPS is high.
public void submitFinishQueryToAudit(AuditEvent event) {
queryAuditEventLogWriteLock();
try {
@@ -121,9 +137,9 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
// if queryAuditEventList is full, we don't put the event
to queryAuditEventList.
// so that the statistic info of this audit event will be
ignored,
// and event will be logged directly.
- LOG.warn("audit log event queue size {} is full, this may
cause audit log missing statistics."
- + "you can check whether qps is too high
or "
- + "set audit_event_log_queue_size to a
larger value in fe.conf. query id: {}",
+ LOG.warn("audit log event queue size {} is full, this may
cause audit log missing "
+ + "statistics. you can check whether qps is too
high or set "
+ + "audit_event_log_queue_size to a larger value in
fe.conf. query id: {}",
queryAuditEventList.size(), event.queryId);
}
Env.getCurrentAuditEventProcessor().handleAuditEvent(event);
@@ -186,7 +202,7 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
}
}
- void clearReportTimeoutBeStatistics() {
+ private void clearReportTimeoutBeStatistics() {
// 1 clear report timeout be
Set<Long> currentBeIdSet = beToQueryStatsMap.keySet();
Long currentTime = System.currentTimeMillis();
@@ -200,24 +216,59 @@ public class WorkloadRuntimeStatusMgr extends
MasterDaemon {
for (String queryId : queryIdSet) {
Pair<Long, TQueryStatisticsResult> pair =
beReportInfo.queryStatsMap.get(queryId);
long queryLastReportTime = pair.first;
- if (currentTime - queryLastReportTime >
Config.be_report_query_statistics_timeout_ms) {
+ boolean timeout = currentTime - queryLastReportTime
+ > Config.be_report_query_statistics_timeout_ms;
+ // Remove query statistics only when both conditions are satisfied:
+ // 1) the query statistics have timed out, and
+ // 2) FE no longer has this query in QeProcessorImpl.
+ // Example timeline:
+ // - t0: query q1 is still running, but one periodic BE report is delayed for > timeout.
+ // - t1: the clear thread runs. The timeout condition is true, but q1 still exists in FE.
+ // - t2: we keep q1's statistics instead of removing them; later reports can update them again.
+ if (timeout && isQueryNotExistInFe(queryId)) {
beReportInfo.queryStatsMap.remove(queryId);
}
}
}
}
- // NOTE: currently getQueryStatisticsMap must be called before clear
beToQueryStatsMap
- // so there is no need lock or null check when visit beToQueryStatsMap
+ private boolean isQueryNotExistInFe(String queryId) {
+ try {
+ return
QeProcessorImpl.INSTANCE.getCoordinator(DebugUtil.parseTUniqueIdFromString(queryId))
== null;
+ } catch (NumberFormatException e) {
+ return true;
+ }
+ }
+
+ // Rebuild query statistics from concurrent runtime maps and publish an
immutable snapshot.
+ // This method is intentionally called by daemon thread and unit tests
only.
+ void rebuildQueryStatisticsSnapshot() {
+ queryStatisticsSnapshot =
ImmutableMap.copyOf(buildQueryStatisticsMapUnsafe());
+ }
+
+ // Return the latest published snapshot for synchronous readers such as
proc/REST paths.
public Map<String, TQueryStatistics> getQueryStatisticsMap() {
+ return queryStatisticsSnapshot;
+ }
+
+ // Build a merged map by traversing concurrent runtime structures.
+ private Map<String, TQueryStatistics> buildQueryStatisticsMapUnsafe() {
// 1 merge query stats in all be
Set<Long> beIdSet = beToQueryStatsMap.keySet();
Map<String, TQueryStatistics> resultQueryMap = Maps.newHashMap();
for (Long beId : beIdSet) {
BeReportInfo beReportInfo = beToQueryStatsMap.get(beId);
+ if (beReportInfo == null) {
+ continue;
+ }
Set<String> queryIdSet = beReportInfo.queryStatsMap.keySet();
for (String queryId : queryIdSet) {
- TQueryStatisticsResult curQueryStats =
beReportInfo.queryStatsMap.get(queryId).second;
+ Pair<Long, TQueryStatisticsResult> queryStatsPair =
+ beReportInfo.queryStatsMap.get(queryId);
+ if (queryStatsPair == null || queryStatsPair.second == null) {
+ continue;
+ }
+ TQueryStatisticsResult curQueryStats = queryStatsPair.second;
TQueryStatistics retQuery = resultQueryMap.get(queryId);
if (retQuery == null) {
@@ -248,18 +299,37 @@ public class WorkloadRuntimeStatusMgr extends
MasterDaemon {
if (srcStats == null) {
return;
}
- dst.scan_rows += srcStats.scan_rows;
- dst.scan_bytes += srcStats.scan_bytes;
- dst.scan_bytes_from_local_storage +=
srcStats.scan_bytes_from_local_storage;
- dst.scan_bytes_from_remote_storage +=
srcStats.scan_bytes_from_remote_storage;
- dst.cpu_ms += srcStats.cpu_ms;
- dst.shuffle_send_bytes += srcStats.shuffle_send_bytes;
- dst.shuffle_send_rows += srcStats.shuffle_send_rows;
+ dst.setScanRows(dst.scan_rows + srcStats.scan_rows);
+ dst.setScanBytes(dst.scan_bytes + srcStats.scan_bytes);
+ dst.setScanBytesFromLocalStorage(dst.scan_bytes_from_local_storage
+ + srcStats.scan_bytes_from_local_storage);
+ dst.setScanBytesFromRemoteStorage(dst.scan_bytes_from_remote_storage
+ + srcStats.scan_bytes_from_remote_storage);
+ dst.setCpuMs(dst.cpu_ms + srcStats.cpu_ms);
+ dst.setShuffleSendBytes(dst.shuffle_send_bytes +
srcStats.shuffle_send_bytes);
+ dst.setShuffleSendRows(dst.shuffle_send_rows +
srcStats.shuffle_send_rows);
+ dst.setProcessRows(dst.process_rows + srcStats.process_rows);
+ dst.setReturnedRows(dst.returned_rows + srcStats.returned_rows);
+ if (srcStats.isSetTotalTasksNum()) {
+ dst.setTotalTasksNum(dst.total_tasks_num +
srcStats.total_tasks_num);
+ }
+ if (srcStats.isSetFinishedTasksNum()) {
+ dst.setFinishedTasksNum(dst.finished_tasks_num +
srcStats.finished_tasks_num);
+ }
+ if (dst.current_used_memory_bytes <
srcStats.current_used_memory_bytes) {
+ dst.setCurrentUsedMemoryBytes(srcStats.current_used_memory_bytes);
+ }
+ if (dst.workload_group_id <= 0 && srcStats.workload_group_id > 0) {
+ dst.setWorkloadGroupId(srcStats.workload_group_id);
+ }
if (dst.max_peak_memory_bytes < srcStats.max_peak_memory_bytes) {
- dst.max_peak_memory_bytes = srcStats.max_peak_memory_bytes;
+ dst.setMaxPeakMemoryBytes(srcStats.max_peak_memory_bytes);
}
- dst.spill_write_bytes_to_local_storage +=
srcStats.spill_write_bytes_to_local_storage;
- dst.spill_read_bytes_from_local_storage +=
srcStats.spill_read_bytes_from_local_storage;
+
dst.setSpillWriteBytesToLocalStorage(dst.spill_write_bytes_to_local_storage
+ + srcStats.spill_write_bytes_to_local_storage);
+
dst.setSpillReadBytesFromLocalStorage(dst.spill_read_bytes_from_local_storage
+ + srcStats.spill_read_bytes_from_local_storage);
+ dst.setBytesWriteIntoCache(dst.bytes_write_into_cache +
srcStats.bytes_write_into_cache);
}
private void queryAuditEventLogWriteLock() {
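Note on the snapshot pattern in WorkloadRuntimeStatusMgr: the daemon thread merges the per-backend reports and publishes the result through a volatile field, so proc/REST readers only ever see a complete, read-only map. The sketch below is a simplified illustration under stated assumptions, not the committed code. SnapshotSketch and Stats are hypothetical names, Stats stands in for TQueryStatistics with only two fields, and Collections.unmodifiableMap from the JDK replaces Guava's ImmutableMap.copyOf to keep the example dependency-free.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of the merge-then-publish pattern; not the committed code. */
final class SnapshotSketch {
    /** Hypothetical stand-in for TQueryStatistics, reduced to two fields. */
    static final class Stats {
        final long scanRows;
        final long maxPeakMemoryBytes;

        Stats(long scanRows, long maxPeakMemoryBytes) {
            this.scanRows = scanRows;
            this.maxPeakMemoryBytes = maxPeakMemoryBytes;
        }

        // Counters are summed across backends; peak memory keeps the largest value,
        // mirroring the sum/max split in the merge method of the diff.
        Stats merge(Stats other) {
            return new Stats(scanRows + other.scanRows,
                    Math.max(maxPeakMemoryBytes, other.maxPeakMemoryBytes));
        }
    }

    // Writer-side state: backend id -> (query id -> latest report from that backend).
    private final Map<Long, Map<String, Stats>> perBackend = new ConcurrentHashMap<>();
    // Reader-side state: replaced as a whole, never mutated after publication.
    private volatile Map<String, Stats> snapshot = Collections.emptyMap();

    void report(long backendId, String queryId, Stats stats) {
        perBackend.computeIfAbsent(backendId, id -> new ConcurrentHashMap<>()).put(queryId, stats);
    }

    // Called periodically by a single daemon thread in this sketch.
    void rebuildSnapshot() {
        Map<String, Stats> merged = new HashMap<>();
        for (Map<String, Stats> perQuery : perBackend.values()) {
            for (Map.Entry<String, Stats> e : perQuery.entrySet()) {
                merged.merge(e.getKey(), e.getValue(), Stats::merge);
            }
        }
        snapshot = Collections.unmodifiableMap(merged);
    }

    // Readers get the last published map; reports made since then are not visible yet.
    Map<String, Stats> getSnapshot() {
        return snapshot;
    }
}
```

The trade-off is staleness: a reader sees statistics as of the last rebuild, in exchange for never iterating over maps that a reporting thread is modifying.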
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDirTest.java
b/fe/fe-core/src/test/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDirTest.java
new file mode 100644
index 00000000000..0c9accd89e5
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDirTest.java
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.proc;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit test for CurrentQueryStatisticsProcDir progress formatting.
+ * Tests the formatProgress method directly; no mocking or reflection is required.
+ */
+public class CurrentQueryStatisticsProcDirTest {
+
+ @Test
+ public void testProgressNormal() {
+ // 7 out of 20 tasks finished = 35.0%
+ Assert.assertEquals("35.0%", CurrentQueryStatisticsProcDir.formatProgress(20, 7));
+ }
+
+ @Test
+ public void testProgressAllFinished() {
+ // 8 out of 8 = 100.0%
+ Assert.assertEquals("100.0%", CurrentQueryStatisticsProcDir.formatProgress(8, 8));
+ }
+
+ @Test
+ public void testProgressOneThird() {
+ // 1 out of 3 = 33.3%
+ Assert.assertEquals("33.3%", CurrentQueryStatisticsProcDir.formatProgress(3, 1));
+ }
+
+ @Test
+ public void testProgressTwoThirds() {
+ // 2 out of 3 = 66.7%
+ Assert.assertEquals("66.7%", CurrentQueryStatisticsProcDir.formatProgress(3, 2));
+ }
+
+ @Test
+ public void testProgressZeroPercent() {
+ // 0 out of 5 = 0.0%
+ Assert.assertEquals("0.0%", CurrentQueryStatisticsProcDir.formatProgress(5, 0));
+ }
+
+ @Test
+ public void testProgressZeroTotal() {
+ // total = 0, finished = 0 → "0.0%" (no division by zero)
+ Assert.assertEquals("0.0%", CurrentQueryStatisticsProcDir.formatProgress(0, 0));
+ }
+
+ @Test
+ public void testProgressFinishedExceedsTotal() {
+ // Defensive: if finished > total, still returns a percentage (may exceed 100%)
+ Assert.assertEquals("200.0%", CurrentQueryStatisticsProcDir.formatProgress(5, 10));
+ }
+
+ @Test
+ public void testProgressNegativeTotal() {
+ // total < 0 → returns "0.0%"
+ Assert.assertEquals("0.0%", CurrentQueryStatisticsProcDir.formatProgress(-1, 5));
+ }
+
+ @Test
+ public void testProgressLargeValues() {
+ // Verify no overflow with large numbers
+ Assert.assertEquals("50.0%",
+ CurrentQueryStatisticsProcDir.formatProgress(Integer.MAX_VALUE, Integer.MAX_VALUE / 2));
+ }
+
+ @Test
+ public void testProgressFractional() {
+ // 1 out of 7 = 14.3% (14.2857... rounds to 14.3)
+ Assert.assertEquals("14.3%", CurrentQueryStatisticsProcDir.formatProgress(7, 1));
+ }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgrTest.java
new file mode 100644
index 00000000000..ae17dcc430a
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgrTest.java
@@ -0,0 +1,285 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.resource.workloadschedpolicy;
+
+import org.apache.doris.thrift.TQueryStatistics;
+import org.apache.doris.thrift.TQueryStatisticsResult;
+import org.apache.doris.thrift.TReportWorkloadRuntimeStatusParams;
+
+import com.google.common.collect.Maps;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+/**
+ * Unit test for WorkloadRuntimeStatusMgr.
+ * Verifies that query progress statistics from multiple BEs are correctly
+ * merged via the Thrift setter-based merge logic.
+ */
+public class WorkloadRuntimeStatusMgrTest {
+
+ private WorkloadRuntimeStatusMgr mgr;
+
+ @Before
+ public void setUp() {
+ mgr = new WorkloadRuntimeStatusMgr();
+ }
+
+ // ---- Merge: single BE ----
+
+ @Test
+ public void testSingleBeProgressMerge() {
+ // Simulate one BE reporting progress for one query.
+ long beId = 10001L;
+ TQueryStatistics stats = new TQueryStatistics();
+ stats.setTotalTasksNum(10);
+ stats.setFinishedTasksNum(3);
+
+ TReportWorkloadRuntimeStatusParams params = buildParams(beId, "q1", stats);
+ mgr.updateBeQueryStats(params);
+
+ Map<String, TQueryStatistics> merged = getMergedSnapshot();
+ Assert.assertEquals(1, merged.size());
+
+ TQueryStatistics result = merged.get("q1");
+ Assert.assertNotNull(result);
+ Assert.assertEquals(10, result.getTotalTasksNum());
+ Assert.assertEquals(3, result.getFinishedTasksNum());
+ }
+
+ // ---- Merge: multiple BEs, same query (summing across BEs) ----
+
+ @Test
+ public void testMultiBeSummingAcrossQuery() {
+ // BE1: total=10, finished=3
+ // BE2: total=8, finished=5
+ // Merged: total=18, finished=8
+ mgr.updateBeQueryStats(buildParams(10001L, "q1", buildStats(10, 3)));
+ mgr.updateBeQueryStats(buildParams(10002L, "q1", buildStats(8, 5)));
+
+ Map<String, TQueryStatistics> merged = getMergedSnapshot();
+ Assert.assertEquals(1, merged.size());
+
+ TQueryStatistics result = merged.get("q1");
+ Assert.assertEquals(18, result.getTotalTasksNum());
+ Assert.assertEquals(8, result.getFinishedTasksNum());
+ }
+
+ // ---- Merge: multiple BEs, multiple queries remain independent ----
+
+ @Test
+ public void testMultiQueryIndependence() {
+ mgr.updateBeQueryStats(buildParams(10001L, "q1", buildStats(10, 2)));
+ mgr.updateBeQueryStats(buildParams(10001L, "q2", buildStats(20, 15)));
+
+ Map<String, TQueryStatistics> merged = getMergedSnapshot();
+ Assert.assertEquals(2, merged.size());
+
+ Assert.assertEquals(10, merged.get("q1").getTotalTasksNum());
+ Assert.assertEquals(2, merged.get("q1").getFinishedTasksNum());
+ Assert.assertEquals(20, merged.get("q2").getTotalTasksNum());
+ Assert.assertEquals(15, merged.get("q2").getFinishedTasksNum());
+ }
+
+ // ---- isSet flag: unset fields should not override previous values ----
+
+ @Test
+ public void testIsSetPreservesPreviousValues() {
+ // BE1 reports total=10, finished=3 with isSet properly set
+ mgr.updateBeQueryStats(buildParams(10001L, "q1", buildStats(10, 3)));
+
+ // BE2 reports same query but does NOT set total_tasks_num
+ // This simulates an older BE version without progress support.
+ TQueryStatistics stats2 = new TQueryStatistics();
+ // Intentionally NOT calling setTotalTasksNum/setFinishedTasksNum
+ // (isSet* returns false)
+ stats2.setScanRows(100); // set some other field to make it non-empty
+ mgr.updateBeQueryStats(buildParams(10002L, "q1", stats2));
+
+ Map<String, TQueryStatistics> merged = getMergedSnapshot();
+ TQueryStatistics result = merged.get("q1");
+
+ // BE2 didn't set total/finished, so original values from BE1 should be preserved
+ Assert.assertEquals(10, result.getTotalTasksNum());
+ Assert.assertEquals(3, result.getFinishedTasksNum());
+ }
+
+ // ---- Zero-reporting BE should not interfere ----
+
+ @Test
+ public void testBeWithZeroProgress() {
+ mgr.updateBeQueryStats(buildParams(10001L, "q1", buildStats(10, 4)));
+
+ // BE2 explicitly reports zero tasks (both fields set to 0)
+ mgr.updateBeQueryStats(buildParams(10002L, "q1", buildStats(0, 0)));
+
+ Map<String, TQueryStatistics> merged = getMergedSnapshot();
+ TQueryStatistics result = merged.get("q1");
+
+ // total=10, finished=4 (from BE1); BE2's (0,0) is additive → still (10,4)
+ Assert.assertEquals(10, result.getTotalTasksNum());
+ Assert.assertEquals(4, result.getFinishedTasksNum());
+ }
+
+ // ---- getQueryStatistics returns per-BE map ----
+
+ @Test
+ public void testGetQueryStatisticsPerBe() {
+ mgr.updateBeQueryStats(buildParams(10001L, "q1", buildStats(5, 2)));
+ mgr.updateBeQueryStats(buildParams(10002L, "q1", buildStats(3, 1)));
+
+ Map<Long, TQueryStatisticsResult> perBe = mgr.getQueryStatistics("q1");
+ Assert.assertEquals(2, perBe.size());
+ Assert.assertTrue(perBe.containsKey(10001L));
+ Assert.assertTrue(perBe.containsKey(10002L));
+ Assert.assertEquals(5, perBe.get(10001L).getStatistics().getTotalTasksNum());
+ Assert.assertEquals(3, perBe.get(10002L).getStatistics().getTotalTasksNum());
+ }
+
+ // ---- Non-existent query returns empty map ----
+
+ @Test
+ public void testGetQueryStatisticsNonExistent() {
+ Map<Long, TQueryStatisticsResult> perBe = mgr.getQueryStatistics("non-existent-query");
+ Assert.assertTrue(perBe.isEmpty());
+ }
+
+ // ---- updateBeQueryStats with missing fields ----
+
+ @Test
+ public void testUpdateBeQueryStatsMissingBackendId() {
+ TReportWorkloadRuntimeStatusParams params = new TReportWorkloadRuntimeStatusParams();
+ // backend_id not set, updateBeQueryStats should log a warning and return early
+ mgr.updateBeQueryStats(params);
+ Assert.assertTrue(getMergedSnapshot().isEmpty());
+ }
+
+ // ---- updateBeQueryStats with missing query stats map ----
+
+ @Test
+ public void testUpdateBeQueryStatsMissingQueryStatsMap() {
+ TReportWorkloadRuntimeStatusParams params = new TReportWorkloadRuntimeStatusParams();
+ params.setBackendId(10001L);
+ // query_statistics_result_map not set → should return early
+ mgr.updateBeQueryStats(params);
+ Assert.assertTrue(getMergedSnapshot().isEmpty());
+ }
+
+ // ---- isSet flag: verifying Thrift setter behavior inline ----
+
+ @Test
+ public void testThriftIsSetFlagRequired() {
+ // Confirm that calling the generated setter marks the field as set
+ // (isSet* returns true), whereas direct field assignment does not.
+ // This documents the historical bug that was fixed in this feature.
+
+ TQueryStatistics viaSetter = new TQueryStatistics();
+ viaSetter.setTotalTasksNum(5);
+ Assert.assertTrue("setTotalTasksNum via setter must set __isset flag",
+ viaSetter.isSetTotalTasksNum());
+
+ TQueryStatistics viaField = new TQueryStatistics();
+ viaField.total_tasks_num = 5; // direct field assignment
+ Assert.assertFalse("direct field assignment must NOT set __isset flag",
+ viaField.isSetTotalTasksNum());
+
+ // Same for finished_tasks_num
+ viaSetter.setFinishedTasksNum(3);
+ Assert.assertTrue(viaSetter.isSetFinishedTasksNum());
+
+ viaField.finished_tasks_num = 3;
+ Assert.assertFalse(viaField.isSetFinishedTasksNum());
+ }
+
+ // ---- Merge without any progress fields ----
+
+ @Test
+ public void testMergeWithoutProgressFields() {
+ // Regression test: when isSet is false for progress fields,
+ // merge should not touch them, leaving them at default (0).
+ TQueryStatistics stats = new TQueryStatistics();
+ // Intentionally leave total/finished unset
+ mgr.updateBeQueryStats(buildParams(10001L, "q1", stats));
+
+ Map<String, TQueryStatistics> merged = getMergedSnapshot();
+ TQueryStatistics result = merged.get("q1");
+
+ // Fields should still be at their default of 0 (isSet is expected to stay false, but is not asserted here)
+ Assert.assertEquals(0, result.getTotalTasksNum());
+ Assert.assertEquals(0, result.getFinishedTasksNum());
+ }
+
+ // ---- Merge: three BEs combined ----
+
+ @Test
+ public void testThreeBeMergeProgress() {
+ mgr.updateBeQueryStats(buildParams(10001L, "q1", buildStats(4, 1)));
+ mgr.updateBeQueryStats(buildParams(10002L, "q1", buildStats(3, 3)));
+ mgr.updateBeQueryStats(buildParams(10003L, "q1", buildStats(5, 0)));
+
+ Map<String, TQueryStatistics> merged = getMergedSnapshot();
+ Assert.assertEquals(1, merged.size());
+
+ TQueryStatistics result = merged.get("q1");
+ // total = 4 + 3 + 5 = 12, finished = 1 + 3 + 0 = 4
+ Assert.assertEquals(12, result.getTotalTasksNum());
+ Assert.assertEquals(4, result.getFinishedTasksNum());
+ }
+
+ @Test
+ public void testSnapshotReadRequiresRebuild() {
+ mgr.updateBeQueryStats(buildParams(10001L, "q1", buildStats(6, 2)));
+ // Newly reported data is not visible to sync readers before snapshot rebuild.
+ Assert.assertTrue(mgr.getQueryStatisticsMap().isEmpty());
+
+ // Rebuild snapshot and verify the new data becomes visible.
+ Map<String, TQueryStatistics> merged = getMergedSnapshot();
+ Assert.assertEquals(1, merged.size());
+ Assert.assertEquals(6, merged.get("q1").getTotalTasksNum());
+ Assert.assertEquals(2, merged.get("q1").getFinishedTasksNum());
+ }
+
+ // ---- helper methods ----
+
+ private TQueryStatistics buildStats(int totalTasks, int finishedTasks) {
+ TQueryStatistics stats = new TQueryStatistics();
+ stats.setTotalTasksNum(totalTasks);
+ stats.setFinishedTasksNum(finishedTasks);
+ return stats;
+ }
+
+ private TReportWorkloadRuntimeStatusParams buildParams(long beId, String queryId, TQueryStatistics stats) {
+ TQueryStatisticsResult result = new TQueryStatisticsResult();
+ result.setStatistics(stats);
+
+ TReportWorkloadRuntimeStatusParams params = new TReportWorkloadRuntimeStatusParams();
+ params.setBackendId(beId);
+ Map<String, TQueryStatisticsResult> map = Maps.newHashMap();
+ map.put(queryId, result);
+ params.setQueryStatisticsResultMap(map);
+ return params;
+ }
+
+ // Refresh and read snapshot to match daemon-driven visibility semantics.
+ private Map<String, TQueryStatistics> getMergedSnapshot() {
+ mgr.rebuildQueryStatisticsSnapshot();
+ return mgr.getQueryStatisticsMap();
+ }
+}
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 3b1c55fc295..52246428345 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -202,6 +202,9 @@ struct TQueryStatistics {
12: optional i64 spill_write_bytes_to_local_storage
13: optional i64 spill_read_bytes_from_local_storage
14: optional i64 bytes_write_into_cache
+ 15: optional i64 process_rows
+ 16: optional i32 finished_tasks_num
+ 17: optional i32 total_tasks_num
}
struct TQueryStatisticsResult {
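
Note for readers of the tests in this patch: the sketch below is a minimal, self-contained illustration of the two behaviors they exercise, namely isSet-guarded summing of task counters across BEs and one-decimal progress formatting. It is not the code from this commit. The `ProgressSketch` class and its nested `Stats` type are hypothetical stand-ins (`Stats` mimics only the setter/isSet pattern of the Thrift-generated `TQueryStatistics`), and the body of `formatProgress` is an assumption inferred from the expected values in `CurrentQueryStatisticsProcDirTest`; the real implementation may differ.

```java
import java.util.Locale;

public class ProgressSketch {
    /** Hypothetical stand-in for TQueryStatistics, reduced to the two task counters. */
    static final class Stats {
        int totalTasksNum;
        int finishedTasksNum;
        private boolean totalSet;
        private boolean finishedSet;

        // Like a Thrift-generated setter: stores the value and marks the field as set.
        void setTotalTasksNum(int value) {
            totalTasksNum = value;
            totalSet = true;
        }

        void setFinishedTasksNum(int value) {
            finishedTasksNum = value;
            finishedSet = true;
        }

        boolean isSetTotalTasksNum() {
            return totalSet;
        }

        boolean isSetFinishedTasksNum() {
            return finishedSet;
        }
    }

    /** Adds src's counters into dst, skipping fields the reporting BE never set. */
    static void merge(Stats dst, Stats src) {
        if (src.isSetTotalTasksNum()) {
            dst.setTotalTasksNum(dst.totalTasksNum + src.totalTasksNum);
        }
        if (src.isSetFinishedTasksNum()) {
            dst.setFinishedTasksNum(dst.finishedTasksNum + src.finishedTasksNum);
        }
    }

    /** Assumed behavior: "0.0%" for a non-positive total, otherwise one decimal place. */
    static String formatProgress(int totalTasks, int finishedTasks) {
        if (totalTasks <= 0) {
            return "0.0%";
        }
        // Multiplying by 100.0 first promotes the arithmetic to double, avoiding int overflow.
        double percent = 100.0 * finishedTasks / totalTasks;
        return String.format(Locale.ROOT, "%.1f%%", percent);
    }

    public static void main(String[] args) {
        Stats be1 = new Stats();
        be1.setTotalTasksNum(10);
        be1.setFinishedTasksNum(3);

        Stats be2 = new Stats();
        be2.setTotalTasksNum(8);
        be2.setFinishedTasksNum(5);

        Stats olderBe = new Stats(); // reports no progress fields at all

        Stats merged = new Stats();
        merge(merged, be1);
        merge(merged, be2);
        merge(merged, olderBe);

        // 8 finished out of 18 total
        System.out.println(formatProgress(merged.totalTasksNum, merged.finishedTasksNum));
    }
}
```

A BE that leaves both fields unset adds nothing to the merged counters, which is the case `testIsSetPreservesPreviousValues` covers. A BE that explicitly reports zeros is still summed in, but does not change the totals.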