This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new 25eb49082ae wait for clear cache before do spill to disk
25eb49082ae is described below
commit 25eb49082ae7ca4646af13cc4890c8954e6baa6f
Author: yiguolei <[email protected]>
AuthorDate: Thu Sep 19 11:33:11 2024 +0800
wait for clear cache before do spill to disk
---
be/src/common/daemon.cpp | 5 ++-
be/src/runtime/memory/global_memory_arbitrator.cpp | 5 +++
be/src/runtime/memory/global_memory_arbitrator.h | 5 +++
.../workload_group/workload_group_manager.cpp | 45 ++++++++++++++++++----
4 files changed, 52 insertions(+), 8 deletions(-)
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index c3f8d89de82..3bd0de7ebb6 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -487,7 +487,9 @@ void Daemon::cache_adjust_capacity_thread() {
doris::GlobalMemoryArbitrator::cache_adjust_capacity_cv.wait_for(
l, std::chrono::seconds(1));
}
- double adjust_weighted =
GlobalMemoryArbitrator::last_cache_capacity_adjust_weighted;
+ double adjust_weighted = std::min<double>(
+ GlobalMemoryArbitrator::last_cache_capacity_adjust_weighted,
+
GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted);
if (_stop_background_threads_latch.count() == 0) {
break;
}
@@ -502,6 +504,7 @@ void Daemon::cache_adjust_capacity_thread() {
LOG(INFO) << fmt::format(
"[MemoryGC] refresh cache capacity end, free memory {},
details: {}",
PrettyPrinter::print(freed_mem, TUnit::BYTES), ss.str());
+ GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted =
adjust_weighted;
doris::GlobalMemoryArbitrator::cache_adjust_capacity_notify.store(
false, std::memory_order_relaxed);
} while (true);
diff --git a/be/src/runtime/memory/global_memory_arbitrator.cpp
b/be/src/runtime/memory/global_memory_arbitrator.cpp
index 45d7781786f..0c774187ff3 100644
--- a/be/src/runtime/memory/global_memory_arbitrator.cpp
+++ b/be/src/runtime/memory/global_memory_arbitrator.cpp
@@ -38,7 +38,12 @@ std::atomic<int64_t>
GlobalMemoryArbitrator::refresh_interval_memory_growth = 0;
std::mutex GlobalMemoryArbitrator::cache_adjust_capacity_lock;
std::condition_variable GlobalMemoryArbitrator::cache_adjust_capacity_cv;
std::atomic<bool> GlobalMemoryArbitrator::cache_adjust_capacity_notify {false};
+// This capacity is set by the gc thread, which runs periodically.
std::atomic<double>
GlobalMemoryArbitrator::last_cache_capacity_adjust_weighted {1};
+// This capacity is set by workload group spill disk thread
+std::atomic<double>
GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted {1};
+// The value that has actually taken effect
+std::atomic<double>
GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted {1};
std::mutex GlobalMemoryArbitrator::memtable_memory_refresh_lock;
std::condition_variable GlobalMemoryArbitrator::memtable_memory_refresh_cv;
std::atomic<bool> GlobalMemoryArbitrator::memtable_memory_refresh_notify
{false};
diff --git a/be/src/runtime/memory/global_memory_arbitrator.h
b/be/src/runtime/memory/global_memory_arbitrator.h
index 1859f45391f..468d442b662 100644
--- a/be/src/runtime/memory/global_memory_arbitrator.h
+++ b/be/src/runtime/memory/global_memory_arbitrator.h
@@ -178,6 +178,11 @@ public:
static std::condition_variable cache_adjust_capacity_cv;
static std::atomic<bool> cache_adjust_capacity_notify;
static std::atomic<double> last_cache_capacity_adjust_weighted;
+ // This capacity is set by workload group spill disk thread
+ static std::atomic<double> last_wg_trigger_cache_capacity_adjust_weighted;
+ // The value that has actually taken effect
+ static std::atomic<double> last_affected_cache_capacity_adjust_weighted;
+
static void notify_cache_adjust_capacity() {
cache_adjust_capacity_notify.store(true, std::memory_order_relaxed);
cache_adjust_capacity_cv.notify_all();
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp
b/be/src/runtime/workload_group/workload_group_manager.cpp
index c17b55f8956..6fe8c7a51eb 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -233,6 +233,10 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
continue;
}
+ // If the wg enables memory overcommit, there is no need to update the
query memlimit
+ if (wg.second->enable_memory_overcommit()) {
+ continue;
+ }
int32_t total_used_slot_count = 0;
int32_t total_slot_count = wg.second->total_query_slot_count();
// calculate total used slot count
@@ -335,6 +339,11 @@ void WorkloadGroupMgr::add_paused_query(const
std::shared_ptr<QueryContext>& que
void WorkloadGroupMgr::handle_paused_queries() {
std::unique_lock<std::mutex> lock(_paused_queries_lock);
if (_paused_queries_list.empty()) {
+ if
(doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted
!= 1) {
+
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted =
1;
+ doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity();
+ LOG(INFO) << "There are no queries in paused list, so that set
cache capacity to 1 now";
+ }
return;
}
@@ -371,13 +380,10 @@ void WorkloadGroupMgr::handle_paused_queries() {
++it;
}
- std::shared_ptr<QueryContext> max_revocable_query;
- std::shared_ptr<QueryContext> max_memory_usage_query;
- std::shared_ptr<QueryContext> running_query;
- bool has_running_query = false;
- size_t max_revocable_size = 0;
- size_t max_memory_usage = 0;
- auto it_to_remove = queries_list.end();
+ // If the wg's query list is empty, then there is nothing to do
+ if (queries_list.empty()) {
+ continue;
+ }
// TODO: should check buffer type memory first; if a lot of that memory
could be released, then there is no need to spill to disk
// Buffer Memory are:
@@ -387,6 +393,31 @@ void WorkloadGroupMgr::handle_paused_queries() {
// 4. streaming aggs.
// If we cannot recycle memory from these buffers (< 10%), then
spill to disk.
+ // 1. Check cache usage: if the cache capacity is larger than 0, then just return
and wait for it to drop to 0 to release some memory.
+ if
(doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted >
0 &&
+
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted >
0) {
+
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted =
0;
+ doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity();
+ LOG(INFO) << "There are some queries need memory, so that set
cache capacity to 0 now";
+ // If there is cache, then return; only check whether to spill to disk
when the cache capacity is 0.
+ return;
+ }
+
+ // 2. If the memtable size is larger than 10% of the wg's limit, then flush
the memtable and wait.
+
+ // If the wg enables memory overcommit, then do not spill; just cancel
the query.
+ if (wg->enable_memory_overcommit()) {
+ continue;
+ }
+
+ std::shared_ptr<QueryContext> max_revocable_query;
+ std::shared_ptr<QueryContext> max_memory_usage_query;
+ std::shared_ptr<QueryContext> running_query;
+ bool has_running_query = false;
+ size_t max_revocable_size = 0;
+ size_t max_memory_usage = 0;
+ auto it_to_remove = queries_list.end();
+
for (auto query_it = queries_list.begin(); query_it !=
queries_list.end();) {
const auto query_ctx = query_it->query_ctx_.lock();
// The query finished while it was in the paused list.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]