This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
commit a8393eeba4163151030899eecf55ebc9365f02c1 Author: yiguolei <yiguo...@gmail.com> AuthorDate: Fri Sep 27 23:36:20 2024 +0800 cancel other query when memory not enough --- be/src/runtime/memory/memory_reclamation.cpp | 2 +- be/src/runtime/memory/thread_mem_tracker_mgr.h | 2 +- be/src/runtime/query_context.cpp | 2 +- be/src/runtime/query_context.h | 7 ++ .../workload_group/workload_group_manager.cpp | 115 ++++++++++++++++----- .../workload_group/workload_group_manager.h | 2 +- 6 files changed, 103 insertions(+), 27 deletions(-) diff --git a/be/src/runtime/memory/memory_reclamation.cpp b/be/src/runtime/memory/memory_reclamation.cpp index 17f5a41f462..4a4f73ee098 100644 --- a/be/src/runtime/memory/memory_reclamation.cpp +++ b/be/src/runtime/memory/memory_reclamation.cpp @@ -213,7 +213,7 @@ int64_t MemoryReclamation::tg_enable_overcommit_group_gc(int64_t request_free_me int64_t total_free_memory = 0; bool gc_all_exceeded = request_free_memory >= total_exceeded_memory; std::string log_prefix = fmt::format( - "work load group that enable overcommit, number of group: {}, request_free_memory:{}, " + "workload group that enable overcommit, number of group: {}, request_free_memory:{}, " "total_exceeded_memory:{}", task_groups.size(), request_free_memory, total_exceeded_memory); if (gc_all_exceeded) { diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index c9f85258d5b..760fb992b3b 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -290,7 +290,7 @@ inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size) { // wg mgr will change wg's hard limit property. if (wg_ptr != nullptr && wg_ptr->enable_memory_overcommit() && !wg_ptr->has_changed_to_hard_limit()) { - // TODO: Only do a check here, do not real reserve. If we could reserve it, it is better, but the logic is too complicated. + // Only do a check here, do not real reserve. 
If we could reserve it, it is better, but the logic is too complicated. if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) { return doris::Status::Error<ErrorCode::PROCESS_MEMORY_EXCEEDED>( "reserve memory failed, size: {}, because {}", size, diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index af4cd3d417d..abc31d967bd 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -472,7 +472,7 @@ Status QueryContext::revoke_memory() { // Do not use memlimit, use current memory usage. // For example, if current limit is 1.6G, but current used is 1G, if reserve failed // should free 200MB memory, not 300MB - const auto target_revoking_size = query_mem_tracker->consumption() * 0.2; + const int64_t target_revoking_size = (int64_t)(query_mem_tracker->consumption() * 0.2); size_t revoked_size = 0; std::vector<pipeline::PipelineTask*> chosen_tasks; diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 299e4ced55c..6ce09b7dd0e 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -247,6 +247,12 @@ public: int64_t get_mem_limit() const { return query_mem_tracker->limit(); } + void set_expected_mem_limit(int64_t new_mem_limit) { + _expected_mem_limit = std::min<int64_t>(new_mem_limit, _user_set_mem_limit); + } + + int64_t expected_mem_limit() { return _expected_mem_limit; } + std::shared_ptr<MemTrackerLimiter>& get_mem_tracker() { return query_mem_tracker; } int32_t get_slot_count() const { @@ -411,6 +417,7 @@ private: std::atomic<bool> _low_memory_mode = false; int64_t _user_set_mem_limit = 0; + std::atomic<int64_t> _expected_mem_limit = 0; std::mutex _profile_mutex; timespec _query_arrival_timestamp; diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 287a6b45729..95951b96bab 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ 
b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -212,7 +212,7 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() { weighted_memory_limit_ratio); LOG_EVERY_T(INFO, 60) << debug_msg; for (auto& wg : _workload_groups) { - change_query_to_hard_limit(wg.second, false); + update_queries_limit(wg.second, false); } } @@ -280,6 +280,7 @@ void WorkloadGroupMgr::handle_paused_queries() { void WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() { const int64_t TIMEOUT_IN_QUEUE = 1000L * 10; std::unique_lock<std::mutex> lock(_paused_queries_lock); + std::vector<std::weak_ptr<QueryContext>> resume_after_gc; for (auto it = _paused_queries_list.begin(); it != _paused_queries_list.end();) { auto& queries_list = it->second; const auto& wg = it->first; @@ -310,9 +311,9 @@ void WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() { query_it = queries_list.erase(query_it); continue; } - + bool wg_changed_to_hard_limit = wg->has_changed_to_hard_limit(); // Only deal with non overcommit workload group. - if (wg->enable_memory_overcommit() && !wg->has_changed_to_hard_limit() && + if (wg->enable_memory_overcommit() && !wg_changed_to_hard_limit && !query_ctx->paused_reason().is<ErrorCode::PROCESS_MEMORY_EXCEEDED>()) { // Soft limit wg will only reserve failed when process limit exceed. But in some corner case, // when reserve, the wg is hard limit, the query reserve failed, but when this loop run @@ -328,6 +329,9 @@ void WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() { } if (query_ctx->paused_reason().is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) { + CHECK(!wg->enable_memory_overcommit() || wg_changed_to_hard_limit); + // Streamload, kafka load, group commit will never have query memory exceeded error because + // their query limit is very large. 
bool spill_res = handle_single_query(query_ctx, query_it->reserve_size_, query_ctx->paused_reason()); if (!spill_res) { @@ -338,8 +342,23 @@ void WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() { continue; } } else if (query_ctx->paused_reason().is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) { + CHECK(!wg->enable_memory_overcommit() || wg_changed_to_hard_limit); + // check if the reserve is too large, if it is too large, + // should set the query's limit only. + // Check the query's reserve with expected limit. + if (query_ctx->expected_mem_limit() < + query_ctx->get_mem_tracker()->consumption() + query_it->reserve_size_) { + query_ctx->set_mem_limit(query_ctx->expected_mem_limit()); + query_ctx->set_memory_sufficient(true); + LOG(INFO) << "workload group memory reserve failed because " + << query_ctx->debug_string() << " reserve size " + << query_it->reserve_size_ << " is too large, set hard limit to " + << query_ctx->expected_mem_limit() << " and resume running."; + query_it = queries_list.erase(query_it); + continue; + } if (!has_changed_hard_limit) { - change_query_to_hard_limit(wg, true); + update_queries_limit(wg, true); has_changed_hard_limit = true; LOG(INFO) << "query: " << print_id(query_ctx->query_id()) << " reserve memory failed due to workload group memory exceed, " @@ -412,15 +431,34 @@ void WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() { "to 0 now"; } if (query_it->cache_ratio_ < 0.001) { - // TODO: Find other exceed limit workload group and cancel query. 
- bool spill_res = handle_single_query(query_ctx, query_it->reserve_size_, - query_ctx->paused_reason()); - if (!spill_res) { - ++query_it; - continue; + if (query_it->any_wg_exceed_limit_) { + if (wg->enable_memory_overcommit()) { + if (query_it->elapsed_time() > TIMEOUT_IN_QUEUE) { + resume_after_gc.push_back(query_ctx); + query_it = queries_list.erase(query_it); + continue; + } else { + ++query_it; + continue; + } + } else { + // current workload group is hard limit, should not wait other wg with + // soft limit, just cancel + resume_after_gc.push_back(query_ctx); + query_it = queries_list.erase(query_it); + continue; + } } else { - query_it = queries_list.erase(query_it); - continue; + // TODO: Find other exceed limit workload group and cancel query. + bool spill_res = handle_single_query(query_ctx, query_it->reserve_size_, + query_ctx->paused_reason()); + if (!spill_res) { + ++query_it; + continue; + } else { + query_it = queries_list.erase(query_it); + continue; + } } } if (doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted < @@ -439,8 +477,22 @@ void WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() { // Finished deal with one workload group, and should deal with next one. ++it; } + // TODO minor GC to release some query + if (!resume_after_gc.empty()) { + } + for (auto resume_it = resume_after_gc.begin(); resume_it != resume_after_gc.end(); + ++resume_it) { + auto query_ctx = resume_it->lock(); + if (query_ctx != nullptr) { + query_ctx->set_memory_sufficient(true); + } + } } +// streamload, kafka routine load, group commit +// insert into select +// select + void WorkloadGroupMgr::handle_overcommit_wg_paused_queries() { std::shared_lock<std::shared_mutex> r_lock(_group_mutex); // If there is only one workload group and it is overcommit, then do nothing. 
@@ -448,19 +500,24 @@ void WorkloadGroupMgr::handle_overcommit_wg_paused_queries() { if (_workload_groups.size() == 1) { return; } - if (doris::GlobalMemoryArbitrator::is_exceed_soft_mem_limit(100 * 1024 * 1024)) { + // soft_limit - 10%, will change workload group to hard limit. + // soft limit, process memory reserve failed. + // hard limit, FullGC will kill query randomly. + if (doris::GlobalMemoryArbitrator::is_exceed_soft_mem_limit( + (int64_t)(MemInfo::mem_limit() * 0.1))) { for (auto& [wg_id, wg] : _workload_groups) { if (wg->enable_memory_overcommit() && !wg->has_changed_to_hard_limit()) { wg->change_to_hard_limit(true); - LOG(INFO) << "Process memory usage will exceed soft limit, change all workload " + LOG(INFO) << "Process memory usage + 10% will exceed soft limit, change all " + "workload " "group with overcommit to hard limit now. " << wg->debug_string(); } } } - // If current memory usage is below soft memlimit - 10%, then enable wg's overcommit + // If current memory usage is below soft memlimit - 15%, then enable wg's overcommit if (!doris::GlobalMemoryArbitrator::is_exceed_soft_mem_limit( - (int64_t)(MemInfo::mem_limit() * 0.1))) { + (int64_t)(MemInfo::mem_limit() * 0.15))) { for (auto& [wg_id, wg] : _workload_groups) { if (wg->enable_memory_overcommit() && wg->has_changed_to_hard_limit()) { wg->change_to_hard_limit(false); @@ -471,6 +528,7 @@ void WorkloadGroupMgr::handle_overcommit_wg_paused_queries() { } } } + // If the query could release some memory, for example, spill disk, flush memtable then the return value is true. // If the query could not release memory, then cancel the query, the return value is true. // If the query is not ready to do these tasks, it means just wait. @@ -488,6 +546,14 @@ bool WorkloadGroupMgr::handle_single_query(std::shared_ptr<QueryContext> query_c return false; } + // During waiting all task not running, may release some memory and the memory is enough now + // should resume the query. 
+ if (query_ctx->get_mem_tracker()->limit() > + query_ctx->get_mem_tracker()->consumption() + size_to_reserve) { + query_ctx->set_memory_sufficient(true); + return true; + } + auto revocable_tasks = query_ctx->get_revocable_tasks(); if (revocable_tasks.empty()) { if (paused_reason.is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) { @@ -518,7 +584,7 @@ bool WorkloadGroupMgr::handle_single_query(std::shared_ptr<QueryContext> query_c return true; } -void WorkloadGroupMgr::change_query_to_hard_limit(WorkloadGroupPtr wg, bool enable_hard_limit) { +void WorkloadGroupMgr::update_queries_limit(WorkloadGroupPtr wg, bool enable_hard_limit) { auto wg_mem_limit = wg->memory_limit(); auto wg_weighted_mem_limit = int64_t(wg_mem_limit * 1); wg->set_weighted_memory_limit(wg_weighted_mem_limit); @@ -578,6 +644,7 @@ void WorkloadGroupMgr::change_query_to_hard_limit(WorkloadGroupPtr wg, bool enab continue; } int64_t query_weighted_mem_limit = 0; + int64_t expected_query_weighted_mem_limit = 0; // If the query enable hard limit, then it should not use the soft limit if (query_ctx->enable_query_slot_hard_limit()) { if (total_slot_count < 1) { @@ -589,20 +656,21 @@ void WorkloadGroupMgr::change_query_to_hard_limit(WorkloadGroupPtr wg, bool enab query_weighted_mem_limit = (int64_t)((wg_high_water_mark_except_load * query_ctx->get_slot_count() * 1.0) / total_slot_count); + expected_query_weighted_mem_limit = query_weighted_mem_limit; } } else { // If low water mark is not reached, then use process memory limit as query memory limit. // It means it will not take effect. // If there are some query in paused list, then limit should take effect. + expected_query_weighted_mem_limit = + total_used_slot_count > 0 + ? 
(int64_t)((wg_high_water_mark_except_load + total_used_slot_count) * + query_ctx->get_slot_count() * 1.0 / total_used_slot_count) + : wg_high_water_mark_except_load; if (!is_low_wartermark && !enable_hard_limit) { query_weighted_mem_limit = wg_high_water_mark_except_load; } else { - query_weighted_mem_limit = total_used_slot_count > 0 - ? (int64_t)((wg_high_water_mark_except_load + - total_used_slot_count) * - query_ctx->get_slot_count() * 1.0 / - total_used_slot_count) - : wg_high_water_mark_except_load; + query_weighted_mem_limit = expected_query_weighted_mem_limit; } } debug_msg += query_ctx->debug_string() + "\n"; @@ -610,6 +678,7 @@ void WorkloadGroupMgr::change_query_to_hard_limit(WorkloadGroupPtr wg, bool enab // memory failed and we did not hanle it. if (!query_ctx->is_pure_load_task()) { query_ctx->set_mem_limit(query_weighted_mem_limit); + query_ctx->set_expected_mem_limit(expected_query_weighted_mem_limit); } } //LOG(INFO) << debug_msg; diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h index 03f134006f5..d8b49fd827a 100644 --- a/be/src/runtime/workload_group/workload_group_manager.h +++ b/be/src/runtime/workload_group/workload_group_manager.h @@ -110,7 +110,7 @@ private: Status paused_reason); void handle_non_overcommit_wg_paused_queries(); void handle_overcommit_wg_paused_queries(); - void change_query_to_hard_limit(WorkloadGroupPtr wg, bool enable_hard_limit); + void update_queries_limit(WorkloadGroupPtr wg, bool enable_hard_limit); private: std::shared_mutex _group_mutex; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org