This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git

commit a8393eeba4163151030899eecf55ebc9365f02c1
Author: yiguolei <yiguo...@gmail.com>
AuthorDate: Fri Sep 27 23:36:20 2024 +0800

    cancel other queries when memory is not enough
---
 be/src/runtime/memory/memory_reclamation.cpp       |   2 +-
 be/src/runtime/memory/thread_mem_tracker_mgr.h     |   2 +-
 be/src/runtime/query_context.cpp                   |   2 +-
 be/src/runtime/query_context.h                     |   7 ++
 .../workload_group/workload_group_manager.cpp      | 115 ++++++++++++++++-----
 .../workload_group/workload_group_manager.h        |   2 +-
 6 files changed, 103 insertions(+), 27 deletions(-)

diff --git a/be/src/runtime/memory/memory_reclamation.cpp b/be/src/runtime/memory/memory_reclamation.cpp
index 17f5a41f462..4a4f73ee098 100644
--- a/be/src/runtime/memory/memory_reclamation.cpp
+++ b/be/src/runtime/memory/memory_reclamation.cpp
@@ -213,7 +213,7 @@ int64_t MemoryReclamation::tg_enable_overcommit_group_gc(int64_t request_free_me
     int64_t total_free_memory = 0;
     bool gc_all_exceeded = request_free_memory >= total_exceeded_memory;
     std::string log_prefix = fmt::format(
-            "work load group that enable overcommit, number of group: {}, request_free_memory:{}, "
+            "workload group that enable overcommit, number of group: {}, request_free_memory:{}, "
             "total_exceeded_memory:{}",
             task_groups.size(), request_free_memory, total_exceeded_memory);
     if (gc_all_exceeded) {
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index c9f85258d5b..760fb992b3b 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -290,7 +290,7 @@ inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size) {
     // wg mgr will change wg's hard limit property.
     if (wg_ptr != nullptr && wg_ptr->enable_memory_overcommit() &&
         !wg_ptr->has_changed_to_hard_limit()) {
-        // TODO: Only do a check here, do not real reserve. If we could reserve it, it is better, but the logic is too complicated.
+        // Only do a check here, do not real reserve. If we could reserve it, it is better, but the logic is too complicated.
         if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) {
             return doris::Status::Error<ErrorCode::PROCESS_MEMORY_EXCEEDED>(
                     "reserve memory failed, size: {}, because {}", size,
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index af4cd3d417d..abc31d967bd 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -472,7 +472,7 @@ Status QueryContext::revoke_memory() {
     // Do not use memlimit, use current memory usage.
     // For example, if current limit is 1.6G, but current used is 1G, if reserve failed
     // should free 200MB memory, not 300MB
-    const auto target_revoking_size = query_mem_tracker->consumption() * 0.2;
+    const int64_t target_revoking_size = (int64_t)(query_mem_tracker->consumption() * 0.2);
     size_t revoked_size = 0;
 
     std::vector<pipeline::PipelineTask*> chosen_tasks;
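
A small worked example of the arithmetic behind the cast above, reusing the 1.6 GB limit / 1 GB usage numbers from the comment; the 0.2 ratio comes from the change, everything else is illustrative.

    // Editor's illustration: the revocation target is 20% of what the query
    // actually holds, not 20% of its limit.
    #include <cstdint>
    #include <iostream>

    int main() {
        const int64_t consumption = 1LL * 1024 * 1024 * 1024;                     // 1 GB in use
        const int64_t mem_limit = static_cast<int64_t>(1.6 * 1024 * 1024 * 1024); // 1.6 GB limit

        // What the change computes: about 200 MB to free.
        const int64_t target_revoking_size = static_cast<int64_t>(consumption * 0.2);
        // Basing the target on the limit instead would over-free (about 330 MB here,
        // roughly the "300MB" case the comment warns against).
        const int64_t limit_based = static_cast<int64_t>(mem_limit * 0.2);

        std::cout << "consumption-based: " << target_revoking_size / (1024 * 1024) << " MB\n"
                  << "limit-based:       " << limit_based / (1024 * 1024) << " MB\n";
    }
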
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 299e4ced55c..6ce09b7dd0e 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -247,6 +247,12 @@ public:
 
     int64_t get_mem_limit() const { return query_mem_tracker->limit(); }
 
+    void set_expected_mem_limit(int64_t new_mem_limit) {
+        _expected_mem_limit = std::min<int64_t>(new_mem_limit, _user_set_mem_limit);
+    }
+
+    int64_t expected_mem_limit() { return _expected_mem_limit; }
+
     std::shared_ptr<MemTrackerLimiter>& get_mem_tracker() { return query_mem_tracker; }
 
     int32_t get_slot_count() const {
@@ -411,6 +417,7 @@ private:
 
     std::atomic<bool> _low_memory_mode = false;
     int64_t _user_set_mem_limit = 0;
+    std::atomic<int64_t> _expected_mem_limit = 0;
 
     std::mutex _profile_mutex;
     timespec _query_arrival_timestamp;
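
A minimal sketch of the new expected-limit accessors, using a hypothetical stand-in class rather than the real QueryContext; it only shows the clamp against the user-set limit and the atomic storage.

    // Editor's sketch: the expected limit is clamped to the user-configured limit and
    // stored atomically so the workload group manager can read it while the query is paused.
    #include <algorithm>
    #include <atomic>
    #include <cstdint>
    #include <iostream>

    class QueryContextSketch {
    public:
        explicit QueryContextSketch(int64_t user_set_mem_limit)
                : _user_set_mem_limit(user_set_mem_limit) {}

        void set_expected_mem_limit(int64_t new_mem_limit) {
            // Never expect more than what the user explicitly configured.
            _expected_mem_limit = std::min<int64_t>(new_mem_limit, _user_set_mem_limit);
        }

        int64_t expected_mem_limit() const { return _expected_mem_limit; }

    private:
        int64_t _user_set_mem_limit = 0;
        std::atomic<int64_t> _expected_mem_limit{0};
    };

    int main() {
        QueryContextSketch ctx(2LL * 1024 * 1024 * 1024);     // user set 2 GB
        ctx.set_expected_mem_limit(3LL * 1024 * 1024 * 1024); // the group would allow 3 GB
        std::cout << ctx.expected_mem_limit() / (1024 * 1024) << " MB\n"; // clamped to 2048 MB
    }
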
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index 287a6b45729..95951b96bab 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -212,7 +212,7 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
             weighted_memory_limit_ratio);
     LOG_EVERY_T(INFO, 60) << debug_msg;
     for (auto& wg : _workload_groups) {
-        change_query_to_hard_limit(wg.second, false);
+        update_queries_limit(wg.second, false);
     }
 }
 
@@ -280,6 +280,7 @@ void WorkloadGroupMgr::handle_paused_queries() {
 void WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
     const int64_t TIMEOUT_IN_QUEUE = 1000L * 10;
     std::unique_lock<std::mutex> lock(_paused_queries_lock);
+    std::vector<std::weak_ptr<QueryContext>> resume_after_gc;
     for (auto it = _paused_queries_list.begin(); it != _paused_queries_list.end();) {
         auto& queries_list = it->second;
         const auto& wg = it->first;
@@ -310,9 +311,9 @@ void WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
                 query_it = queries_list.erase(query_it);
                 continue;
             }
-
+            bool wg_changed_to_hard_limit = wg->has_changed_to_hard_limit();
             // Only deal with non overcommit workload group.
-            if (wg->enable_memory_overcommit() && !wg->has_changed_to_hard_limit() &&
+            if (wg->enable_memory_overcommit() && !wg_changed_to_hard_limit &&
                 !query_ctx->paused_reason().is<ErrorCode::PROCESS_MEMORY_EXCEEDED>()) {
                 // Soft limit wg will only reserve failed when process limit exceed. But in some corner case,
                 // when reserve, the wg is hard limit, the query reserve failed, but when this loop run
@@ -328,6 +329,9 @@ void WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
             }
 
             if (query_ctx->paused_reason().is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
+                CHECK(!wg->enable_memory_overcommit() || wg_changed_to_hard_limit);
+                // Streamload, kafka load, group commit will never have query memory exceeded error because
+                // their query limit is very large.
                 bool spill_res = handle_single_query(query_ctx, query_it->reserve_size_,
                                                      query_ctx->paused_reason());
                 if (!spill_res) {
@@ -338,8 +342,23 @@ void WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
                     continue;
                 }
             } else if (query_ctx->paused_reason().is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) {
+                CHECK(!wg->enable_memory_overcommit() || wg_changed_to_hard_limit);
+                // Check whether the reserve is too large; if it is too large,
+                // only set this query's limit and resume it.
+                // Compare the query's reserve against its expected limit.
+                if (query_ctx->expected_mem_limit() <
+                    query_ctx->get_mem_tracker()->consumption() + query_it->reserve_size_) {
+                    query_ctx->set_mem_limit(query_ctx->expected_mem_limit());
+                    query_ctx->set_memory_sufficient(true);
+                    LOG(INFO) << "workload group memory reserve failed because "
+                              << query_ctx->debug_string() << " reserve size "
+                              << query_it->reserve_size_ << " is too large, set hard limit to "
+                              << query_ctx->expected_mem_limit() << " and resume running.";
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
                 if (!has_changed_hard_limit) {
-                    change_query_to_hard_limit(wg, true);
+                    update_queries_limit(wg, true);
                     has_changed_hard_limit = true;
                     LOG(INFO) << "query: " << print_id(query_ctx->query_id())
                               << " reserve memory failed due to workload group 
memory exceed, "
@@ -412,15 +431,34 @@ void WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
                                  "to 0 now";
                 }
                 if (query_it->cache_ratio_ < 0.001) {
-                    // TODO: Find other exceed limit workload group and cancel query.
-                    bool spill_res = handle_single_query(query_ctx, query_it->reserve_size_,
-                                                         query_ctx->paused_reason());
-                    if (!spill_res) {
-                        ++query_it;
-                        continue;
+                    if (query_it->any_wg_exceed_limit_) {
+                        if (wg->enable_memory_overcommit()) {
+                            if (query_it->elapsed_time() > TIMEOUT_IN_QUEUE) {
+                                resume_after_gc.push_back(query_ctx);
+                                query_it = queries_list.erase(query_it);
+                                continue;
+                            } else {
+                                ++query_it;
+                                continue;
+                            }
+                        } else {
+                            // current workload group is hard limit, should not wait other wg with
+                            // soft limit, just cancel
+                            resume_after_gc.push_back(query_ctx);
+                            query_it = queries_list.erase(query_it);
+                            continue;
+                        }
                     } else {
-                        query_it = queries_list.erase(query_it);
-                        continue;
+                        // TODO: Find other exceed limit workload group and cancel query.
+                        bool spill_res = handle_single_query(query_ctx, query_it->reserve_size_,
+                                                             query_ctx->paused_reason());
+                        if (!spill_res) {
+                            ++query_it;
+                            continue;
+                        } else {
+                            query_it = queries_list.erase(query_it);
+                            continue;
+                        }
                     }
                 }
                 if (doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted <
@@ -439,8 +477,22 @@ void WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
         // Finished deal with one workload group, and should deal with next one.
         ++it;
     }
+    // TODO minor GC to release some query
+    if (!resume_after_gc.empty()) {
+    }
+    for (auto resume_it = resume_after_gc.begin(); resume_it != resume_after_gc.end();
+         ++resume_it) {
+        auto query_ctx = resume_it->lock();
+        if (query_ctx != nullptr) {
+            query_ctx->set_memory_sufficient(true);
+        }
+    }
 }
 
+// streamload, kafka routine load, group commit
+// insert into select
+// select
+
 void WorkloadGroupMgr::handle_overcommit_wg_paused_queries() {
     std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
     // If there is only one workload group and it is overcommit, then do nothing.
@@ -448,19 +500,24 @@ void WorkloadGroupMgr::handle_overcommit_wg_paused_queries() {
     if (_workload_groups.size() == 1) {
         return;
     }
-    if (doris::GlobalMemoryArbitrator::is_exceed_soft_mem_limit(100 * 1024 * 1024)) {
+    // soft_limit - 10%, will change workload group to hard limit.
+    // soft limit, process memory reserve failed.
+    // hard limit, FullGC will kill query randomly.
+    if (doris::GlobalMemoryArbitrator::is_exceed_soft_mem_limit(
+                (int64_t)(MemInfo::mem_limit() * 0.1))) {
         for (auto& [wg_id, wg] : _workload_groups) {
             if (wg->enable_memory_overcommit() && !wg->has_changed_to_hard_limit()) {
                 wg->change_to_hard_limit(true);
-                LOG(INFO) << "Process memory usage will exceed soft limit, change all workload "
+                LOG(INFO) << "Process memory usage + 10% will exceed soft limit, change all "
+                             "workload "
                              "group with overcommit to hard limit now. "
                           << wg->debug_string();
             }
         }
     }
-    // If current memory usage is below soft memlimit - 10%, then enable wg's overcommit
+    // If current memory usage is below soft memlimit - 15%, then enable wg's overcommit
     if (!doris::GlobalMemoryArbitrator::is_exceed_soft_mem_limit(
-                (int64_t)(MemInfo::mem_limit() * 0.1))) {
+                (int64_t)(MemInfo::mem_limit() * 0.15))) {
         for (auto& [wg_id, wg] : _workload_groups) {
             if (wg->enable_memory_overcommit() && wg->has_changed_to_hard_limit()) {
                 wg->change_to_hard_limit(false);
@@ -471,6 +528,7 @@ void WorkloadGroupMgr::handle_overcommit_wg_paused_queries() {
         }
     }
 }
+
 // If the query could release some memory, for example, spill disk, flush memtable then the return value is true.
 // If the query could not release memory, then cancel the query, the return value is true.
 // If the query is not ready to do these tasks, it means just wait.
@@ -488,6 +546,14 @@ bool WorkloadGroupMgr::handle_single_query(std::shared_ptr<QueryContext> query_c
         return false;
     }
 
+    // While waiting for all tasks to stop running, some memory may have been released
+    // and the memory may be enough now, so the query should be resumed.
+    if (query_ctx->get_mem_tracker()->limit() >
+        query_ctx->get_mem_tracker()->consumption() + size_to_reserve) {
+        query_ctx->set_memory_sufficient(true);
+        return true;
+    }
+
     auto revocable_tasks = query_ctx->get_revocable_tasks();
     if (revocable_tasks.empty()) {
         if (paused_reason.is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
@@ -518,7 +584,7 @@ bool WorkloadGroupMgr::handle_single_query(std::shared_ptr<QueryContext> query_c
     return true;
 }
 
-void WorkloadGroupMgr::change_query_to_hard_limit(WorkloadGroupPtr wg, bool enable_hard_limit) {
+void WorkloadGroupMgr::update_queries_limit(WorkloadGroupPtr wg, bool enable_hard_limit) {
     auto wg_mem_limit = wg->memory_limit();
     auto wg_weighted_mem_limit = int64_t(wg_mem_limit * 1);
     wg->set_weighted_memory_limit(wg_weighted_mem_limit);
@@ -578,6 +644,7 @@ void WorkloadGroupMgr::change_query_to_hard_limit(WorkloadGroupPtr wg, bool enab
             continue;
         }
         int64_t query_weighted_mem_limit = 0;
+        int64_t expected_query_weighted_mem_limit = 0;
         // If the query enable hard limit, then it should not use the soft limit
         if (query_ctx->enable_query_slot_hard_limit()) {
             if (total_slot_count < 1) {
@@ -589,20 +656,21 @@ void WorkloadGroupMgr::change_query_to_hard_limit(WorkloadGroupPtr wg, bool enab
                 query_weighted_mem_limit = (int64_t)((wg_high_water_mark_except_load *
                                                       query_ctx->get_slot_count() * 1.0) /
                                                      total_slot_count);
+                expected_query_weighted_mem_limit = query_weighted_mem_limit;
             }
         } else {
             // If low water mark is not reached, then use process memory limit as query memory limit.
             // It means it will not take effect.
             // If there are some query in paused list, then limit should take effect.
+            expected_query_weighted_mem_limit =
+                    total_used_slot_count > 0
+                            ? (int64_t)((wg_high_water_mark_except_load + total_used_slot_count) *
+                                        query_ctx->get_slot_count() * 1.0 / total_used_slot_count)
+                            : wg_high_water_mark_except_load;
             if (!is_low_wartermark && !enable_hard_limit) {
                 query_weighted_mem_limit = wg_high_water_mark_except_load;
             } else {
-                query_weighted_mem_limit = total_used_slot_count > 0
-                                                   ? (int64_t)((wg_high_water_mark_except_load +
-                                                                total_used_slot_count) *
-                                                               query_ctx->get_slot_count() * 1.0 /
-                                                               total_used_slot_count)
-                                                   : wg_high_water_mark_except_load;
+                query_weighted_mem_limit = expected_query_weighted_mem_limit;
             }
         }
         debug_msg += query_ctx->debug_string() + "\n";
@@ -610,6 +678,7 @@ void WorkloadGroupMgr::change_query_to_hard_limit(WorkloadGroupPtr wg, bool enab
         // memory failed and we did not hanle it.
         if (!query_ctx->is_pure_load_task()) {
             query_ctx->set_mem_limit(query_weighted_mem_limit);
+            query_ctx->set_expected_mem_limit(expected_query_weighted_mem_limit);
         }
     }
     //LOG(INFO) << debug_msg;
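
For readers following update_queries_limit above, a self-contained sketch of the slot-proportional split used in the hard-limit branch; the group budget and slot counts are made-up numbers, and only the proportional formula mirrors the change.

    // Editor's sketch: each query's limit is its slot share of the workload
    // group's high water mark (the budget excluding load tasks).
    #include <cstdint>
    #include <iostream>

    int64_t slot_based_limit(int64_t wg_high_water_mark_except_load, int query_slot_count,
                             int total_slot_count) {
        return static_cast<int64_t>(wg_high_water_mark_except_load * query_slot_count * 1.0 /
                                    total_slot_count);
    }

    int main() {
        const int64_t wg_budget = 10LL * 1024 * 1024 * 1024; // assume a 10 GB group budget
        const int total_slots = 8;

        // A query holding 2 of 8 slots is expected to stay within a quarter of the budget.
        std::cout << slot_based_limit(wg_budget, 2, total_slots) / (1024 * 1024) << " MB\n"; // 2560 MB
    }
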
diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h
index 03f134006f5..d8b49fd827a 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -110,7 +110,7 @@ private:
                              Status paused_reason);
     void handle_non_overcommit_wg_paused_queries();
     void handle_overcommit_wg_paused_queries();
-    void change_query_to_hard_limit(WorkloadGroupPtr wg, bool enable_hard_limit);
+    void update_queries_limit(WorkloadGroupPtr wg, bool enable_hard_limit);
 
 private:
     std::shared_mutex _group_mutex;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
