This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 2a5147b1262 branch-4.1:[improvement](executor) unify current query 
runtime statistics and expose task progress (#60567) (#63130)
2a5147b1262 is described below

commit 2a5147b1262c60b79fb32facd4b78a92379c3577
Author: Wen Zhenghu <[email protected]>
AuthorDate: Mon May 11 20:58:23 2026 +0800

    branch-4.1:[improvement](executor) unify current query runtime statistics 
and expose task progress (#60567) (#63130)
    
    checkpick from https://github.com/apache/doris/pull/60567
    **PR Summary**
    - This PR unifies current-query runtime statistics onto the `BE -> FE`
    reporting pipeline, replacing the previous ad-hoc `RuntimeProfile`
    traversal path, and enriches `current_queries` with task-level progress
    plus broader resource metrics.
    - The goal is to make current-query visibility more real-time and
    consistent with audit statistics while simplifying and consolidating FE
    proc/REST surfaces.
    
    **What It Solves**
    - Unifies statistics source: `QeProcessorImpl` now reads aggregated
    `TQueryStatistics` from `WorkloadRuntimeStatusMgr` instead of relying on
    the legacy `CurrentQueryInfoProvider` path.
    - Improves progress observability: introduces `process_rows`,
    `total_tasks_num`, and `finished_tasks_num`, and exposes computed
    `Progress`.
    - Expands runtime metrics coverage: `current_queries` now includes
    richer scan/cpu/memory/shuffle/spill/cache counters.
    - Consolidates query views: `/current_queries` and
    `/current_query_stmts` now share the same statistics view; legacy
    per-query/per-fragment proc drill-down implementation is removed.
    
    **Implementation Details**
    - Protocol layer:
    - Extends `TQueryStatistics` with `process_rows`, `finished_tasks_num`,
    and `total_tasks_num`.
    - BE collection/reporting:
      - Accumulates `process_rows` in the execution path.
    - Records `total_tasks_num` at pipeline task graph initialization and
    increments `finished_tasks_num` in real time when tasks close.
    - Mirrors task-progress counters into `QueryTaskController` so counters
    remain available even after `QueryContext` teardown.
      - Exports new fields in `ResourceContext::to_thrift_query_statistics`.
    - FE aggregation/retention:
    - `WorkloadRuntimeStatusMgr` merges additional fields (including task
    progress) and refines timeout cleanup: remove query stats only when they
    are timed out and the query no longer exists in FE.
    - `QueryStatisticsItem` now carries `TQueryStatistics` as the unified
    data carrier for proc/REST.
    - Presentation layer:
    - `CurrentQueryStatisticsProcDir` adds expanded columns and computes
    `Progress`.
    - `/rest/v2/manager/query/current_queries` in `QueryProfileAction` now
    serves the same unified stats view.
    - Removes legacy classes: `CurrentQueryInfoProvider`,
    `CurrentQuerySqlProcDir`, `CurrentQueryFragmentProcNode`, and
    `CurrentQueryStatementsProcNode`.
    
    ```
    *************************** 1. row ***************************
                           QueryId: e00b00b1155d4042-98862b60016a768a
                      ConnectionId: 394
                           Catalog: internal
                          Database: wzhtest
                              User: root
                          ExecTime: 20717
                           SqlHash: cf263b08302d8be436c97dd5e6f0d283
                         Statement: INSERT INTO test_query_progress_tb   SELECT 
DISTINCT k, CONCAT(v, CAST(k AS STRING))   FROM test_query_progress_tb   WHERE 
k % 2 = 0
                          ScanRows: 45400000 Rows
                         ScanBytes: 2.70 GB
                       ProcessRows: 75598123 Rows
                             CpuMs: 178336
                MaxPeakMemoryBytes: 13.03 GB
            CurrentUsedMemoryBytes: 8.69 GB
                   WorkloadGroupId: 1777125330381
                  ShuffleSendBytes: 0.00
                   ShuffleSendRows: 0 Rows
         ScanBytesFromLocalStorage: 31.48 MB
        ScanBytesFromRemoteStorage: 0.00
     SpillWriteBytesToLocalStorage: 0.00
    SpillReadBytesFromLocalStorage: 0.00
               BytesWriteIntoCache: 0.00
                        TotalTasks: 74
                     FinishedTasks: 51
                          Progress: 68%
    ------------------------
    -- first--
    QueryId: e2b8c99658a94743-9ebbf0d036d83295
      ConnectionId: 9
      Catalog: hive_test
      Database: tpch100_parquet
      User: root
      ExecTime: 6093
      SqlHash: f8a30e4182d72cce3eff6cb385005b1f
      Statement: select ... from supplier, lineitem l1, orders, nation ... 
limit 100
      ScanRows: 621466194 Rows
      ScanBytes: 5.37 GB
      ProcessRows: 79079742 Rows
      CpuMs: 31655
      MaxPeakMemoryBytes: 2.32 GB
      CurrentUsedMemoryBytes: 2.18 GB
      WorkloadGroupId: 1777253545394
      ShuffleSendBytes: 0.00
      ShuffleSendRows: 0 Rows
      ScanBytesFromLocalStorage: 0.00
      ScanBytesFromRemoteStorage: 5.37 GB
      SpillWriteBytesToLocalStorage: 0.00
      SpillReadBytesFromLocalStorage: 0.00
      BytesWriteIntoCache: 0.00
      TotalTasks: 138
      FinishedTasks: 49
      Progress: 35%
    --second--
      QueryId: e2b8c99658a94743-9ebbf0d036d83295
      ConnectionId: 9
      Catalog: hive_test
      Database: tpch100_parquet
      User: root
      ExecTime: 10807
      SqlHash: f8a30e4182d72cce3eff6cb385005b1f
      Statement: select ... from supplier, lineitem l1, orders, nation ... 
limit 100
      ScanRows: 1102562592 Rows
      ScanBytes: 9.20 GB
      ProcessRows: 112176670 Rows
      CpuMs: 53808
      MaxPeakMemoryBytes: 3.13 GB
      CurrentUsedMemoryBytes: 2.50 GB
      WorkloadGroupId: 1777253545394
      ShuffleSendBytes: 0.00
      ShuffleSendRows: 0 Rows
      ScanBytesFromLocalStorage: 0.00
      ScanBytesFromRemoteStorage: 9.20 GB
      SpillWriteBytesToLocalStorage: 0.00
      SpillReadBytesFromLocalStorage: 0.00
      BytesWriteIntoCache: 0.00
      TotalTasks: 138
      FinishedTasks: 65
      Progress: 47%
    ```
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
    - [ ] Yes. <!-- Add document PR link here. eg:
    https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
    
    Co-authored-by: yiguolei <[email protected]>
    Co-authored-by: xuchenhao <[email protected]>
    Co-authored-by: xuchenhao <[email protected]>
---
 be/src/exec/operator/operator.cpp                  |   1 +
 be/src/exec/pipeline/pipeline_fragment_context.cpp |   4 +
 be/src/exec/pipeline/pipeline_fragment_context.h   |   5 +-
 be/src/runtime/query_context.cpp                   |  12 +
 be/src/runtime/query_context.h                     |   6 +
 be/src/runtime/workload_management/io_context.h    |   4 +
 .../workload_management/query_task_controller.cpp  |  18 ++
 .../workload_management/query_task_controller.h    |  10 +
 .../workload_management/resource_context.cpp       |   8 +
 be/test/exec/pipeline/pipeline_test.cpp            | 125 +++++++++
 .../common/proc/CurrentQueryFragmentProcNode.java  |  89 -------
 .../common/proc/CurrentQueryInfoProvider.java      | 200 ---------------
 .../doris/common/proc/CurrentQuerySqlProcDir.java  |  70 -----
 .../proc/CurrentQueryStatementsProcNode.java       |  71 -----
 .../common/proc/CurrentQueryStatisticsProcDir.java |  80 +++---
 .../org/apache/doris/common/proc/ProcService.java  |   2 +-
 .../doris/common/profile/RuntimeProfile.java       |   5 -
 .../httpv2/rest/manager/QueryProfileAction.java    |  10 +-
 .../java/org/apache/doris/qe/QeProcessorImpl.java  |   5 +
 .../org/apache/doris/qe/QueryStatisticsItem.java   |  18 ++
 .../WorkloadRuntimeStatusMgr.java                  | 112 ++++++--
 .../proc/CurrentQueryStatisticsProcDirTest.java    |  89 +++++++
 .../WorkloadRuntimeStatusMgrTest.java              | 285 +++++++++++++++++++++
 gensrc/thrift/FrontendService.thrift               |   3 +
 24 files changed, 739 insertions(+), 493 deletions(-)

diff --git a/be/src/exec/operator/operator.cpp 
b/be/src/exec/operator/operator.cpp
index 3b330550faf..dde91a51039 100644
--- a/be/src/exec/operator/operator.cpp
+++ b/be/src/exec/operator/operator.cpp
@@ -420,6 +420,7 @@ void PipelineXLocalStateBase::reached_limit(Block* block, 
bool* eos) {
 
     if (auto rows = block->rows()) {
         _num_rows_returned += rows;
+        
_state->get_query_ctx()->resource_ctx()->io_context()->update_process_rows(rows);
     }
 }
 
diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp 
b/be/src/exec/pipeline/pipeline_fragment_context.cpp
index e04745a853e..c8f83ad0781 100644
--- a/be/src/exec/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp
@@ -602,6 +602,8 @@ Status 
PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
     }
     _pipeline_parent_map.clear();
     _op_id_to_shared_state.clear();
+    // Record task cardinality once when this fragment context finishes task 
initialization.
+    
_query_ctx->add_total_task_num(_total_tasks.load(std::memory_order_relaxed));
 
     return Status::OK();
 }
@@ -1934,6 +1936,8 @@ void 
PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
     {
         std::lock_guard<std::mutex> l(_task_mutex);
         ++_closed_tasks;
+        // Update query-level finished task progress in real time.
+        _query_ctx->inc_finished_task_num();
         if (_closed_tasks >= _total_tasks) {
             need_remove = _close_fragment_instance();
         }
diff --git a/be/src/exec/pipeline/pipeline_fragment_context.h 
b/be/src/exec/pipeline/pipeline_fragment_context.h
index 01306799aa4..c220ea386f6 100644
--- a/be/src/exec/pipeline/pipeline_fragment_context.h
+++ b/be/src/exec/pipeline/pipeline_fragment_context.h
@@ -39,6 +39,7 @@
 #include "runtime/runtime_state.h"
 #include "runtime/task_execution_context.h"
 #include "util/stopwatch.hpp"
+#include "util/uid_util.h"
 
 namespace doris {
 struct ReportStatusRequest;
@@ -88,11 +89,11 @@ public:
 
     [[nodiscard]] int get_fragment_id() const { return _fragment_id; }
 
+    void decrement_running_task(PipelineId pipeline_id);
+
     uint32_t rec_cte_stage() const { return _rec_cte_stage; }
     void set_rec_cte_stage(uint32_t stage) { _rec_cte_stage = stage; }
 
-    void decrement_running_task(PipelineId pipeline_id);
-
     Status send_report(bool);
 
     void trigger_report_if_necessary();
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index e00a37aadb1..6fb8e41d824 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -555,4 +555,16 @@ Status QueryContext::reset_global_rf(const 
google::protobuf::RepeatedField<int32
     return Status::OK();
 }
 
+void QueryContext::add_total_task_num(int delta) {
+    if (auto* qtc = 
dynamic_cast<QueryTaskController*>(_resource_ctx->task_controller())) {
+        qtc->add_total_task_num(delta);
+    }
+}
+
+void QueryContext::inc_finished_task_num() {
+    if (auto* qtc = 
dynamic_cast<QueryTaskController*>(_resource_ctx->task_controller())) {
+        qtc->inc_finished_task_num();
+    }
+}
+
 } // namespace doris
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index d5e559890c0..5e2b6babe87 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -47,6 +47,7 @@ namespace doris {
 
 class PipelineFragmentContext;
 class PipelineTask;
+class QueryTaskController;
 class Dependency;
 class RecCTEScanLocalState;
 
@@ -203,6 +204,10 @@ public:
 
     TUniqueId query_id() const { return _query_id; }
 
+    // Expose task-level query progress counters for runtime statistics 
reporting.
+    void add_total_task_num(int delta);
+    void inc_finished_task_num();
+
     ScannerScheduler* get_scan_scheduler() { return _scan_task_scheduler; }
 
     ScannerScheduler* get_remote_scan_scheduler() { return 
_remote_scan_task_scheduler; }
@@ -311,6 +316,7 @@ public:
     Status reset_global_rf(const google::protobuf::RepeatedField<int32_t>& 
filter_ids);
 
 private:
+    // Task-level progress counters for current query.
     friend class QueryTaskController;
 
     int _timeout_second;
diff --git a/be/src/runtime/workload_management/io_context.h 
b/be/src/runtime/workload_management/io_context.h
index 3c0dd4343fd..1032c148a63 100644
--- a/be/src/runtime/workload_management/io_context.h
+++ b/be/src/runtime/workload_management/io_context.h
@@ -45,6 +45,7 @@ public:
         // number rows returned by query.
         // only set once by result sink when closing.
         RuntimeProfile::Counter* returned_rows_counter_;
+        RuntimeProfile::Counter* process_rows_counter_;
         RuntimeProfile::Counter* shuffle_send_bytes_counter_;
         RuntimeProfile::Counter* shuffle_send_rows_counter_;
 
@@ -63,6 +64,7 @@ public:
             bytes_write_into_cache_counter_ =
                     ADD_COUNTER(profile_, "BytesWriteIntoCache", TUnit::BYTES);
             returned_rows_counter_ = ADD_COUNTER(profile_, "ReturnedRows", 
TUnit::UNIT);
+            process_rows_counter_ = ADD_COUNTER(profile_, "ProcessRows", 
TUnit::UNIT);
             shuffle_send_bytes_counter_ = ADD_COUNTER(profile_, 
"ShuffleSendBytes", TUnit::BYTES);
             shuffle_send_rows_counter_ =
                     ADD_COUNTER(profile_, "ShuffleSendRowsCounter_", 
TUnit::UNIT);
@@ -94,6 +96,7 @@ public:
         return stats_.bytes_write_into_cache_counter_->value();
     }
     int64_t returned_rows() const { return 
stats_.returned_rows_counter_->value(); }
+    int64_t process_rows() const { return 
stats_.process_rows_counter_->value(); }
     int64_t shuffle_send_bytes() const { return 
stats_.shuffle_send_bytes_counter_->value(); }
     int64_t shuffle_send_rows() const { return 
stats_.shuffle_send_rows_counter_->value(); }
 
@@ -117,6 +120,7 @@ public:
         stats_.bytes_write_into_cache_counter_->update(delta);
     }
     void update_returned_rows(int64_t delta) const { 
stats_.returned_rows_counter_->update(delta); }
+    void update_process_rows(int64_t delta) const { 
stats_.process_rows_counter_->update(delta); }
     void update_shuffle_send_bytes(int64_t delta) const {
         stats_.shuffle_send_bytes_counter_->update(delta);
     }
diff --git a/be/src/runtime/workload_management/query_task_controller.cpp 
b/be/src/runtime/workload_management/query_task_controller.cpp
index 47d6c7c05cd..16e950fa56c 100644
--- a/be/src/runtime/workload_management/query_task_controller.cpp
+++ b/be/src/runtime/workload_management/query_task_controller.cpp
@@ -227,5 +227,23 @@ std::vector<PipelineTask*> 
QueryTaskController::get_revocable_tasks() {
     return tasks;
 }
 
+void QueryTaskController::add_total_task_num(int delta) {
+    _total_task_num.fetch_add(delta, std::memory_order_relaxed);
+}
+
+void QueryTaskController::inc_finished_task_num() {
+    _finished_task_num.fetch_add(1, std::memory_order_relaxed);
+}
+
+int QueryTaskController::get_total_task_num() const {
+    // Read from controller-owned counters to avoid lifecycle dependency on 
QueryContext.
+    return _total_task_num.load(std::memory_order_relaxed);
+}
+
+int QueryTaskController::get_finished_task_num() const {
+    // Read from controller-owned counters to avoid lifecycle dependency on 
QueryContext.
+    return _finished_task_num.load(std::memory_order_relaxed);
+}
+
 #include "common/compile_check_end.h"
 } // namespace doris
diff --git a/be/src/runtime/workload_management/query_task_controller.h 
b/be/src/runtime/workload_management/query_task_controller.h
index f10a846c538..0d46196c150 100644
--- a/be/src/runtime/workload_management/query_task_controller.h
+++ b/be/src/runtime/workload_management/query_task_controller.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include <atomic>
+
 #include "common/factory_creator.h"
 #include "runtime/workload_management/task_controller.h"
 
@@ -46,11 +48,19 @@ public:
     size_t get_revocable_size() override;
     Status revoke_memory() override;
     std::vector<PipelineTask*> get_revocable_tasks() override;
+    // Expose task progress counters without leaking full QueryContext.
+    void add_total_task_num(int delta);
+    void inc_finished_task_num();
+    int get_total_task_num() const;
+    int get_finished_task_num() const;
 
 protected:
     QueryTaskController(const std::shared_ptr<QueryContext>& query_ctx) : 
query_ctx_(query_ctx) {}
 
     const std::weak_ptr<QueryContext> query_ctx_;
+    // Keep task progress counters in controller so they outlive QueryContext 
if needed.
+    std::atomic<int> _total_task_num {0};
+    std::atomic<int> _finished_task_num {0};
 };
 
 #include "common/compile_check_end.h"
diff --git a/be/src/runtime/workload_management/resource_context.cpp 
b/be/src/runtime/workload_management/resource_context.cpp
index c1b0fd2b744..d7a729aa69c 100644
--- a/be/src/runtime/workload_management/resource_context.cpp
+++ b/be/src/runtime/workload_management/resource_context.cpp
@@ -20,6 +20,7 @@
 #include <gen_cpp/data.pb.h>
 #include <glog/logging.h>
 
+#include "runtime/workload_management/query_task_controller.h"
 #include "util/time.h"
 
 namespace doris {
@@ -31,6 +32,7 @@ void 
ResourceContext::to_thrift_query_statistics(TQueryStatistics* statistics) c
     statistics->__set_scan_bytes(io_context()->scan_bytes());
     statistics->__set_cpu_ms(cpu_context()->cpu_cost_ms() / NANOS_PER_MILLIS);
     statistics->__set_returned_rows(io_context()->returned_rows());
+    statistics->__set_process_rows(io_context()->process_rows());
     
statistics->__set_max_peak_memory_bytes(memory_context()->max_peak_memory_bytes());
     
statistics->__set_current_used_memory_bytes(memory_context()->current_memory_bytes());
     statistics->__set_shuffle_send_bytes(io_context()->shuffle_send_bytes());
@@ -50,6 +52,12 @@ void 
ResourceContext::to_thrift_query_statistics(TQueryStatistics* statistics) c
             io_context_->spill_write_bytes_to_local_storage());
     statistics->__set_spill_read_bytes_from_local_storage(
             io_context_->spill_read_bytes_from_local_storage());
+
+    if (auto* query_task_controller = 
dynamic_cast<QueryTaskController*>(task_controller())) {
+        // Fill query task-level progress directly from task controller.
+        
statistics->__set_total_tasks_num(query_task_controller->get_total_task_num());
+        
statistics->__set_finished_tasks_num(query_task_controller->get_finished_task_num());
+    }
 }
 
 #include "common/compile_check_end.h"
diff --git a/be/test/exec/pipeline/pipeline_test.cpp 
b/be/test/exec/pipeline/pipeline_test.cpp
index 4150e281a35..d5d0b5028d6 100644
--- a/be/test/exec/pipeline/pipeline_test.cpp
+++ b/be/test/exec/pipeline/pipeline_test.cpp
@@ -17,6 +17,7 @@
 
 #include "exec/pipeline/pipeline.h"
 
+#include <gen_cpp/FrontendService_types.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
@@ -40,6 +41,8 @@
 #include "runtime/descriptor_helper.h"
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
+#include "runtime/workload_management/query_task_controller.h"
+#include "runtime/workload_management/resource_context.h"
 
 namespace doris {
 
@@ -467,6 +470,36 @@ TEST_F(PipelineTest, HAPPY_PATH) {
     downstream_recvr->close();
 }
 
+TEST_F(PipelineTest, QueryTaskProgressCounters) {
+    // Verify task-level counters are updated via QueryContext and exposed by 
QueryTaskController.
+    _query_ctx->add_total_task_num(7);
+    _query_ctx->inc_finished_task_num();
+    _query_ctx->inc_finished_task_num();
+    _query_ctx->inc_finished_task_num();
+
+    auto* query_task_controller =
+            
dynamic_cast<QueryTaskController*>(_query_ctx->resource_ctx()->task_controller());
+    ASSERT_NE(query_task_controller, nullptr);
+    EXPECT_EQ(query_task_controller->get_total_task_num(), 7);
+    EXPECT_EQ(query_task_controller->get_finished_task_num(), 3);
+}
+
+TEST_F(PipelineTest, QueryTaskProgressCountersOutliveQueryContext) {
+    // Verify controller-owned counters still work after QueryContext is 
destroyed.
+    auto resource_ctx = _query_ctx->resource_ctx();
+    auto* query_task_controller =
+            
dynamic_cast<QueryTaskController*>(resource_ctx->task_controller());
+    ASSERT_NE(query_task_controller, nullptr);
+
+    _query_ctx->add_total_task_num(5);
+    _query_ctx->inc_finished_task_num();
+    _query_ctx->inc_finished_task_num();
+
+    _query_ctx.reset();
+    EXPECT_EQ(query_task_controller->get_total_task_num(), 5);
+    EXPECT_EQ(query_task_controller->get_finished_task_num(), 2);
+}
+
 TEST_F(PipelineTest, PLAN_LOCAL_EXCHANGE) {
     _reset();
     // Pipeline(ExchangeOperator(id=0, HASH_PARTITIONED) -> 
ExchangeSinkOperatorX(id=1, UNPARTITIONED))
@@ -1163,4 +1196,96 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
     downstream_recvr->close();
 }
 
+TEST_F(PipelineTest, QueryTaskProgressConcurrentUpdates) {
+    // Verify counters are thread-safe under concurrent updates from multiple 
threads.
+    auto resource_ctx = _query_ctx->resource_ctx();
+    auto* ctrl = 
dynamic_cast<QueryTaskController*>(resource_ctx->task_controller());
+    ASSERT_NE(ctrl, nullptr);
+
+    _query_ctx->add_total_task_num(400);
+
+    std::vector<std::thread> threads;
+    for (int i = 0; i < 8; i++) {
+        threads.emplace_back([this]() {
+            for (int j = 0; j < 50; j++) {
+                _query_ctx->inc_finished_task_num();
+            }
+        });
+    }
+    for (auto& t : threads) {
+        t.join();
+    }
+
+    EXPECT_EQ(ctrl->get_total_task_num(), 400);
+    EXPECT_EQ(ctrl->get_finished_task_num(), 400);
+}
+
+TEST_F(PipelineTest, QueryTaskProgressThriftSerialization) {
+    // Verify progress counters are correctly serialized to Thrift struct.
+    _query_ctx->add_total_task_num(10);
+    _query_ctx->inc_finished_task_num();
+    _query_ctx->inc_finished_task_num();
+    _query_ctx->inc_finished_task_num();
+    _query_ctx->inc_finished_task_num();
+
+    TQueryStatistics tqs;
+    _query_ctx->resource_ctx()->to_thrift_query_statistics(&tqs);
+
+    EXPECT_TRUE(tqs.__isset.total_tasks_num);
+    EXPECT_EQ(tqs.total_tasks_num, 10);
+    EXPECT_TRUE(tqs.__isset.finished_tasks_num);
+    EXPECT_EQ(tqs.finished_tasks_num, 4);
+}
+
+TEST_F(PipelineTest, QueryTaskProgressBoundaryZeroTotal) {
+    // Verify behavior when no tasks have been registered (total = 0).
+    auto* ctrl = 
dynamic_cast<QueryTaskController*>(_query_ctx->resource_ctx()->task_controller());
+    ASSERT_NE(ctrl, nullptr);
+
+    // total = 0, finished = 0
+    EXPECT_EQ(ctrl->get_total_task_num(), 0);
+    EXPECT_EQ(ctrl->get_finished_task_num(), 0);
+
+    // inc_finished with no total should still work without crash
+    _query_ctx->inc_finished_task_num();
+    EXPECT_EQ(ctrl->get_finished_task_num(), 1);
+}
+
+TEST_F(PipelineTest, QueryTaskProgressAllFinished) {
+    // Verify 100% progress when all tasks finish.
+    _query_ctx->add_total_task_num(8);
+    for (int i = 0; i < 8; i++) {
+        _query_ctx->inc_finished_task_num();
+    }
+
+    auto* ctrl = 
dynamic_cast<QueryTaskController*>(_query_ctx->resource_ctx()->task_controller());
+    ASSERT_NE(ctrl, nullptr);
+    EXPECT_EQ(ctrl->get_total_task_num(), 8);
+    EXPECT_EQ(ctrl->get_finished_task_num(), 8);
+
+    // Verify thrift serialization
+    TQueryStatistics tqs;
+    _query_ctx->resource_ctx()->to_thrift_query_statistics(&tqs);
+    EXPECT_EQ(tqs.total_tasks_num, 8);
+    EXPECT_EQ(tqs.finished_tasks_num, 8);
+}
+
+TEST_F(PipelineTest, QueryTaskProgressCountersSurviveReset) {
+    // Verify that after calling _reset(), fresh counters are initialized to 
zero.
+    auto* ctrl1 = 
dynamic_cast<QueryTaskController*>(_query_ctx->resource_ctx()->task_controller());
+    ASSERT_NE(ctrl1, nullptr);
+    _query_ctx->add_total_task_num(10);
+    _query_ctx->inc_finished_task_num();
+    _query_ctx->inc_finished_task_num();
+    EXPECT_EQ(ctrl1->get_total_task_num(), 10);
+    EXPECT_EQ(ctrl1->get_finished_task_num(), 2);
+
+    // Reset creates a new QueryContext
+    _reset();
+    auto* ctrl2 = 
dynamic_cast<QueryTaskController*>(_query_ctx->resource_ctx()->task_controller());
+    ASSERT_NE(ctrl2, nullptr);
+    EXPECT_EQ(ctrl2->get_total_task_num(), 0);
+    EXPECT_EQ(ctrl2->get_finished_task_num(), 0);
+}
+
 } // namespace doris
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java
deleted file mode 100644
index 27b7e673d38..00000000000
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java
+++ /dev/null
@@ -1,89 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.common.proc;
-
-import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.util.QueryStatisticsFormatter;
-import org.apache.doris.qe.QueryStatisticsItem;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.List;
-
-/*
- * show proc "/current_queries/{query_id}/fragments"
- * set variable "set is_report_success = true" to enable "ScanBytes" and 
"ProcessRows".
- */
-public class CurrentQueryFragmentProcNode implements ProcNodeInterface {
-    private static final Logger LOG = 
LogManager.getLogger(CurrentQueryFragmentProcNode.class);
-    public static final ImmutableList<String> TITLE_NAMES = new 
ImmutableList.Builder<String>()
-            .add("FragmentId").add("InstanceId").add("Host")
-            .add("ScanBytes").add("ProcessRows").build();
-    private QueryStatisticsItem item;
-
-    public CurrentQueryFragmentProcNode(QueryStatisticsItem item) {
-        this.item = item;
-    }
-
-    @Override
-    public ProcResult fetchResult() throws AnalysisException {
-        return requestFragmentExecInfos();
-    }
-
-    private ProcResult requestFragmentExecInfos() throws AnalysisException {
-        final CurrentQueryInfoProvider provider = new 
CurrentQueryInfoProvider();
-        final Collection<CurrentQueryInfoProvider.InstanceStatistics> 
instanceStatisticsCollection
-                = provider.getInstanceStatistics(item);
-        final List<List<String>> sortedRowData = Lists.newArrayList();
-        for (CurrentQueryInfoProvider.InstanceStatistics instanceStatistics :
-                instanceStatisticsCollection) {
-            final List<String> rowData = Lists.newArrayList();
-            rowData.add(instanceStatistics.getFragmentId());
-            rowData.add(instanceStatistics.getInstanceId().toString());
-            rowData.add(instanceStatistics.getAddress().toString());
-            if (item.getIsReportSucc()) {
-                rowData.add(QueryStatisticsFormatter.getScanBytes(
-                        instanceStatistics.getScanBytes()));
-                rowData.add(QueryStatisticsFormatter.getRowsReturned(
-                        instanceStatistics.getRowsReturned()));
-            } else {
-                rowData.add("N/A");
-                rowData.add("N/A");
-            }
-            sortedRowData.add(rowData);
-        }
-
-        // sort according to explain's fragment index
-        sortedRowData.sort(new Comparator<List<String>>() {
-            @Override
-            public int compare(List<String> l1, List<String> l2) {
-                return l1.get(0).compareTo(l2.get(0));
-            }
-        });
-        final BaseProcResult result = new BaseProcResult();
-        result.setNames(TITLE_NAMES.asList());
-        result.setRows(sortedRowData);
-        return result;
-    }
-
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java
deleted file mode 100644
index de7247ab3ab..00000000000
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java
+++ /dev/null
@@ -1,200 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.common.proc;
-
-import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.profile.Counter;
-import org.apache.doris.common.profile.RuntimeProfile;
-import org.apache.doris.common.util.DebugUtil;
-import org.apache.doris.qe.QueryStatisticsItem;
-import org.apache.doris.thrift.TNetworkAddress;
-import org.apache.doris.thrift.TUniqueId;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Provide running query's statistics.
- */
-public class CurrentQueryInfoProvider {
-    private static final Logger LOG = 
LogManager.getLogger(CurrentQueryInfoProvider.class);
-
-    public CurrentQueryInfoProvider() {
-    }
-
-    /**
-     * get Counters from Coordinator's RuntimeProfile and return query's 
statistics.
-     *
-     * @param item
-     * @return
-     * @throws AnalysisException
-     */
-    public QueryStatistics getQueryStatistics(QueryStatisticsItem item) throws 
AnalysisException {
-        return new QueryStatistics(item.getQueryProfile());
-    }
-
-    /**
-     *
-     * @param items
-     * @return
-     * @throws AnalysisException
-     */
-    public Map<String, QueryStatistics> 
getQueryStatistics(Collection<QueryStatisticsItem> items) {
-        final Map<String, QueryStatistics> queryStatisticsMap = 
Maps.newHashMap();
-        for (QueryStatisticsItem item : items) {
-            queryStatisticsMap.put(item.getQueryId(), new 
QueryStatistics(item.getQueryProfile()));
-        }
-        return queryStatisticsMap;
-    }
-
-    /**
-     * Return query's instances statistics.
-     *
-     * @param item
-     * @return
-     * @throws AnalysisException
-     */
-    public Collection<InstanceStatistics> 
getInstanceStatistics(QueryStatisticsItem item) throws AnalysisException {
-        final Map<String, RuntimeProfile> instanceProfiles = 
collectInstanceProfile(item.getQueryProfile());
-        final List<InstanceStatistics> instanceStatisticsList = 
Lists.newArrayList();
-        for (QueryStatisticsItem.FragmentInstanceInfo instanceInfo : 
item.getFragmentInstanceInfos()) {
-            final RuntimeProfile instanceProfile
-                    = 
instanceProfiles.get(DebugUtil.printId(instanceInfo.getInstanceId()));
-            Preconditions.checkNotNull(instanceProfile);
-            final InstanceStatistics Statistics =
-                    new InstanceStatistics(
-                            instanceInfo.getFragmentId(),
-                            instanceInfo.getInstanceId(),
-                            instanceInfo.getAddress(),
-                            instanceProfile);
-            instanceStatisticsList.add(Statistics);
-        }
-        return instanceStatisticsList;
-    }
-
-    /**
-     * Profile trees is query profile -> fragment profile -> instance profile 
....
-     * @param queryProfile
-     * @return instanceProfiles
-     */
-    private Map<String, RuntimeProfile> collectInstanceProfile(RuntimeProfile 
queryProfile) {
-        final Map<String, RuntimeProfile> instanceProfiles = Maps.newHashMap();
-        for (RuntimeProfile fragmentProfile : 
queryProfile.getChildMap().values()) {
-            for (Map.Entry<String, RuntimeProfile> entry : 
fragmentProfile.getChildMap().entrySet()) {
-                Preconditions.checkState(instanceProfiles.put(
-                        parseInstanceId(entry.getKey()), entry.getValue()) == 
null);
-            }
-        }
-        return instanceProfiles;
-    }
-
-    /**
-     * Instance profile key is "Instance ${instance_id} (host=$host $port)"
-     * @param str
-     * @return
-     */
-    private String parseInstanceId(String str) {
-        final String[] elements = str.split(" ");
-        if (elements.length == 4) {
-            return elements[1];
-        } else {
-            Preconditions.checkState(false);
-            return "";
-        }
-    }
-
-    public static class QueryStatistics {
-        final List<Map<String, Counter>> counterMaps;
-
-        public QueryStatistics(RuntimeProfile profile) {
-            counterMaps = Lists.newArrayList();
-            collectCounters(profile, counterMaps);
-        }
-
-        private void collectCounters(RuntimeProfile profile,
-                                                List<Map<String, Counter>> 
counterMaps) {
-            for (Map.Entry<String, RuntimeProfile> entry : 
profile.getChildMap().entrySet()) {
-                counterMaps.add(entry.getValue().getCounterMap());
-                collectCounters(entry.getValue(), counterMaps);
-            }
-        }
-
-        public long getScanBytes() {
-            long scanBytes = 0;
-            for (Map<String, Counter> counters : counterMaps) {
-                final Counter counter = counters.get("CompressedBytesRead");
-                scanBytes += counter == null ? 0 : counter.getValue();
-            }
-            return scanBytes;
-        }
-
-        public long getRowsReturned() {
-            long rowsReturned = 0;
-            for (Map<String, Counter> counters : counterMaps) {
-                final Counter counter = counters.get("RowsReturned");
-                rowsReturned += counter == null ? 0 : counter.getValue();
-            }
-            return rowsReturned;
-        }
-    }
-
-    public static class InstanceStatistics {
-        private final String fragmentId;
-        private final TUniqueId instanceId;
-        private final TNetworkAddress address;
-        private final QueryStatistics statistics;
-
-        public InstanceStatistics(
-                String fragmentId,
-                TUniqueId instanceId,
-                TNetworkAddress address,
-                RuntimeProfile profile) {
-            this.fragmentId = fragmentId;
-            this.instanceId = instanceId;
-            this.address = address;
-            this.statistics = new QueryStatistics(profile);
-        }
-
-        public String getFragmentId() {
-            return fragmentId;
-        }
-
-        public TUniqueId getInstanceId() {
-            return instanceId;
-        }
-
-        public TNetworkAddress getAddress() {
-            return address;
-        }
-
-        public long getRowsReturned() {
-            return statistics.getRowsReturned();
-        }
-
-        public long getScanBytes() {
-            return statistics.getScanBytes();
-        }
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQuerySqlProcDir.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQuerySqlProcDir.java
deleted file mode 100644
index 2d21b656ef3..00000000000
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQuerySqlProcDir.java
+++ /dev/null
@@ -1,70 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.common.proc;
-
-import org.apache.doris.common.AnalysisException;
-import org.apache.doris.qe.QueryStatisticsItem;
-
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-import java.util.List;
-
-/*
- * show proc "/current_queries/{query_id}"
- */
-public class CurrentQuerySqlProcDir implements ProcDirInterface {
-
-    public static final ImmutableList<String> TITLE_NAMES = new 
ImmutableList.Builder<String>()
-            .add("Sql").build();
-
-    private final QueryStatisticsItem item;
-
-    public CurrentQuerySqlProcDir(QueryStatisticsItem item) {
-        this.item = item;
-    }
-
-    @Override
-    public boolean register(String name, ProcNodeInterface node) {
-        return false;
-    }
-
-    @Override
-    public ProcNodeInterface lookup(String name) throws AnalysisException {
-        if (Strings.isNullOrEmpty(name)) {
-            return  null;
-        }
-
-        if (!name.equals("fragments")) {
-            throw new AnalysisException(name + " doesn't exist.");
-        }
-
-        return new CurrentQueryFragmentProcNode(item);
-    }
-
-    @Override
-    public ProcResult fetchResult() throws AnalysisException {
-        final BaseProcResult result = new BaseProcResult();
-        result.setNames(TITLE_NAMES.asList());
-        final List<String> values = Lists.newArrayList();
-        values.add(item.getSql());
-        result.addRow(values);
-        return result;
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryStatementsProcNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryStatementsProcNode.java
deleted file mode 100644
index 746726c9051..00000000000
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryStatementsProcNode.java
+++ /dev/null
@@ -1,71 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.common.proc;
-
-import org.apache.doris.common.AnalysisException;
-import org.apache.doris.qe.QeProcessorImpl;
-import org.apache.doris.qe.QueryStatisticsItem;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import org.apache.commons.codec.digest.DigestUtils;
-
-import java.util.List;
-import java.util.Map;
-
-/*
- * show proc "/current_query_stmts"
- */
-public class CurrentQueryStatementsProcNode implements ProcNodeInterface {
-    public static final ImmutableList<String> TITLE_NAMES = new 
ImmutableList.Builder<String>()
-            
.add("QueryId").add("ConnectionId").add("Catalog").add("Database").add("User")
-            .add("ExecTime").add("SqlHash").add("Statement").build();
-
-    private static final int EXEC_TIME_INDEX = 5;
-
-    @Override
-    public ProcResult fetchResult() throws AnalysisException {
-        final BaseProcResult result = new BaseProcResult();
-        final Map<String, QueryStatisticsItem> statistic =
-                QeProcessorImpl.INSTANCE.getQueryStatistics();
-        result.setNames(TITLE_NAMES.asList());
-        final List<List<String>> sortedRowData = Lists.newArrayList();
-
-        for (QueryStatisticsItem item : statistic.values()) {
-            final List<String> values = Lists.newArrayList();
-            values.add(item.getQueryId());
-            values.add(item.getConnId());
-            values.add(item.getCatalog());
-            values.add(item.getDb());
-            values.add(item.getUser());
-            values.add(item.getQueryExecTime());
-            values.add(DigestUtils.md5Hex(item.getSql()));
-            values.add(item.getSql());
-            sortedRowData.add(values);
-        }
-
-        // sort according to ExecTime
-        sortedRowData.sort((l1, l2) -> {
-            final long execTime1 = Long.parseLong(l1.get(EXEC_TIME_INDEX));
-            final long execTime2 = Long.parseLong(l2.get(EXEC_TIME_INDEX));
-            return Long.compare(execTime2, execTime1);
-        });
-        result.setRows(sortedRowData);
-        return result;
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java
index 0df36c0040f..88a68964f82 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java
@@ -21,24 +21,33 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.util.QueryStatisticsFormatter;
 import org.apache.doris.qe.QeProcessorImpl;
 import org.apache.doris.qe.QueryStatisticsItem;
+import org.apache.doris.thrift.TQueryStatistics;
 
-import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import org.apache.commons.codec.digest.DigestUtils;
 
 import java.util.List;
 import java.util.Map;
 
 /*
  * show proc "/current_queries"
- * only set variable "set is_report_success = true" to enable "ScanBytes" and 
"ProcessRows".
+ * the statistics is same as the data in audit log.
  */
 public class CurrentQueryStatisticsProcDir implements ProcDirInterface {
+    // ProcessRows temp used for doris manager compatibility, will be 
implemented future.
     public static final ImmutableList<String> TITLE_NAMES = new 
ImmutableList.Builder<String>()
             
.add("QueryId").add("ConnectionId").add("Catalog").add("Database").add("User")
-            .add("ScanBytes").add("ProcessRows").add("ExecTime").build();
+            .add("ExecTime").add("SqlHash").add("Statement")
+            .add("ScanRows").add("ScanBytes").add("ProcessRows").add("CpuMs")
+            
.add("MaxPeakMemoryBytes").add("CurrentUsedMemoryBytes").add("WorkloadGroupId")
+            .add("ShuffleSendBytes").add("ShuffleSendRows")
+            .add("ScanBytesFromLocalStorage").add("ScanBytesFromRemoteStorage")
+            
.add("SpillWriteBytesToLocalStorage").add("SpillReadBytesFromLocalStorage")
+            .add("BytesWriteIntoCache")
+            .add("TotalTasks").add("FinishedTasks").add("Progress").build();
 
-    private static final int EXEC_TIME_INDEX = 7;
+    private static final int EXEC_TIME_INDEX = 5;
 
     @Override
     public boolean register(String name, ProcNodeInterface node) {
@@ -47,15 +56,7 @@ public class CurrentQueryStatisticsProcDir implements 
ProcDirInterface {
 
     @Override
     public ProcNodeInterface lookup(String name) throws AnalysisException {
-        if (Strings.isNullOrEmpty(name)) {
-            return null;
-        }
-        final Map<String, QueryStatisticsItem> statistic = 
QeProcessorImpl.INSTANCE.getQueryStatistics();
-        final QueryStatisticsItem item = statistic.get(name);
-        if (item == null) {
-            throw new AnalysisException(name + " doesn't exist.");
-        }
-        return new CurrentQuerySqlProcDir(item);
+        throw new AnalysisException("operation doesn't support.");
     }
 
     @Override
@@ -65,32 +66,41 @@ public class CurrentQueryStatisticsProcDir implements 
ProcDirInterface {
                 QeProcessorImpl.INSTANCE.getQueryStatistics();
         result.setNames(TITLE_NAMES.asList());
         final List<List<String>> sortedRowData = Lists.newArrayList();
-
-        final CurrentQueryInfoProvider provider = new 
CurrentQueryInfoProvider();
-        final Map<String, CurrentQueryInfoProvider.QueryStatistics> 
statisticsMap
-                = provider.getQueryStatistics(statistic.values());
         for (QueryStatisticsItem item : statistic.values()) {
             final List<String> values = Lists.newArrayList();
+            final TQueryStatistics queryStatistics = item.getQueryStatistics();
             values.add(item.getQueryId());
             values.add(item.getConnId());
             values.add(item.getCatalog());
             values.add(item.getDb());
             values.add(item.getUser());
-            if (item.getIsReportSucc()) {
-                final CurrentQueryInfoProvider.QueryStatistics statistics
-                        = statisticsMap.get(item.getQueryId());
-                values.add(QueryStatisticsFormatter.getScanBytes(
-                        statistics.getScanBytes()));
-                values.add(QueryStatisticsFormatter.getRowsReturned(
-                        statistics.getRowsReturned()));
-            } else {
-                values.add("N/A");
-                values.add("N/A");
-            }
             values.add(item.getQueryExecTime());
+            values.add(DigestUtils.md5Hex(item.getSql()));
+            values.add(item.getSql());
+            
values.add(QueryStatisticsFormatter.getRowsReturned(queryStatistics.getScanRows()));
+            
values.add(QueryStatisticsFormatter.getScanBytes(queryStatistics.getScanBytes()));
+            
values.add(QueryStatisticsFormatter.getRowsReturned(queryStatistics.getProcessRows()));
+            values.add(String.valueOf(queryStatistics.getCpuMs()));
+            
values.add(QueryStatisticsFormatter.getScanBytes(queryStatistics.getMaxPeakMemoryBytes()));
+            
values.add(QueryStatisticsFormatter.getScanBytes(queryStatistics.getCurrentUsedMemoryBytes()));
+            values.add(String.valueOf(queryStatistics.getWorkloadGroupId()));
+            
values.add(QueryStatisticsFormatter.getScanBytes(queryStatistics.getShuffleSendBytes()));
+            
values.add(QueryStatisticsFormatter.getRowsReturned(queryStatistics.getShuffleSendRows()));
+            
values.add(QueryStatisticsFormatter.getScanBytes(queryStatistics.getScanBytesFromLocalStorage()));
+            
values.add(QueryStatisticsFormatter.getScanBytes(queryStatistics.getScanBytesFromRemoteStorage()));
+            
values.add(QueryStatisticsFormatter.getScanBytes(queryStatistics.getSpillWriteBytesToLocalStorage()));
+            
values.add(QueryStatisticsFormatter.getScanBytes(queryStatistics.getSpillReadBytesFromLocalStorage()));
+            
values.add(QueryStatisticsFormatter.getScanBytes(queryStatistics.getBytesWriteIntoCache()));
+
+            long total = queryStatistics.isSetTotalTasksNum() ? 
queryStatistics.getTotalTasksNum() : 0;
+            long finished = queryStatistics.isSetFinishedTasksNum() ? 
queryStatistics.getFinishedTasksNum() : 0;
+            values.add(String.valueOf(total));
+            values.add(String.valueOf(finished));
+            values.add(formatProgress(total, finished));
+
             sortedRowData.add(values);
         }
-        // sort according to ExecTime
+
         sortedRowData.sort((l1, l2) -> {
             final long execTime1 = Long.parseLong(l1.get(EXEC_TIME_INDEX));
             final long execTime2 = Long.parseLong(l2.get(EXEC_TIME_INDEX));
@@ -99,4 +109,16 @@ public class CurrentQueryStatisticsProcDir implements 
ProcDirInterface {
         result.setRows(sortedRowData);
         return result;
     }
+
+    /**
+     * Format task progress as a percentage string with one decimal place.
+     * Visible for testing.
+     */
+    static String formatProgress(long total, long finished) {
+        if (total > 0) {
+            double pct = (double) finished * 100 / total;
+            return String.format("%.1f%%", pct);
+        }
+        return "0.0%";
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java
index 42010ccbd20..a1f54901bde 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java
@@ -49,7 +49,7 @@ public final class ProcService {
         root.register("trash", new TrashProcDir());
         root.register("monitor", new MonitorProcDir());
         root.register("current_queries", new CurrentQueryStatisticsProcDir());
-        root.register("current_query_stmts", new 
CurrentQueryStatementsProcNode());
+        root.register("current_query_stmts", new 
CurrentQueryStatisticsProcDir());
         root.register("current_backend_instances", new 
CurrentQueryBackendInstanceProcDir());
         root.register("cluster_balance", new ClusterBalanceProcDir());
         root.register("cluster_health", new ClusterHealthProcDir());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java
index 2f499a8dd4c..cb55f350f07 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java
@@ -55,11 +55,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-/**
- * It is accessed by two kinds of thread, one is to create this RuntimeProfile
- * , named 'query thread', the other is to call
- * {@link org.apache.doris.common.proc.CurrentQueryInfoProvider}.
- */
 public class RuntimeProfile {
     // TODO: 这里维护性太差了
     // BE 上的 OperatorXBase::init 里面有 Operator 的命名规则
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
index 5480858efbf..e619a81326e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
@@ -23,7 +23,7 @@ import org.apache.doris.common.AuthenticationException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.Status;
-import org.apache.doris.common.proc.CurrentQueryStatementsProcNode;
+import org.apache.doris.common.proc.CurrentQueryStatisticsProcDir;
 import org.apache.doris.common.proc.ProcResult;
 import org.apache.doris.common.profile.ProfileManager;
 import org.apache.doris.common.profile.ProfileManager.ProfileElement;
@@ -450,7 +450,7 @@ public class QueryProfileAction extends RestBaseController {
     }
 
     /**
-     * return the result of CurrentQueryStatementsProcNode.
+    * return the result of CurrentQueryStatisticsProcDir.
      *
      * @param request
      * @param response
@@ -480,15 +480,15 @@ public class QueryProfileAction extends 
RestBaseController {
                     LOG.warn("parse query info error: {}", data, e);
                 }
             }
-            List<String> titles = 
Lists.newArrayList(CurrentQueryStatementsProcNode.TITLE_NAMES);
+            List<String> titles = 
Lists.newArrayList(CurrentQueryStatisticsProcDir.TITLE_NAMES);
             titles.add(0, FRONTEND);
             return ResponseEntityBuilder.ok(new NodeAction.NodeInfo(titles, 
queries));
         } else {
             try {
-                CurrentQueryStatementsProcNode node = new 
CurrentQueryStatementsProcNode();
+                CurrentQueryStatisticsProcDir node = new 
CurrentQueryStatisticsProcDir();
                 ProcResult result = node.fetchResult();
                 // add frontend info at first column.
-                List<String> titles = 
Lists.newArrayList(CurrentQueryStatementsProcNode.TITLE_NAMES);
+                List<String> titles = 
Lists.newArrayList(CurrentQueryStatisticsProcDir.TITLE_NAMES);
                 titles.add(0, FRONTEND);
                 List<List<String>> rows = result.getRows();
                 String feIp = FrontendOptions.getLocalHostAddress();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index 275dc234706..ff023aeb939 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -29,6 +29,7 @@ import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TQueryProfile;
+import org.apache.doris.thrift.TQueryStatistics;
 import org.apache.doris.thrift.TReportExecStatusParams;
 import org.apache.doris.thrift.TReportExecStatusResult;
 import org.apache.doris.thrift.TStatus;
@@ -212,6 +213,8 @@ public final class QeProcessorImpl implements QeProcessor {
     @Override
     public Map<String, QueryStatisticsItem> getQueryStatistics() {
         final Map<String, QueryStatisticsItem> querySet = Maps.newHashMap();
+        final Map<String, TQueryStatistics> queryStatisticsMap =
+                
Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().getQueryStatisticsMap();
         for (Map.Entry<TUniqueId, QueryInfo> entry : 
coordinatorMap.entrySet()) {
             final QueryInfo info = entry.getValue();
             final ConnectContext context = info.getConnectContext();
@@ -219,12 +222,14 @@ public final class QeProcessorImpl implements QeProcessor 
{
                 continue;
             }
             final String queryIdStr = 
DebugUtil.printId(info.getConnectContext().queryId());
+            final TQueryStatistics queryStatistics = 
queryStatisticsMap.get(queryIdStr);
             final QueryStatisticsItem item = new 
QueryStatisticsItem.Builder().queryId(queryIdStr)
                     
.queryStartTime(info.getStartExecTime()).sql(info.getSql()).user(context.getQualifiedUser())
                     
.connId(String.valueOf(context.getConnectionId())).db(context.getDatabase())
                     .catalog(context.getDefaultCatalog())
                     
.fragmentInstanceInfos(info.getCoord().getFragmentInstanceInfos())
                     .profile(info.getCoord().getExecutionProfile().getRoot())
+                    .queryStatistics(queryStatistics)
                     
.isReportSucc(context.getSessionVariable().enableProfile()).build();
             querySet.put(queryIdStr, item);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java
index c51ff24ca14..f879903e65a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java
@@ -19,6 +19,7 @@ package org.apache.doris.qe;
 
 import org.apache.doris.common.profile.RuntimeProfile;
 import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TQueryStatistics;
 import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.collect.Lists;
@@ -38,6 +39,8 @@ public final class QueryStatisticsItem {
     // root query profile
     private final RuntimeProfile queryProfile;
     private final boolean isReportSucc;
+    // query statistics same as statistics in audit log
+    private final TQueryStatistics queryStatistics;
 
     private QueryStatisticsItem(Builder builder) {
         this.queryId = builder.queryId;
@@ -50,6 +53,7 @@ public final class QueryStatisticsItem {
         this.fragmentInstanceInfos = builder.fragmentInstanceInfos;
         this.queryProfile = builder.queryProfile;
         this.isReportSucc = builder.isReportSucc;
+        this.queryStatistics = builder.queryStatistics;
     }
 
     public String getDb() {
@@ -97,6 +101,10 @@ public final class QueryStatisticsItem {
         return isReportSucc;
     }
 
+    public TQueryStatistics getQueryStatistics() {
+        return queryStatistics;
+    }
+
     public static final class Builder {
         private String queryId;
         private String catalog;
@@ -108,6 +116,7 @@ public final class QueryStatisticsItem {
         private List<FragmentInstanceInfo> fragmentInstanceInfos;
         private RuntimeProfile queryProfile;
         private boolean isReportSucc;
+        private TQueryStatistics queryStatistics;
 
         public Builder() {
             fragmentInstanceInfos = Lists.newArrayList();
@@ -163,6 +172,11 @@ public final class QueryStatisticsItem {
             return this;
         }
 
+        public Builder queryStatistics(TQueryStatistics queryStatistics) {
+            this.queryStatistics = queryStatistics;
+            return this;
+        }
+
         public QueryStatisticsItem build() {
             initDefaultValue(this);
             return new QueryStatisticsItem(this);
@@ -192,6 +206,10 @@ public final class QueryStatisticsItem {
             if (queryProfile == null) {
                 queryProfile = new RuntimeProfile("");
             }
+
+            if (queryStatistics == null) {
+                queryStatistics = new TQueryStatistics();
+            }
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
index cc31e7858e0..7920b1e59a2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
@@ -20,12 +20,15 @@ package org.apache.doris.resource.workloadschedpolicy;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.plugin.AuditEvent;
+import org.apache.doris.qe.QeProcessorImpl;
 import org.apache.doris.thrift.TQueryStatistics;
 import org.apache.doris.thrift.TQueryStatisticsResult;
 import org.apache.doris.thrift.TReportWorkloadRuntimeStatusParams;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.logging.log4j.LogManager;
@@ -48,7 +51,10 @@ import java.util.concurrent.locks.ReentrantLock;
 public class WorkloadRuntimeStatusMgr extends MasterDaemon {
 
     private static final Logger LOG = 
LogManager.getLogger(WorkloadRuntimeStatusMgr.class);
+    // backend id --> {query id --> (query last report time, query stats)}
     private Map<Long, BeReportInfo> beToQueryStatsMap = 
Maps.newConcurrentMap();
+    // Publish an immutable snapshot for synchronous proc/REST readers.
+    private volatile Map<String, TQueryStatistics> queryStatisticsSnapshot = 
ImmutableMap.of();
     private final ReentrantLock queryAuditEventLock = new ReentrantLock();
     private List<AuditEvent> queryAuditEventList = Lists.newLinkedList();
     private volatile long lastWarnTime;
@@ -60,6 +66,7 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
             this.beLastReportTime = beLastReportTime;
         }
 
+        // query id --> (query last report time, query stats)
         Map<String, Pair<Long, TQueryStatisticsResult>> queryStatsMap = 
Maps.newConcurrentMap();
     }
 
@@ -69,10 +76,12 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
 
     @Override
     protected void runAfterCatalogReady() {
-        // 1 merge be query statistics
+        // 1 rebuild and publish query statistics snapshot
+        rebuildQueryStatisticsSnapshot();
+        // 2 read the latest immutable snapshot for downstream processing
         Map<String, TQueryStatistics> queryStatisticsMap = 
getQueryStatisticsMap();
 
-        // 2 log query audit
+        // 3 log query audit
         try {
             List<AuditEvent> auditEventList = getQueryNeedAudit();
             int missedLogCount = 0;
@@ -106,10 +115,17 @@ public class WorkloadRuntimeStatusMgr extends 
MasterDaemon {
             LOG.warn("exception happens when handleAuditEvent, ", t);
         }
 
-        // 3 clear beToQueryStatsMap when be report timeout
+        // 4 clear beToQueryStatsMap when be report timeout
         clearReportTimeoutBeStatistics();
     }
 
+    // After the query or insert finished, FE will not audit immediately, it 
will send an audit
+    // event to this queue. And the worker thread will handle it. If the queue 
is full, the event
+    // will be handled immediately and may miss some statistic info. So the 
statistic info of audit
+    // event may be not accurate, but it can avoid the case that FE OOM 
because of too many audit
+    // events in queue when QPS is high. The event will be logged directly if 
the queue is full.
+    // And the worker thread will get an event from the queue and get the 
statistic info for this
+    // event from queryStatisticsMap.
     public void submitFinishQueryToAudit(AuditEvent event) {
         queryAuditEventLogWriteLock();
         try {
@@ -121,9 +137,9 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
                     // if queryAuditEventList is full, we don't put the event 
to queryAuditEventList.
                     // so that the statistic info of this audit event will be 
ignored,
                     // and event will be logged directly.
-                    LOG.warn("audit log event queue size {} is full, this may 
cause audit log missing statistics."
-                                    + "you can check whether qps is too high 
or "
-                                    + "set audit_event_log_queue_size to a 
larger value in fe.conf. query id: {}",
+                    LOG.warn("audit log event queue size {} is full, this may 
cause audit log missing "
+                            + "statistics. you can check whether qps is too 
high or set "
+                            + "audit_event_log_queue_size to a larger value in 
fe.conf. query id: {}",
                             queryAuditEventList.size(), event.queryId);
                 }
                 Env.getCurrentAuditEventProcessor().handleAuditEvent(event);
@@ -186,7 +202,7 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
         }
     }
 
-    void clearReportTimeoutBeStatistics() {
+    private void clearReportTimeoutBeStatistics() {
         // 1 clear report timeout be
         Set<Long> currentBeIdSet = beToQueryStatsMap.keySet();
         Long currentTime = System.currentTimeMillis();
@@ -200,24 +216,59 @@ public class WorkloadRuntimeStatusMgr extends 
MasterDaemon {
             for (String queryId : queryIdSet) {
                 Pair<Long, TQueryStatisticsResult> pair = 
beReportInfo.queryStatsMap.get(queryId);
                 long queryLastReportTime = pair.first;
-                if (currentTime - queryLastReportTime > 
Config.be_report_query_statistics_timeout_ms) {
+                boolean timeout = currentTime - queryLastReportTime
+                        > Config.be_report_query_statistics_timeout_ms;
+                // Remove query statistics only when both conditions are 
satisfied:
+                // 1) this query statistics is timeout, and
+                // 2) FE no longer has this query in QeProcessorImpl.
+                // Example timeline:
+                // - t0: query q1 is still running, but one periodic BE report 
is delayed for > timeout.
+                // - t1: clear thread runs. timeout condition is true, but q1 
still exists in FE.
+                // - t2: we keep q1 statistics instead of removing it; later 
reports can update it again.
+                if (timeout && isQueryNotExistInFe(queryId)) {
                     beReportInfo.queryStatsMap.remove(queryId);
                 }
             }
         }
     }
 
-    // NOTE: currently getQueryStatisticsMap must be called before clear 
beToQueryStatsMap
-    // so there is no need lock or null check when visit beToQueryStatsMap
+    private boolean isQueryNotExistInFe(String queryId) {
+        try {
+            return 
QeProcessorImpl.INSTANCE.getCoordinator(DebugUtil.parseTUniqueIdFromString(queryId))
 == null;
+        } catch (NumberFormatException e) {
+            return true;
+        }
+    }
+
+    // Rebuild query statistics from concurrent runtime maps and publish an 
immutable snapshot.
+    // This method is intentionally called by daemon thread and unit tests 
only.
+    void rebuildQueryStatisticsSnapshot() {
+        queryStatisticsSnapshot = 
ImmutableMap.copyOf(buildQueryStatisticsMapUnsafe());
+    }
+
+    // Return the latest published snapshot for synchronous readers such as 
proc/REST paths.
     public Map<String, TQueryStatistics> getQueryStatisticsMap() {
+        return queryStatisticsSnapshot;
+    }
+
+    // Build a merged map by traversing concurrent runtime structures.
+    private Map<String, TQueryStatistics> buildQueryStatisticsMapUnsafe() {
         // 1 merge query stats in all be
         Set<Long> beIdSet = beToQueryStatsMap.keySet();
         Map<String, TQueryStatistics> resultQueryMap = Maps.newHashMap();
         for (Long beId : beIdSet) {
             BeReportInfo beReportInfo = beToQueryStatsMap.get(beId);
+            if (beReportInfo == null) {
+                continue;
+            }
             Set<String> queryIdSet = beReportInfo.queryStatsMap.keySet();
             for (String queryId : queryIdSet) {
-                TQueryStatisticsResult curQueryStats = 
beReportInfo.queryStatsMap.get(queryId).second;
+                Pair<Long, TQueryStatisticsResult> queryStatsPair =
+                        beReportInfo.queryStatsMap.get(queryId);
+                if (queryStatsPair == null || queryStatsPair.second == null) {
+                    continue;
+                }
+                TQueryStatisticsResult curQueryStats = queryStatsPair.second;
 
                 TQueryStatistics retQuery = resultQueryMap.get(queryId);
                 if (retQuery == null) {
@@ -248,18 +299,37 @@ public class WorkloadRuntimeStatusMgr extends 
MasterDaemon {
         if (srcStats == null) {
             return;
         }
-        dst.scan_rows += srcStats.scan_rows;
-        dst.scan_bytes += srcStats.scan_bytes;
-        dst.scan_bytes_from_local_storage += 
srcStats.scan_bytes_from_local_storage;
-        dst.scan_bytes_from_remote_storage += 
srcStats.scan_bytes_from_remote_storage;
-        dst.cpu_ms += srcStats.cpu_ms;
-        dst.shuffle_send_bytes += srcStats.shuffle_send_bytes;
-        dst.shuffle_send_rows += srcStats.shuffle_send_rows;
+        dst.setScanRows(dst.scan_rows + srcStats.scan_rows);
+        dst.setScanBytes(dst.scan_bytes + srcStats.scan_bytes);
+        dst.setScanBytesFromLocalStorage(dst.scan_bytes_from_local_storage
+                + srcStats.scan_bytes_from_local_storage);
+        dst.setScanBytesFromRemoteStorage(dst.scan_bytes_from_remote_storage
+                + srcStats.scan_bytes_from_remote_storage);
+        dst.setCpuMs(dst.cpu_ms + srcStats.cpu_ms);
+        dst.setShuffleSendBytes(dst.shuffle_send_bytes + 
srcStats.shuffle_send_bytes);
+        dst.setShuffleSendRows(dst.shuffle_send_rows + 
srcStats.shuffle_send_rows);
+        dst.setProcessRows(dst.process_rows + srcStats.process_rows);
+        dst.setReturnedRows(dst.returned_rows + srcStats.returned_rows);
+        if (srcStats.isSetTotalTasksNum()) {
+            dst.setTotalTasksNum(dst.total_tasks_num + 
srcStats.total_tasks_num);
+        }
+        if (srcStats.isSetFinishedTasksNum()) {
+            dst.setFinishedTasksNum(dst.finished_tasks_num + 
srcStats.finished_tasks_num);
+        }
+        if (dst.current_used_memory_bytes < 
srcStats.current_used_memory_bytes) {
+            dst.setCurrentUsedMemoryBytes(srcStats.current_used_memory_bytes);
+        }
+        if (dst.workload_group_id <= 0 && srcStats.workload_group_id > 0) {
+            dst.setWorkloadGroupId(srcStats.workload_group_id);
+        }
         if (dst.max_peak_memory_bytes < srcStats.max_peak_memory_bytes) {
-            dst.max_peak_memory_bytes = srcStats.max_peak_memory_bytes;
+            dst.setMaxPeakMemoryBytes(srcStats.max_peak_memory_bytes);
         }
-        dst.spill_write_bytes_to_local_storage += 
srcStats.spill_write_bytes_to_local_storage;
-        dst.spill_read_bytes_from_local_storage += 
srcStats.spill_read_bytes_from_local_storage;
+        
dst.setSpillWriteBytesToLocalStorage(dst.spill_write_bytes_to_local_storage
+                + srcStats.spill_write_bytes_to_local_storage);
+        
dst.setSpillReadBytesFromLocalStorage(dst.spill_read_bytes_from_local_storage
+                + srcStats.spill_read_bytes_from_local_storage);
+        dst.setBytesWriteIntoCache(dst.bytes_write_into_cache + 
srcStats.bytes_write_into_cache);
     }
 
     private void queryAuditEventLogWriteLock() {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDirTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDirTest.java
new file mode 100644
index 00000000000..0c9accd89e5
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDirTest.java
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.proc;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit test for CurrentQueryStatisticsProcDir progress formatting.
+ * Tests the formatProgress method directly — no mocking or reflection 
required.
+ */
+public class CurrentQueryStatisticsProcDirTest {
+
+    @Test
+    public void testProgressNormal() {
+        // 7 out of 20 tasks finished = 35.0%
+        Assert.assertEquals("35.0%", 
CurrentQueryStatisticsProcDir.formatProgress(20, 7));
+    }
+
+    @Test
+    public void testProgressAllFinished() {
+        // 8 out of 8 = 100.0%
+        Assert.assertEquals("100.0%", 
CurrentQueryStatisticsProcDir.formatProgress(8, 8));
+    }
+
+    @Test
+    public void testProgressOneThird() {
+        // 1 out of 3 = 33.3%
+        Assert.assertEquals("33.3%", 
CurrentQueryStatisticsProcDir.formatProgress(3, 1));
+    }
+
+    @Test
+    public void testProgressTwoThirds() {
+        // 2 out of 3 = 66.7%
+        Assert.assertEquals("66.7%", 
CurrentQueryStatisticsProcDir.formatProgress(3, 2));
+    }
+
+    @Test
+    public void testProgressZeroPercent() {
+        // 0 out of 5 = 0.0%
+        Assert.assertEquals("0.0%", 
CurrentQueryStatisticsProcDir.formatProgress(5, 0));
+    }
+
+    @Test
+    public void testProgressZeroTotal() {
+        // total = 0, finished = 0 → "0.0%" (no division by zero)
+        Assert.assertEquals("0.0%", 
CurrentQueryStatisticsProcDir.formatProgress(0, 0));
+    }
+
+    @Test
+    public void testProgressFinishedExceedsTotal() {
+        // Defensive: if finished > total, still returns a percentage (may 
exceed 100%)
+        Assert.assertEquals("200.0%", 
CurrentQueryStatisticsProcDir.formatProgress(5, 10));
+    }
+
+    @Test
+    public void testProgressNegativeTotal() {
+        // total < 0 → returns "0.0%"
+        Assert.assertEquals("0.0%", 
CurrentQueryStatisticsProcDir.formatProgress(-1, 5));
+    }
+
+    @Test
+    public void testProgressLargeValues() {
+        // Verify no overflow with large numbers
+        Assert.assertEquals("50.0%",
+                
CurrentQueryStatisticsProcDir.formatProgress(Integer.MAX_VALUE, 
Integer.MAX_VALUE / 2));
+    }
+
+    @Test
+    public void testProgressFractional() {
+        // 1 out of 7 = 14.3% (14.2857... rounds to 14.3)
+        Assert.assertEquals("14.3%", 
CurrentQueryStatisticsProcDir.formatProgress(7, 1));
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgrTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgrTest.java
new file mode 100644
index 00000000000..ae17dcc430a
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgrTest.java
@@ -0,0 +1,285 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.resource.workloadschedpolicy;
+
+import org.apache.doris.thrift.TQueryStatistics;
+import org.apache.doris.thrift.TQueryStatisticsResult;
+import org.apache.doris.thrift.TReportWorkloadRuntimeStatusParams;
+
+import com.google.common.collect.Maps;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+/**
+ * Unit test for WorkloadRuntimeStatusMgr.
+ * Verifies that query progress statistics from multiple BEs are correctly
+ * merged via the Thrift setter-based merge logic.
+ */
+public class WorkloadRuntimeStatusMgrTest {
+
+    private WorkloadRuntimeStatusMgr mgr;
+
+    @Before
+    public void setUp() {
+        mgr = new WorkloadRuntimeStatusMgr();
+    }
+
+    // ---- Merge: single BE ----
+
+    @Test
+    public void testSingleBeProgressMerge() {
+        // Simulate one BE reporting progress for one query.
+        long beId = 10001L;
+        TQueryStatistics stats = new TQueryStatistics();
+        stats.setTotalTasksNum(10);
+        stats.setFinishedTasksNum(3);
+
+        TReportWorkloadRuntimeStatusParams params = buildParams(beId, "q1", 
stats);
+        mgr.updateBeQueryStats(params);
+
+        Map<String, TQueryStatistics> merged = getMergedSnapshot();
+        Assert.assertEquals(1, merged.size());
+
+        TQueryStatistics result = merged.get("q1");
+        Assert.assertNotNull(result);
+        Assert.assertEquals(10, result.getTotalTasksNum());
+        Assert.assertEquals(3, result.getFinishedTasksNum());
+    }
+
+    // ---- Merge: multiple BEs, same query (summing across BEs) ----
+
+    @Test
+    public void testMultiBeSummingAcrossQuery() {
+        // BE1: total=10, finished=3
+        // BE2: total=8,  finished=5
+        // Merged: total=18, finished=8
+        mgr.updateBeQueryStats(buildParams(10001L, "q1", buildStats(10, 3)));
+        mgr.updateBeQueryStats(buildParams(10002L, "q1", buildStats(8, 5)));
+
+        Map<String, TQueryStatistics> merged = getMergedSnapshot();
+        Assert.assertEquals(1, merged.size());
+
+        TQueryStatistics result = merged.get("q1");
+        Assert.assertEquals(18, result.getTotalTasksNum());
+        Assert.assertEquals(8, result.getFinishedTasksNum());
+    }
+
+    // ---- Merge: multiple BEs, multiple queries remain independent ----
+
+    @Test
+    public void testMultiQueryIndependence() {
+        mgr.updateBeQueryStats(buildParams(10001L, "q1", buildStats(10, 2)));
+        mgr.updateBeQueryStats(buildParams(10001L, "q2", buildStats(20, 15)));
+
+        Map<String, TQueryStatistics> merged = getMergedSnapshot();
+        Assert.assertEquals(2, merged.size());
+
+        Assert.assertEquals(10, merged.get("q1").getTotalTasksNum());
+        Assert.assertEquals(2, merged.get("q1").getFinishedTasksNum());
+        Assert.assertEquals(20, merged.get("q2").getTotalTasksNum());
+        Assert.assertEquals(15, merged.get("q2").getFinishedTasksNum());
+    }
+
+    // ---- isSet flag: unset fields should not override previous values ----
+
+    @Test
+    public void testIsSetPreservesPreviousValues() {
+        // BE1 reports total=10, finished=3 with isSet properly set
+        mgr.updateBeQueryStats(buildParams(10001L, "q1", buildStats(10, 3)));
+
+        // BE2 reports same query but does NOT set total_tasks_num
+        // This simulates an older BE version without progress support.
+        TQueryStatistics stats2 = new TQueryStatistics();
+        // Intentionally NOT calling setTotalTasksNum/setFinishedTasksNum
+        // (isSet* returns false)
+        stats2.setScanRows(100);  // set some other field to make it non-empty
+        mgr.updateBeQueryStats(buildParams(10002L, "q1", stats2));
+
+        Map<String, TQueryStatistics> merged = getMergedSnapshot();
+        TQueryStatistics result = merged.get("q1");
+
+        // BE2 didn't set total/finished, so original values from BE1 should 
be preserved
+        Assert.assertEquals(10, result.getTotalTasksNum());
+        Assert.assertEquals(3, result.getFinishedTasksNum());
+    }
+
+    // ---- Zero-reporting BE should not interfere ----
+
+    @Test
+    public void testBeWithZeroProgress() {
+        mgr.updateBeQueryStats(buildParams(10001L, "q1", buildStats(10, 4)));
+
+        // BE2 reports zero progress correctly
+        mgr.updateBeQueryStats(buildParams(10002L, "q1", buildStats(0, 0)));
+
+        Map<String, TQueryStatistics> merged = getMergedSnapshot();
+        TQueryStatistics result = merged.get("q1");
+
+        // total=10, finished=4 (from BE1); BE2's (0,0) is additive → still 
(10,4)
+        Assert.assertEquals(10, result.getTotalTasksNum());
+        Assert.assertEquals(4, result.getFinishedTasksNum());
+    }
+
+    // ---- getQueryStatistics returns per-BE map ----
+
+    @Test
+    public void testGetQueryStatisticsPerBe() {
+        mgr.updateBeQueryStats(buildParams(10001L, "q1", buildStats(5, 2)));
+        mgr.updateBeQueryStats(buildParams(10002L, "q1", buildStats(3, 1)));
+
+        Map<Long, TQueryStatisticsResult> perBe = mgr.getQueryStatistics("q1");
+        Assert.assertEquals(2, perBe.size());
+        Assert.assertTrue(perBe.containsKey(10001L));
+        Assert.assertTrue(perBe.containsKey(10002L));
+        Assert.assertEquals(5, 
perBe.get(10001L).getStatistics().getTotalTasksNum());
+        Assert.assertEquals(3, 
perBe.get(10002L).getStatistics().getTotalTasksNum());
+    }
+
+    // ---- Non-existent query returns empty map ----
+
+    @Test
+    public void testGetQueryStatisticsNonExistent() {
+        Map<Long, TQueryStatisticsResult> perBe = 
mgr.getQueryStatistics("non-existent-query");
+        Assert.assertTrue(perBe.isEmpty());
+    }
+
+    // ---- updateBeQueryStats with missing fields ----
+
+    @Test
+    public void testUpdateBeQueryStatsMissingBackendId() {
+        TReportWorkloadRuntimeStatusParams params = new 
TReportWorkloadRuntimeStatusParams();
+        // backend_id not set, updateBeQueryStats should log a warning and 
return early
+        mgr.updateBeQueryStats(params);
+        Assert.assertTrue(getMergedSnapshot().isEmpty());
+    }
+
+    // ---- updateBeQueryStats with missing query stats map ----
+
+    @Test
+    public void testUpdateBeQueryStatsMissingQueryStatsMap() {
+        TReportWorkloadRuntimeStatusParams params = new 
TReportWorkloadRuntimeStatusParams();
+        params.setBackendId(10001L);
+        // query_statistics_result_map not set → should return early
+        mgr.updateBeQueryStats(params);
+        Assert.assertTrue(getMergedSnapshot().isEmpty());
+    }
+
+    // ---- isSet flag: verifying Thrift setter behavior inline ----
+
+    @Test
+    public void testThriftIsSetFlagRequired() {
+        // Confirm that using setter (via __set*) sets the __isset flag,
+        // whereas direct field assignment does not.
+        // This documents the historical bug that was fixed in this feature.
+
+        TQueryStatistics viaSetter = new TQueryStatistics();
+        viaSetter.setTotalTasksNum(5);
+        Assert.assertTrue("setTotalTasksNum via setter must set __isset flag",
+                viaSetter.isSetTotalTasksNum());
+
+        TQueryStatistics viaField = new TQueryStatistics();
+        viaField.total_tasks_num = 5;  // direct field assignment
+        Assert.assertFalse("direct field assignment must NOT set __isset flag",
+                viaField.isSetTotalTasksNum());
+
+        // Same for finished_tasks_num
+        viaSetter.setFinishedTasksNum(3);
+        Assert.assertTrue(viaSetter.isSetFinishedTasksNum());
+
+        viaField.finished_tasks_num = 3;
+        Assert.assertFalse(viaField.isSetFinishedTasksNum());
+    }
+
+    // ---- Merge without any progress fields ----
+
+    @Test
+    public void testMergeWithoutProgressFields() {
+        // Regression test: when isSet is false for progress fields,
+        // merge should not touch them, leaving them at default (0).
+        TQueryStatistics stats = new TQueryStatistics();
+        // Intentionally leave total/finished unset
+        mgr.updateBeQueryStats(buildParams(10001L, "q1", stats));
+
+        Map<String, TQueryStatistics> merged = getMergedSnapshot();
+        TQueryStatistics result = merged.get("q1");
+
+        // Fields should still be 0 and isSet should be false
+        Assert.assertEquals(0, result.getTotalTasksNum());
+        Assert.assertEquals(0, result.getFinishedTasksNum());
+    }
+
+    // ---- Merge: three BEs combined ----
+
+    @Test
+    public void testThreeBeMergeProgress() {
+        mgr.updateBeQueryStats(buildParams(10001L, "q1", buildStats(4, 1)));
+        mgr.updateBeQueryStats(buildParams(10002L, "q1", buildStats(3, 3)));
+        mgr.updateBeQueryStats(buildParams(10003L, "q1", buildStats(5, 0)));
+
+        Map<String, TQueryStatistics> merged = getMergedSnapshot();
+        Assert.assertEquals(1, merged.size());
+
+        TQueryStatistics result = merged.get("q1");
+        // total = 4 + 3 + 5 = 12, finished = 1 + 3 + 0 = 4
+        Assert.assertEquals(12, result.getTotalTasksNum());
+        Assert.assertEquals(4, result.getFinishedTasksNum());
+    }
+
+    @Test
+    public void testSnapshotReadRequiresRebuild() {
+        mgr.updateBeQueryStats(buildParams(10001L, "q1", buildStats(6, 2)));
+        // Newly reported data is not visible to sync readers before snapshot 
rebuild.
+        Assert.assertTrue(mgr.getQueryStatisticsMap().isEmpty());
+
+        // Rebuild snapshot and verify the new data becomes visible.
+        Map<String, TQueryStatistics> merged = getMergedSnapshot();
+        Assert.assertEquals(1, merged.size());
+        Assert.assertEquals(6, merged.get("q1").getTotalTasksNum());
+        Assert.assertEquals(2, merged.get("q1").getFinishedTasksNum());
+    }
+
+    // ---- helper methods ----
+
+    private TQueryStatistics buildStats(int totalTasks, int finishedTasks) {
+        TQueryStatistics stats = new TQueryStatistics();
+        stats.setTotalTasksNum(totalTasks);
+        stats.setFinishedTasksNum(finishedTasks);
+        return stats;
+    }
+
+    private TReportWorkloadRuntimeStatusParams buildParams(long beId, String 
queryId, TQueryStatistics stats) {
+        TQueryStatisticsResult result = new TQueryStatisticsResult();
+        result.setStatistics(stats);
+
+        TReportWorkloadRuntimeStatusParams params = new 
TReportWorkloadRuntimeStatusParams();
+        params.setBackendId(beId);
+        Map<String, TQueryStatisticsResult> map = Maps.newHashMap();
+        map.put(queryId, result);
+        params.setQueryStatisticsResultMap(map);
+        return params;
+    }
+
+    // Refresh and read snapshot to match daemon-driven visibility semantics.
+    private Map<String, TQueryStatistics> getMergedSnapshot() {
+        mgr.rebuildQueryStatisticsSnapshot();
+        return mgr.getQueryStatisticsMap();
+    }
+}
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 3b1c55fc295..52246428345 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -202,6 +202,9 @@ struct TQueryStatistics {
     12: optional i64 spill_write_bytes_to_local_storage
     13: optional i64 spill_read_bytes_from_local_storage
     14: optional i64 bytes_write_into_cache
+    15: optional i64 process_rows
+    16: optional i32 finished_tasks_num
+    17: optional i32 total_tasks_num
 }
 
 struct TQueryStatisticsResult {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to