This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new a86d512321b Spill and reserve (#42611)
a86d512321b is described below
commit a86d512321b296522ae1f90c6c831b85415f4232
Author: yiguolei <[email protected]>
AuthorDate: Mon Oct 28 17:18:52 2024 +0800
Spill and reserve (#42611)
## Proposed changes
Issue Number: close #xxx
<!--Describe your changes.-->
---------
Co-authored-by: yiguolei <[email protected]>
---
be/src/olap/memtable_memory_limiter.cpp | 3 ++-
be/src/runtime/memory/mem_tracker_limiter.h | 2 +-
be/src/runtime/workload_group/workload_group.cpp | 10 +++++-----
be/src/runtime/workload_group/workload_group.h | 6 ++++--
be/src/runtime/workload_group/workload_group_manager.cpp | 12 +++++++++---
5 files changed, 21 insertions(+), 12 deletions(-)
diff --git a/be/src/olap/memtable_memory_limiter.cpp
b/be/src/olap/memtable_memory_limiter.cpp
index fe483706127..75b2418372f 100644
--- a/be/src/olap/memtable_memory_limiter.cpp
+++ b/be/src/olap/memtable_memory_limiter.cpp
@@ -118,7 +118,8 @@ void
MemTableMemoryLimiter::handle_workload_group_memtable_flush(WorkloadGroupPt
// Should release memory quickly.
using namespace std::chrono_literals;
int32_t sleep_times = 10;
- while (wg != nullptr && wg->enable_write_buffer_limit() && sleep_times >
0) {
+ while (wg != nullptr && wg->enable_write_buffer_limit() &&
wg->exceed_write_buffer_limit() &&
+ sleep_times > 0) {
std::this_thread::sleep_for(100ms);
--sleep_times;
}
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h
b/be/src/runtime/memory/mem_tracker_limiter.h
index 1b03f2f2082..7e1a0e11c83 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -235,7 +235,7 @@ public:
static void make_top_consumption_tasks_tracker_profile(RuntimeProfile*
profile, int top_num);
static void make_all_tasks_tracker_profile(RuntimeProfile* profile);
- int64_t load_buffer_size() const { return _write_tracker->consumption(); }
+ int64_t write_buffer_size() const { return _write_tracker->consumption(); }
std::shared_ptr<MemTrackerLimiter> write_tracker() { return
_write_tracker; }
diff --git a/be/src/runtime/workload_group/workload_group.cpp
b/be/src/runtime/workload_group/workload_group.cpp
index c0895e8f0fd..badc55073ab 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -94,7 +94,7 @@ std::string WorkloadGroup::debug_string() const {
return fmt::format(
"WorkloadGroup[id = {}, name = {}, version = {}, cpu_share = {}, "
"total_query_slot_count={}, "
- "memory_limit = {}, write_buffer_ratio= {}%"
+ "memory_limit = {}, write_buffer_ratio= {}%, "
"enable_memory_overcommit = {}, total_mem_used = {},"
"wg_refresh_interval_memory_growth = {}, mem_used_ratio = {},
spill_low_watermark = "
"{}, spill_high_watermark = {},cpu_hard_limit = {},
scan_thread_num = "
@@ -185,7 +185,7 @@ void WorkloadGroup::check_and_update(const
WorkloadGroupInfo& tg_info) {
// MemtrackerLimiter is not removed during query context release, so that
should remove it here.
int64_t WorkloadGroup::refresh_memory_usage() {
int64_t used_memory = 0;
- int64_t load_buffer_size = 0;
+ int64_t write_buffer_size = 0;
for (auto& mem_tracker_group : _mem_tracker_limiter_pool) {
std::lock_guard<std::mutex> l(mem_tracker_group.group_lock);
for (auto trackerWptr = mem_tracker_group.trackers.begin();
@@ -195,14 +195,14 @@ int64_t WorkloadGroup::refresh_memory_usage() {
trackerWptr = mem_tracker_group.trackers.erase(trackerWptr);
} else {
used_memory += tracker->consumption();
- load_buffer_size += tracker->load_buffer_size();
+ write_buffer_size += tracker->write_buffer_size();
++trackerWptr;
}
}
}
// refresh total memory used.
- _total_mem_used = used_memory + load_buffer_size;
- _load_buffer_size = load_buffer_size;
+ _total_mem_used = used_memory + write_buffer_size;
+ _write_buffer_size = write_buffer_size;
// reserve memory is recorded in the query mem tracker
// and _total_mem_used already contains all the current reserve memory.
// so after refreshing _total_mem_used, reset
_wg_refresh_interval_memory_growth.
diff --git a/be/src/runtime/workload_group/workload_group.h
b/be/src/runtime/workload_group/workload_group.h
index ccdc6374ce8..ce95495b29a 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -80,12 +80,14 @@ public:
int64_t total_mem_used() const { return _total_mem_used; }
- int64_t write_buffer_size() const { return _load_buffer_size; }
+ int64_t write_buffer_size() const { return _write_buffer_size; }
void enable_write_buffer_limit(bool enable_limit) {
_enable_write_buffer_limit = enable_limit; }
bool enable_write_buffer_limit() const { return
_enable_write_buffer_limit; }
+ bool exceed_write_buffer_limit() const { return _write_buffer_size >
write_buffer_limit(); }
+
// make memory snapshots and refresh total memory used at the same time.
int64_t refresh_memory_usage();
int64_t memory_used();
@@ -213,7 +215,7 @@ private:
std::atomic<bool> _enable_write_buffer_limit = false;
std::atomic_int64_t _total_mem_used = 0; // bytes
- std::atomic_int64_t _load_buffer_size = 0;
+ std::atomic_int64_t _write_buffer_size = 0;
std::atomic_int64_t _wg_refresh_interval_memory_growth;
bool _enable_memory_overcommit;
std::atomic<uint64_t> _cpu_share;
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp
b/be/src/runtime/workload_group/workload_group_manager.cpp
index d7130ae26e4..91a1438d2fb 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -270,6 +270,15 @@ void WorkloadGroupMgr::add_paused_query(const
std::shared_ptr<QueryContext>& que
* strategy 5: If any query exceed process's memlimit and cache is zero, then
do following:
*/
void WorkloadGroupMgr::handle_paused_queries() {
+ {
+ std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+ for (auto& [wg_id, wg] : _workload_groups) {
+ std::unique_lock<std::mutex> lock(_paused_queries_lock);
+            if (_paused_queries_list[wg].empty()) {
+                // Add an empty set for any wg that does not contain paused queries.
+            }
+ }
+ }
const int64_t TIMEOUT_IN_QUEUE = 1000L * 10;
std::unique_lock<std::mutex> lock(_paused_queries_lock);
bool has_revoked_from_other_group = false;
@@ -353,9 +362,6 @@ void WorkloadGroupMgr::handle_paused_queries() {
wg->enable_write_buffer_limit(true);
++query_it;
continue;
- } else {
- // If could not revoke memory by flush memtable, then
disable load buffer limit
- wg->enable_write_buffer_limit(false);
}
if (!has_changed_hard_limit) {
update_queries_limit_(wg, true);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]