This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new e01d7c87679 branch-4.1: [fix](memtable) use CPU metrics delta for CPU pressure detection in adaptive flush controller #62744 (#62943)
e01d7c87679 is described below

commit e01d7c87679b4145fd2fc8e52ca8e782e75e374e
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed May 13 17:06:18 2026 +0800

    branch-4.1: [fix](memtable) use CPU metrics delta for CPU pressure detection in adaptive flush controller #62744 (#62943)
    
    Cherry-picked from #62744
    
    Co-authored-by: hui lai <[email protected]>
---
 be/src/common/metrics/system_metrics.cpp           | 25 ++++++++
 be/src/common/metrics/system_metrics.h             | 11 ++++
 be/src/storage/adaptive_thread_pool_controller.cpp | 35 +++++++++--
 be/src/storage/adaptive_thread_pool_controller.h   |  9 ++-
 .../adaptive_thread_pool_controller_test.cpp       | 69 ++++++++++++++++++++++
 be/test/util/test_data/stat_cpu_busy_after         |  1 +
 be/test/util/test_data/stat_cpu_busy_before        |  1 +
 be/test/util/test_data/stat_cpu_iowait_after       |  1 +
 be/test/util/test_data/stat_cpu_recovery_after     |  1 +
 be/test/util/test_data/stat_cpu_regressed_after    |  1 +
 10 files changed, 148 insertions(+), 6 deletions(-)

diff --git a/be/src/common/metrics/system_metrics.cpp 
b/be/src/common/metrics/system_metrics.cpp
index f111c04b491..da88b7b8094 100644
--- a/be/src/common/metrics/system_metrics.cpp
+++ b/be/src/common/metrics/system_metrics.cpp
@@ -468,6 +468,17 @@ void SystemMetrics::_update_cpu_metrics() {
             continue;
         }
 
+        if (cpu_name == "cpu") {
+            AggregateCpuTime aggregate_cpu_time;
+            aggregate_cpu_time.total_time = values[0] + values[1] + values[2] 
+ values[3] +
+                                            values[4] + values[5] + values[6] 
+ values[7];
+            aggregate_cpu_time.idle_time = values[3] + values[4];
+            aggregate_cpu_time.initialized = aggregate_cpu_time.total_time > 0;
+            // Publish a consistent aggregate snapshot derived from one 
/proc/stat row.
+            std::lock_guard<std::mutex> lk(_aggregate_cpu_time_mutex);
+            _aggregate_cpu_time = aggregate_cpu_time;
+        }
+
         for (int i = 0; i < CpuMetrics::cpu_num_metrics; ++i) {
             it->second->metrics[i]->set_value(values[i]);
         }
@@ -909,6 +920,20 @@ double SystemMetrics::get_load_average_1_min() {
     }
 }
 
+bool SystemMetrics::get_aggregate_cpu_time(int64_t* total_time, int64_t* 
idle_time) const {
+    DCHECK(total_time != nullptr);
+    DCHECK(idle_time != nullptr);
+
+    std::lock_guard<std::mutex> lk(_aggregate_cpu_time_mutex);
+    if (!_aggregate_cpu_time.initialized) {
+        return false;
+    }
+
+    *total_time = _aggregate_cpu_time.total_time;
+    *idle_time = _aggregate_cpu_time.idle_time;
+    return true;
+}
+
 void SystemMetrics::get_network_traffic(std::map<std::string, int64_t>* 
send_map,
                                         std::map<std::string, int64_t>* 
rcv_map) {
     send_map->clear();
diff --git a/be/src/common/metrics/system_metrics.h 
b/be/src/common/metrics/system_metrics.h
index 52ca7299f70..a5b4ccceb7e 100644
--- a/be/src/common/metrics/system_metrics.h
+++ b/be/src/common/metrics/system_metrics.h
@@ -22,6 +22,7 @@
 
 #include <map>
 #include <memory>
+#include <mutex>
 #include <set>
 #include <string>
 #include <vector>
@@ -59,6 +60,7 @@ public:
                              int64_t interval_sec, int64_t* send_rate, 
int64_t* rcv_rate);
 
     double get_load_average_1_min();
+    bool get_aggregate_cpu_time(int64_t* total_time, int64_t* idle_time) const;
 
     void update_max_disk_io_util_percent(const std::map<std::string, int64_t>& 
lst_value,
                                          int64_t interval_sec);
@@ -99,6 +101,12 @@ private:
     void get_cpu_name();
 
 private:
+    struct AggregateCpuTime {
+        int64_t total_time = 0;
+        int64_t idle_time = 0;
+        bool initialized = false;
+    };
+
     static const char* _s_hook_name;
 
     std::map<std::string, CpuMetrics*> _cpu_metrics;
@@ -118,6 +126,9 @@ private:
     MetricRegistry* _registry = nullptr;
     std::shared_ptr<MetricEntity> _server_entity;
 
+    mutable std::mutex _aggregate_cpu_time_mutex;
+    AggregateCpuTime _aggregate_cpu_time;
+
     IntGauge* max_disk_io_util_percent = nullptr;
     IntGauge* max_network_send_bytes_rate = nullptr;
     IntGauge* max_network_receive_bytes_rate = nullptr;
diff --git a/be/src/storage/adaptive_thread_pool_controller.cpp 
b/be/src/storage/adaptive_thread_pool_controller.cpp
index 8b8de54654d..bcb7b089389 100644
--- a/be/src/storage/adaptive_thread_pool_controller.cpp
+++ b/be/src/storage/adaptive_thread_pool_controller.cpp
@@ -252,6 +252,8 @@ int AdaptiveThreadPoolController::get_current_threads(const 
std::string& name) c
 }
 
 bool AdaptiveThreadPoolController::is_io_busy() {
+    std::lock_guard<std::mutex> lk(_metrics_state_mutex);
+
     if (config::is_cloud_mode()) {
         if (_s3_file_upload_pool == nullptr) return false;
         int queue_size = _s3_file_upload_pool->get_queue_size();
@@ -275,14 +277,37 @@ bool AdaptiveThreadPoolController::is_io_busy() {
 }
 
 bool AdaptiveThreadPoolController::is_cpu_busy() {
+    std::lock_guard<std::mutex> lk(_metrics_state_mutex);
+
     if (_system_metrics == nullptr) return false;
 
-    double load_avg = _system_metrics->get_load_average_1_min();
-    int num_cpus = std::thread::hardware_concurrency();
-    if (num_cpus <= 0) return false;
+    int64_t total_time = 0;
+    int64_t idle_time = 0;
+    if (!_system_metrics->get_aggregate_cpu_time(&total_time, &idle_time)) {
+        return _last_cpu_busy;
+    }
+
+    if (_last_cpu_total_time < 0) {
+        _last_cpu_total_time = total_time;
+        _last_cpu_idle_time = idle_time;
+        _last_cpu_busy = false;
+        return false;
+    }
+
+    int64_t total_time_delta = total_time - _last_cpu_total_time;
+    int64_t idle_time_delta = idle_time - _last_cpu_idle_time;
+    if (total_time_delta <= 0 || idle_time_delta < 0 || idle_time_delta > 
total_time_delta) {
+        // Keep the previous baseline so a transient invalid sample does not
+        // poison the next interval's delta calculation.
+        return _last_cpu_busy;
+    }
 
-    double cpu_usage_percent = (load_avg / num_cpus) * 100.0;
-    return cpu_usage_percent > kCPUBusyThresholdPercent;
+    _last_cpu_total_time = total_time;
+    _last_cpu_idle_time = idle_time;
+    double cpu_busy_percent =
+            static_cast<double>(total_time_delta - idle_time_delta) * 100.0 / 
total_time_delta;
+    _last_cpu_busy = cpu_busy_percent > kCPUBusyThresholdPercent;
+    return _last_cpu_busy;
 }
 
 AdaptiveThreadPoolController::AdjustFunc 
AdaptiveThreadPoolController::make_flush_adjust_func(
diff --git a/be/src/storage/adaptive_thread_pool_controller.h 
b/be/src/storage/adaptive_thread_pool_controller.h
index 4a6f0096f62..0598531268c 100644
--- a/be/src/storage/adaptive_thread_pool_controller.h
+++ b/be/src/storage/adaptive_thread_pool_controller.h
@@ -56,7 +56,7 @@ struct TimerArg {
 };
 
 // AdaptiveThreadPoolController dynamically adjusts thread pool sizes based on
-// system load (IO utilisation, CPU load average, flush queue depth).
+// system load (IO utilisation, CPU utilisation, flush queue depth).
 //
 // Each registered pool group runs as a one-shot bthread_timer_add chain: the
 // callback fires, adjusts the pool, then re-registers the next one-shot timer.
@@ -142,6 +142,7 @@ private:
     ThreadPool* _s3_file_upload_pool = nullptr;
 
     mutable std::mutex _mutex;
+    mutable std::mutex _metrics_state_mutex;
     std::map<std::string, PoolGroup> _pool_groups;
 
     // Last successfully computed IO-busy result. Returned as-is when the
@@ -151,6 +152,12 @@ private:
     // For disk IO util calculation (used by is_io_busy).
     std::map<std::string, int64_t> _last_disk_io_time;
     int64_t _last_check_time_sec = 0;
+
+    // For CPU util calculation (used by is_cpu_busy). The counters come from
+    // SystemMetrics' existing cpu_* metrics and are compared as deltas.
+    bool _last_cpu_busy = false;
+    int64_t _last_cpu_total_time = -1;
+    int64_t _last_cpu_idle_time = -1;
 };
 
 } // namespace doris
diff --git a/be/test/storage/adaptive_thread_pool_controller_test.cpp 
b/be/test/storage/adaptive_thread_pool_controller_test.cpp
index b2d6464fcef..06d79629330 100644
--- a/be/test/storage/adaptive_thread_pool_controller_test.cpp
+++ b/be/test/storage/adaptive_thread_pool_controller_test.cpp
@@ -22,10 +22,23 @@
 #include <thread>
 
 #include "common/config.h"
+#include "common/metrics/metrics.h"
+#include "common/metrics/system_metrics.h"
+#include "testutil/test_util.h"
 #include "util/threadpool.h"
 
 namespace doris {
 
+extern const char* k_ut_stat_path;
+
+namespace {
+
+std::string get_stat_test_data_path(const std::string& file_name) {
+    return GetCurrentRunningDir() + "/util/test_data/" + file_name;
+}
+
+} // namespace
+
 class AdaptiveThreadPoolControllerTest : public testing::Test {
 protected:
     void SetUp() override {
@@ -242,6 +255,62 @@ TEST_F(AdaptiveThreadPoolControllerTest, 
TestIoBusyCpuBusyWithNullMetrics) {
     EXPECT_FALSE(controller.is_cpu_busy());
 }
 
+TEST_F(AdaptiveThreadPoolControllerTest, TestCpuBusyUsesCpuMetricsDelta) {
+    MetricRegistry registry("test");
+    const std::string before_path = 
get_stat_test_data_path("stat_cpu_busy_before");
+    const std::string after_path = 
get_stat_test_data_path("stat_cpu_busy_after");
+    k_ut_stat_path = before_path.c_str();
+    SystemMetrics metrics(&registry, {}, {});
+    metrics.update();
+
+    AdaptiveThreadPoolController controller;
+    controller.init(&metrics, nullptr);
+
+    EXPECT_FALSE(controller.is_cpu_busy());
+    k_ut_stat_path = after_path.c_str();
+    metrics.update();
+    EXPECT_TRUE(controller.is_cpu_busy());
+}
+
+TEST_F(AdaptiveThreadPoolControllerTest, TestCpuBusyTreatsIoWaitAsIdle) {
+    MetricRegistry registry("test");
+    const std::string before_path = 
get_stat_test_data_path("stat_cpu_busy_before");
+    const std::string after_path = 
get_stat_test_data_path("stat_cpu_iowait_after");
+    k_ut_stat_path = before_path.c_str();
+    SystemMetrics metrics(&registry, {}, {});
+    metrics.update();
+
+    AdaptiveThreadPoolController controller;
+    controller.init(&metrics, nullptr);
+
+    EXPECT_FALSE(controller.is_cpu_busy());
+    k_ut_stat_path = after_path.c_str();
+    metrics.update();
+    EXPECT_FALSE(controller.is_cpu_busy());
+}
+
+TEST_F(AdaptiveThreadPoolControllerTest, 
TestCpuBusyInvalidSampleDoesNotAdvanceBaseline) {
+    MetricRegistry registry("test");
+    const std::string before_path = 
get_stat_test_data_path("stat_cpu_busy_before");
+    const std::string invalid_path = 
get_stat_test_data_path("stat_cpu_regressed_after");
+    const std::string recovery_path = 
get_stat_test_data_path("stat_cpu_recovery_after");
+    k_ut_stat_path = before_path.c_str();
+    SystemMetrics metrics(&registry, {}, {});
+    metrics.update();
+
+    AdaptiveThreadPoolController controller;
+    controller.init(&metrics, nullptr);
+
+    EXPECT_FALSE(controller.is_cpu_busy());
+    k_ut_stat_path = invalid_path.c_str();
+    metrics.update();
+    EXPECT_FALSE(controller.is_cpu_busy());
+
+    k_ut_stat_path = recovery_path.c_str();
+    metrics.update();
+    EXPECT_TRUE(controller.is_cpu_busy());
+}
+
 // Test adjust function that uses controller's is_io_busy/is_cpu_busy
 TEST_F(AdaptiveThreadPoolControllerTest, TestAdjustFuncWithControllerMethods) {
     config::enable_adaptive_flush_threads = true;
diff --git a/be/test/util/test_data/stat_cpu_busy_after 
b/be/test/util/test_data/stat_cpu_busy_after
new file mode 100644
index 00000000000..2b5027df63d
--- /dev/null
+++ b/be/test/util/test_data/stat_cpu_busy_after
@@ -0,0 +1 @@
+cpu  250 0 200 820 0 0 0 0 0 0
diff --git a/be/test/util/test_data/stat_cpu_busy_before 
b/be/test/util/test_data/stat_cpu_busy_before
new file mode 100644
index 00000000000..bcf77b76c25
--- /dev/null
+++ b/be/test/util/test_data/stat_cpu_busy_before
@@ -0,0 +1 @@
+cpu  100 0 100 800 0 0 0 0 0 0
diff --git a/be/test/util/test_data/stat_cpu_iowait_after 
b/be/test/util/test_data/stat_cpu_iowait_after
new file mode 100644
index 00000000000..c58b33ea7ca
--- /dev/null
+++ b/be/test/util/test_data/stat_cpu_iowait_after
@@ -0,0 +1 @@
+cpu  120 0 110 810 260 0 0 0 0 0
diff --git a/be/test/util/test_data/stat_cpu_recovery_after 
b/be/test/util/test_data/stat_cpu_recovery_after
new file mode 100644
index 00000000000..3ecc01524b6
--- /dev/null
+++ b/be/test/util/test_data/stat_cpu_recovery_after
@@ -0,0 +1 @@
+cpu  200 0 200 819 0 0 0 0 0 0
diff --git a/be/test/util/test_data/stat_cpu_regressed_after 
b/be/test/util/test_data/stat_cpu_regressed_after
new file mode 100644
index 00000000000..111f0f30706
--- /dev/null
+++ b/be/test/util/test_data/stat_cpu_regressed_after
@@ -0,0 +1 @@
+cpu  90 0 90 770 0 0 0 0 0 0


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to