This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 1728f5931b5 [branch-2.0](memory) Fix work load group GC and add logs
to locate slow GC (#25368)
1728f5931b5 is described below
commit 1728f5931b55391c431a36839267e0e676fa976e
Author: Xinyi Zou <[email protected]>
AuthorDate: Fri Oct 13 09:23:55 2023 +0800
[branch-2.0](memory) Fix work load group GC and add logs to locate slow GC
(#25368)
* [fix](memory) Fix work load group GC and add logs to locate slow GC #24975
Fix work load group GC, add cancelling of loads, and add logs.
Unify the format of the GC logs and change them all to lowercase, to avoid
unnecessary trouble when searching with grep or less
Add logs to help locate the cause of slow GC.
* fix conflict
---
be/src/common/config.cpp | 2 +-
be/src/common/daemon.cpp | 6 +-
be/src/runtime/fragment_mgr.h | 5 +
be/src/runtime/memory/mem_tracker_limiter.cpp | 202 +++++++++++++++++---------
be/src/runtime/memory/mem_tracker_limiter.h | 27 +++-
be/src/runtime/runtime_state.cpp | 6 +-
be/src/util/mem_info.cpp | 94 ++++++++----
be/src/util/mem_info.h | 5 +-
be/src/vec/common/allocator.cpp | 16 +-
9 files changed, 243 insertions(+), 120 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index ec96cdc4f97..f1ecd0e7825 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -132,7 +132,7 @@ DEFINE_mBool(disable_memory_gc, "false");
DEFINE_mInt64(large_memory_check_bytes, "2147483648");
-// The maximum time a thread waits for a full GC. Currently only query will
wait for full gc.
+// The maximum time a thread waits for full GC. Currently only query will wait
for full gc.
DEFINE_mInt32(thread_wait_gc_max_milliseconds, "1000");
DEFINE_mInt64(pre_serialize_keys_limit_bytes, "16777216");
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index 7fb4ba2289c..c72f7d66ed8 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -237,7 +237,7 @@ void Daemon::memory_gc_thread() {
auto proc_mem_no_allocator_cache =
doris::MemInfo::proc_mem_no_allocator_cache();
// GC excess memory for resource groups that not enable overcommit
- auto tg_free_mem = doris::MemInfo::tg_hard_memory_limit_gc();
+ auto tg_free_mem = doris::MemInfo::tg_not_enable_overcommit_group_gc();
sys_mem_available += tg_free_mem;
proc_mem_no_allocator_cache -= tg_free_mem;
@@ -247,7 +247,7 @@ void Daemon::memory_gc_thread() {
// No longer full gc and minor gc during sleep.
memory_full_gc_sleep_time_ms = memory_gc_sleep_time_ms;
memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms;
- LOG(INFO) << fmt::format("Start Full GC, {}.",
+ LOG(INFO) << fmt::format("[MemoryGC] start full GC, {}.",
MemTrackerLimiter::process_limit_exceeded_errmsg_str());
doris::MemTrackerLimiter::print_log_process_usage();
if (doris::MemInfo::process_full_gc()) {
@@ -259,7 +259,7 @@ void Daemon::memory_gc_thread() {
proc_mem_no_allocator_cache >=
doris::MemInfo::soft_mem_limit())) {
// No minor gc during sleep, but full gc is possible.
memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms;
- LOG(INFO) << fmt::format("Start Minor GC, {}.",
+ LOG(INFO) << fmt::format("[MemoryGC] start minor GC, {}.",
MemTrackerLimiter::process_soft_limit_exceeded_errmsg_str());
doris::MemTrackerLimiter::print_log_process_usage();
if (doris::MemInfo::process_minor_gc()) {
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index dca60b91e74..5f167aa2dc0 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -139,6 +139,11 @@ public:
void coordinator_callback(const ReportStatusRequest& req);
+ int32_t running_query_num() {
+ std::unique_lock<std::mutex> ctx_lock(_lock);
+ return _query_ctx_map.size();
+ }
+
private:
void _exec_actual(std::shared_ptr<FragmentExecState> exec_state, const
FinishCallback& cb);
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 475c203fd41..e4b795d8366 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -31,6 +31,7 @@
#include "runtime/fragment_mgr.h"
#include "runtime/load_channel_mgr.h"
#include "runtime/task_group/task_group.h"
+#include "runtime/thread_context.h"
#include "service/backend_options.h"
#include "util/mem_info.h"
#include "util/perf_counters.h"
@@ -40,6 +41,7 @@
namespace doris {
bvar::Adder<int64_t> g_memtrackerlimiter_cnt("memtrackerlimiter_cnt");
+constexpr auto GC_MAX_SEEK_TRACKER = 1000;
// Save all MemTrackerLimiters in use.
// Each group corresponds to several MemTrackerLimiters and has a lock.
@@ -301,7 +303,7 @@ bool MemTrackerLimiter::sys_mem_exceed_limit_check(int64_t
bytes) {
std::string MemTrackerLimiter::process_mem_log_str() {
return fmt::format(
- "OS physical memory {}. Process memory usage {}, limit {}, soft
limit {}. Sys "
+ "os physical memory {}. process memory used {}, limit {}, soft
limit {}. sys "
"available memory {}, low water mark {}, warning water mark {}.
Refresh interval "
"memory growth {} B",
PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES),
@@ -314,7 +316,7 @@ std::string MemTrackerLimiter::process_mem_log_str() {
std::string MemTrackerLimiter::process_limit_exceeded_errmsg_str() {
return fmt::format(
- "process memory used {} exceed limit {} or sys mem available {}
less than low "
+ "process memory used {} exceed limit {} or sys available memory {}
less than low "
"water mark {}",
PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(),
MemInfo::sys_mem_available_str(),
@@ -323,36 +325,31 @@ std::string
MemTrackerLimiter::process_limit_exceeded_errmsg_str() {
std::string MemTrackerLimiter::process_soft_limit_exceeded_errmsg_str() {
return fmt::format(
- "process memory used {} exceed soft limit {} or sys mem available
{} less than warning "
- "water mark {}.",
+ "process memory used {} exceed soft limit {} or sys available
memory {} less than "
+ "warning water mark {}.",
PerfCounters::get_vm_rss_str(), MemInfo::soft_mem_limit_str(),
MemInfo::sys_mem_available_str(),
PrettyPrinter::print(MemInfo::sys_mem_available_warning_water_mark(),
TUnit::BYTES));
}
-std::string MemTrackerLimiter::query_tracker_limit_exceeded_str(
- const std::string& tracker_limit_exceeded, const std::string&
last_consumer_tracker,
- const std::string& executing_msg) {
- return fmt::format(
- "Memory limit exceeded:{}, exec node:<{}>, execute msg:{}. backend
{} "
- "process memory used {}, limit {}. Can `set "
- "exec_mem_limit=8G` to change limit, details see be.INFO.",
- tracker_limit_exceeded, last_consumer_tracker, executing_msg,
- BackendOptions::get_localhost(), PerfCounters::get_vm_rss_str(),
- MemInfo::mem_limit_str());
-}
-
std::string MemTrackerLimiter::tracker_limit_exceeded_str() {
- return fmt::format(
- "exceeded tracker:<{}>, limit {}, peak "
- "used {}, current used {}",
- label(), print_bytes(limit()),
print_bytes(_consumption->peak_value()),
- print_bytes(_consumption->current_value()));
-}
-
-std::string MemTrackerLimiter::tracker_limit_exceeded_str(int64_t bytes) {
- return fmt::format("failed alloc size {}, {}", print_bytes(bytes),
- tracker_limit_exceeded_str());
+ std::string err_msg = fmt::format(
+ "memory tracker limit exceeded, tracker label:{}, type:{}, limit "
+ "{}, peak used {}, current used {}. backend {} process memory used
{}.",
+ label(), type_string(_type), print_bytes(limit()),
+ print_bytes(_consumption->peak_value()),
print_bytes(_consumption->current_value()),
+ BackendOptions::get_localhost(), PerfCounters::get_vm_rss_str());
+ if (_type == Type::QUERY || _type == Type::LOAD) {
+ err_msg += fmt::format(
+ " exec node:<{}>, can `set exec_mem_limit=8G` to change limit,
details see "
+ "be.INFO.",
+
doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker());
+ } else if (_type == Type::SCHEMA_CHANGE) {
+ err_msg += fmt::format(
+ " can modify
`memory_limitation_per_thread_for_schema_change_bytes` in be.conf to "
+ "change limit, details see be.INFO.");
+ }
+ return err_msg;
}
int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem,
@@ -364,9 +361,9 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t
min_free_mem,
[&vm_rss_str, &mem_available_str, &type](int64_t mem_consumption,
const std::string& label)
{
return fmt::format(
- "Process has no memory available, cancel top memory
usage {}: "
+ "Process has no memory available, cancel top memory
used {}: "
"{} memory tracker <{}> consumption {}, backend {} "
- "process memory used {} exceed limit {} or sys mem
available {} "
+ "process memory used {} exceed limit {} or sys
available memory {} "
"less than low water mark {}. Execute again after
enough memory, "
"details see be.INFO.",
type_string(type), type_string(type), label,
print_bytes(mem_consumption),
@@ -374,14 +371,14 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t
min_free_mem,
mem_available_str,
print_bytes(MemInfo::sys_mem_available_low_water_mark()));
},
- profile);
+ profile, GCType::PROCESS);
}
template <typename TrackerGroups>
int64_t MemTrackerLimiter::free_top_memory_query(
int64_t min_free_mem, Type type, std::vector<TrackerGroups>&
tracker_groups,
const std::function<std::string(int64_t, const std::string&)>&
cancel_msg,
- RuntimeProfile* profile) {
+ RuntimeProfile* profile, GCType GCtype) {
using MemTrackerMinQueue = std::priority_queue<std::pair<int64_t,
std::string>,
std::vector<std::pair<int64_t, std::string>>,
std::greater<std::pair<int64_t, std::string>>>;
@@ -397,9 +394,18 @@ int64_t MemTrackerLimiter::free_top_memory_query(
COUNTER_SET(seek_tasks_counter, (int64_t)0);
COUNTER_SET(previously_canceling_tasks_counter, (int64_t)0);
+ std::string log_prefix = fmt::format("[MemoryGC] GC free {} top memory
used {}, ",
+ gc_type_string(GCtype),
type_string(type));
+ LOG(INFO) << fmt::format("{}, start seek all {}, running query and load
num: {}", log_prefix,
+ type_string(type),
+
ExecEnv::GetInstance()->fragment_mgr()->running_query_num());
+
{
SCOPED_TIMER(find_cost_time);
for (unsigned i = 1; i < tracker_groups.size(); ++i) {
+ if (seek_num > GC_MAX_SEEK_TRACKER) {
+ break;
+ }
std::lock_guard<std::mutex> l(tracker_groups[i].group_lock);
for (auto tracker : tracker_groups[i].trackers) {
if (tracker->type() == type) {
@@ -436,9 +442,10 @@ int64_t MemTrackerLimiter::free_top_memory_query(
COUNTER_UPDATE(seek_tasks_counter, seek_num);
COUNTER_UPDATE(previously_canceling_tasks_counter, canceling_task.size());
- LOG(INFO) << "GC Free Top Memory Usage " << type_string(type) << " seek
finished, seek "
- << seek_num << " tasks. among them, " << min_pq.size() << "
tasks will be canceled, "
- << prepare_free_mem << " memory size prepare free; " <<
canceling_task.size()
+
+ LOG(INFO) << log_prefix << "seek finished, seek " << seek_num << " tasks.
among them, "
+ << min_pq.size() << " tasks will be canceled, " <<
prepare_free_mem
+ << " memory size prepare free; " << canceling_task.size()
<< " tasks is being canceled and has not been completed yet;"
<< (canceling_task.size() > 0 ? " consist of: " +
join(canceling_task, ",") : "");
@@ -448,8 +455,8 @@ int64_t MemTrackerLimiter::free_top_memory_query(
while (!min_pq.empty()) {
TUniqueId cancelled_queryid =
label_to_queryid(min_pq.top().second);
if (cancelled_queryid == TUniqueId()) {
- LOG(WARNING) << "GC Free Top Memory Usage " <<
type_string(type)
- << ", Task ID parsing failed, label: " <<
min_pq.top().second;
+ LOG(WARNING) << log_prefix
+ << "Task ID parsing failed, label: " <<
min_pq.top().second;
min_pq.pop();
continue;
}
@@ -459,15 +466,14 @@ int64_t MemTrackerLimiter::free_top_memory_query(
COUNTER_UPDATE(freed_memory_counter, min_pq.top().first);
COUNTER_UPDATE(cancel_tasks_counter, 1);
- usage_strings.push_back(fmt::format("{} memory usage {} Bytes",
min_pq.top().second,
+ usage_strings.push_back(fmt::format("{} memory used {} Bytes",
min_pq.top().second,
min_pq.top().first));
min_pq.pop();
}
}
profile->merge(free_top_memory_task_profile.get());
- LOG(INFO) << "GC Free Top Memory Usage " << type_string(type) << " cancel
finished, "
- << cancel_tasks_counter->value()
+ LOG(INFO) << log_prefix << "cancel finished, " <<
cancel_tasks_counter->value()
<< " tasks canceled, memory size being freed: " <<
freed_memory_counter->value()
<< ", consist of: " << join(usage_strings, ",");
return freed_memory_counter->value();
@@ -484,7 +490,7 @@ int64_t
MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem,
return fmt::format(
"Process has less memory, cancel top memory overcommit
{}: "
"{} memory tracker <{}> consumption {}, backend {} "
- "process memory used {} exceed soft limit {} or sys
mem available {} "
+ "process memory used {} exceed soft limit {} or sys
available memory {} "
"less than warning water mark {}. Execute again after
enough memory, "
"details see be.INFO.",
type_string(type), type_string(type), label,
print_bytes(mem_consumption),
@@ -492,14 +498,14 @@ int64_t
MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem,
mem_available_str,
print_bytes(MemInfo::sys_mem_available_warning_water_mark()));
},
- profile);
+ profile, GCType::PROCESS);
}
template <typename TrackerGroups>
int64_t MemTrackerLimiter::free_top_overcommit_query(
int64_t min_free_mem, Type type, std::vector<TrackerGroups>&
tracker_groups,
const std::function<std::string(int64_t, const std::string&)>&
cancel_msg,
- RuntimeProfile* profile) {
+ RuntimeProfile* profile, GCType GCtype) {
std::priority_queue<std::pair<int64_t, std::string>> max_pq;
std::unordered_map<std::string, int64_t> query_consumption;
std::vector<std::string> canceling_task;
@@ -512,9 +518,18 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(
COUNTER_SET(seek_tasks_counter, (int64_t)0);
COUNTER_SET(previously_canceling_tasks_counter, (int64_t)0);
+ std::string log_prefix = fmt::format("[MemoryGC] GC free {} top memory
overcommit {}, ",
+ gc_type_string(GCtype),
type_string(type));
+ LOG(INFO) << fmt::format("{}, start seek all {}, running query and load
num: {}", log_prefix,
+ type_string(type),
+
ExecEnv::GetInstance()->fragment_mgr()->running_query_num());
+
{
SCOPED_TIMER(find_cost_time);
for (unsigned i = 1; i < tracker_groups.size(); ++i) {
+ if (seek_num > GC_MAX_SEEK_TRACKER) {
+ break;
+ }
std::lock_guard<std::mutex> l(tracker_groups[i].group_lock);
for (auto tracker : tracker_groups[i].trackers) {
if (tracker->type() == type) {
@@ -542,22 +557,21 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(
COUNTER_UPDATE(seek_tasks_counter, seek_num);
COUNTER_UPDATE(previously_canceling_tasks_counter, canceling_task.size());
- LOG(INFO) << "GC Free Top Memory Overcommit " << type_string(type) << "
seek finished, seek "
- << seek_num << " tasks. among them, " << query_consumption.size()
- << " tasks can be canceled; " << small_num << " small tasks that
were skipped; "
- << canceling_task.size() << " tasks is being canceled and has
not been completed yet;"
+
+ LOG(INFO) << log_prefix << "seek finished, seek " << seek_num << " tasks.
among them, "
+ << query_consumption.size() << " tasks can be canceled; " <<
small_num
+ << " small tasks that were skipped; " << canceling_task.size()
+ << " tasks is being canceled and has not been completed yet;"
<< (canceling_task.size() > 0 ? " consist of: " +
join(canceling_task, ",") : "");
// Minor gc does not cancel when there is only one query.
if (query_consumption.size() == 0) {
- LOG(INFO) << "GC Free Top Memory Overcommit " << type_string(type)
- << " finished, no task need be canceled.";
+ LOG(INFO) << log_prefix << "finished, no task need be canceled.";
return 0;
}
if (query_consumption.size() == 1) {
auto iter = query_consumption.begin();
- LOG(INFO) << "GC Free Top Memory Overcommit " << type_string(type)
- << " finished, only one task: " << iter->first
+ LOG(INFO) << log_prefix << "finished, only one task: " << iter->first
<< ", memory consumption: " << iter->second << ", no
cancel.";
return 0;
}
@@ -568,8 +582,8 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(
while (!max_pq.empty()) {
TUniqueId cancelled_queryid =
label_to_queryid(max_pq.top().second);
if (cancelled_queryid == TUniqueId()) {
- LOG(WARNING) << "GC Free Top Memory Overcommit " <<
type_string(type)
- << ", Task ID parsing failed, label: " <<
max_pq.top().second;
+ LOG(WARNING) << log_prefix
+ << "Task ID parsing failed, label: " <<
max_pq.top().second;
max_pq.pop();
continue;
}
@@ -578,7 +592,7 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(
cancelled_queryid,
PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
cancel_msg(query_mem, max_pq.top().second));
- usage_strings.push_back(fmt::format("{} memory usage {} Bytes,
overcommit ratio: {}",
+ usage_strings.push_back(fmt::format("{} memory used {} Bytes,
overcommit ratio: {}",
max_pq.top().second, query_mem,
max_pq.top().first));
COUNTER_UPDATE(freed_memory_counter, query_mem);
@@ -591,8 +605,7 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(
}
profile->merge(free_top_memory_task_profile.get());
- LOG(INFO) << "GC Free Top Memory Overcommit " << type_string(type) << "
cancel finished, "
- << cancel_tasks_counter->value()
+ LOG(INFO) << log_prefix << "cancel finished, " <<
cancel_tasks_counter->value()
<< " tasks canceled, memory size being freed: " <<
freed_memory_counter->value()
<< ", consist of: " << join(usage_strings, ",");
return freed_memory_counter->value();
@@ -607,28 +620,77 @@ int64_t MemTrackerLimiter::tg_memory_limit_gc(
}
int64_t freed_mem = 0;
- constexpr auto query_type = MemTrackerLimiter::Type::QUERY;
- auto cancel_str = [id, &name, memory_limit, used_memory](int64_t
mem_consumption,
- const
std::string& label) {
+
+ std::string cancel_str = fmt::format(
+ "work load group memory exceeded limit, group id:{}, name:{},
used:{}, limit:{}, "
+ "backend:{}.",
+ id, name, MemTracker::print_bytes(used_memory),
MemTracker::print_bytes(memory_limit),
+ BackendOptions::get_localhost());
+ auto cancel_top_overcommit_str = [cancel_str](int64_t mem_consumption,
+ const std::string& label) {
+ return fmt::format(
+ "{} cancel top memory overcommit tracker <{}> consumption {}.
execute again after "
+ "enough memory, details see be.INFO.",
+ cancel_str, label, MemTracker::print_bytes(mem_consumption));
+ };
+ auto cancel_top_usage_str = [cancel_str](int64_t mem_consumption, const
std::string& label) {
return fmt::format(
- "Resource group id:{}, name:{} memory exceeded limit, cancel
top memory {}: "
- "memory tracker <{}> consumption {}, backend {}, "
- "resource group memory used {}, memory limit {}.",
- id, name, MemTrackerLimiter::type_string(query_type), label,
- MemTracker::print_bytes(mem_consumption),
BackendOptions::get_localhost(),
- MemTracker::print_bytes(used_memory),
MemTracker::print_bytes(memory_limit));
+ "{} cancel top memory used tracker <{}> consumption {}.
execute again after "
+ "enough memory, details see be.INFO.",
+ cancel_str, label, MemTracker::print_bytes(mem_consumption));
};
+
+ LOG(INFO) << fmt::format(
+ "[MemoryGC] work load group start gc, id:{} name:{}, memory limit:
{}, used: {}, "
+ "need_free_mem: {}.",
+ id, name, memory_limit, used_memory, need_free_mem);
+ Defer defer {[&]() {
+ LOG(INFO) << fmt::format(
+ "[MemoryGC] work load group finished gc, id:{} name:{}, memory
limit: {}, used: "
+ "{}, need_free_mem: {}, freed memory: {}.",
+ id, name, memory_limit, used_memory, need_free_mem, freed_mem);
+ }};
+
+ // 1. free top overcommit query
if (config::enable_query_memory_overcommit) {
+ RuntimeProfile* tmq_profile = profile->create_child(
+ fmt::format("FreeGroupTopOvercommitQuery:Name {}", name),
true, true);
freed_mem += MemTrackerLimiter::free_top_overcommit_query(
- need_free_mem - freed_mem, query_type, tracker_limiter_groups,
cancel_str, profile);
+ need_free_mem - freed_mem, MemTrackerLimiter::Type::QUERY,
tracker_limiter_groups,
+ cancel_top_overcommit_str, tmq_profile,
GCType::WORK_LOAD_GROUP);
}
- if (freed_mem < need_free_mem) {
- freed_mem += MemTrackerLimiter::free_top_memory_query(
- need_free_mem - freed_mem, query_type, tracker_limiter_groups,
cancel_str, profile);
+ if (freed_mem >= need_free_mem) {
+ return freed_mem;
}
- LOG(INFO) << fmt::format(
- "task group {} finished gc, memory_limit: {}, used_memory: {},
freed_mem: {}.", name,
- memory_limit, used_memory, freed_mem);
+
+ // 2. free top usage query
+ RuntimeProfile* tmq_profile =
+ profile->create_child(fmt::format("FreeGroupTopUsageQuery:Name
{}", name), true, true);
+ freed_mem += MemTrackerLimiter::free_top_memory_query(
+ need_free_mem - freed_mem, MemTrackerLimiter::Type::QUERY,
tracker_limiter_groups,
+ cancel_top_usage_str, tmq_profile, GCType::WORK_LOAD_GROUP);
+ if (freed_mem >= need_free_mem) {
+ return freed_mem;
+ }
+
+ // 3. free top overcommit load
+ if (config::enable_query_memory_overcommit) {
+ tmq_profile =
profile->create_child(fmt::format("FreeGroupTopOvercommitLoad:Name {}", name),
+ true, true);
+ freed_mem += MemTrackerLimiter::free_top_overcommit_query(
+ need_free_mem - freed_mem, MemTrackerLimiter::Type::LOAD,
tracker_limiter_groups,
+ cancel_top_overcommit_str, tmq_profile,
GCType::WORK_LOAD_GROUP);
+ if (freed_mem >= need_free_mem) {
+ return freed_mem;
+ }
+ }
+
+ // 4. free top usage load
+ tmq_profile =
+ profile->create_child(fmt::format("FreeGroupTopUsageLoad:Name {}",
name), true, true);
+ freed_mem += MemTrackerLimiter::free_top_memory_query(
+ need_free_mem - freed_mem, MemTrackerLimiter::Type::LOAD,
tracker_limiter_groups,
+ cancel_top_usage_str, tmq_profile, GCType::WORK_LOAD_GROUP);
return freed_mem;
}
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h
b/be/src/runtime/memory/mem_tracker_limiter.h
index 41601494079..936faa15921 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -70,6 +70,9 @@ public:
6 // Experimental memory statistics, usually inaccurate, used
for debugging, and expect to add other types in the future.
};
+ // TODO There are more and more GC codes and there should be a separate
manager class.
+ enum class GCType { PROCESS = 0, WORK_LOAD_GROUP = 1 };
+
struct TrackerLimiterGroup {
std::list<MemTrackerLimiter*> trackers;
std::mutex group_lock;
@@ -113,6 +116,19 @@ public:
__builtin_unreachable();
}
+ static std::string gc_type_string(GCType type) {
+ switch (type) {
+ case GCType::PROCESS:
+ return "process";
+ case GCType::WORK_LOAD_GROUP:
+ return "work load group";
+ default:
+ LOG(FATAL) << "not match gc type:" << static_cast<int>(type);
+ }
+ LOG(FATAL) << "__builtin_unreachable";
+ __builtin_unreachable();
+ }
+
static bool sys_mem_exceed_limit_check(int64_t bytes);
void set_consumption() { LOG(FATAL) << "MemTrackerLimiter set_consumption
not supported"; }
@@ -173,7 +189,7 @@ public:
static int64_t free_top_memory_query(
int64_t min_free_mem, Type type, std::vector<TrackerGroups>&
tracker_groups,
const std::function<std::string(int64_t, const std::string&)>&
cancel_msg,
- RuntimeProfile* profile);
+ RuntimeProfile* profile, GCType GCtype);
static int64_t free_top_memory_load(int64_t min_free_mem, const
std::string& vm_rss_str,
const std::string& mem_available_str,
@@ -191,7 +207,7 @@ public:
static int64_t free_top_overcommit_query(
int64_t min_free_mem, Type type, std::vector<TrackerGroups>&
tracker_groups,
const std::function<std::string(int64_t, const std::string&)>&
cancel_msg,
- RuntimeProfile* profile);
+ RuntimeProfile* profile, GCType GCtype);
static int64_t free_top_overcommit_load(int64_t min_free_mem, const
std::string& vm_rss_str,
const std::string&
mem_available_str,
@@ -221,11 +237,7 @@ public:
static std::string process_limit_exceeded_errmsg_str();
static std::string process_soft_limit_exceeded_errmsg_str();
// Log the memory usage when memory limit is exceeded.
- std::string query_tracker_limit_exceeded_str(const std::string&
tracker_limit_exceeded,
- const std::string&
last_consumer_tracker,
- const std::string&
executing_msg);
std::string tracker_limit_exceeded_str();
- std::string tracker_limit_exceeded_str(int64_t bytes);
std::string debug_string() override {
std::stringstream msg;
@@ -289,7 +301,8 @@ inline Status MemTrackerLimiter::check_limit(int64_t bytes)
{
return Status::OK();
}
if (_limit > 0 && _consumption->current_value() + bytes > _limit) {
- return Status::MemoryLimitExceeded(tracker_limit_exceeded_str(bytes));
+ return Status::MemoryLimitExceeded(fmt::format(
+ "failed alloc size {}, {}", print_bytes(bytes),
tracker_limit_exceeded_str()));
}
return Status::OK();
}
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 0a6ebca77da..631e0490a71 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -293,9 +293,9 @@ Status RuntimeState::check_query_state(const std::string&
msg) {
// Usually used after SCOPED_ATTACH_TASK, during query execution.
if (thread_context()->thread_mem_tracker()->limit_exceeded() &&
!config::enable_query_memory_overcommit) {
- auto failed_msg =
thread_context()->thread_mem_tracker()->query_tracker_limit_exceeded_str(
-
thread_context()->thread_mem_tracker()->tracker_limit_exceeded_str(),
-
thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(), msg);
+ auto failed_msg =
+ fmt::format("{}, {}", msg,
+
thread_context()->thread_mem_tracker()->tracker_limit_exceeded_str());
thread_context()->thread_mem_tracker()->print_log_usage(failed_msg);
log_error(failed_msg);
return Status::MemoryLimitExceeded(failed_msg);
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index 2686c56d388..9250101e67d 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -125,9 +125,10 @@ bool MemInfo::process_minor_gc() {
je_purge_all_arena_dirty_pages();
std::stringstream ss;
profile->pretty_print(&ss);
- LOG(INFO) << fmt::format("End Minor GC, Free Memory {}. cost(us): {},
details: {}",
- PrettyPrinter::print(freed_mem, TUnit::BYTES),
- watch.elapsed_time() / 1000, ss.str());
+ LOG(INFO) << fmt::format(
+ "[MemoryGC] end minor GC, free memory {}. cost(us): {},
details: {}",
+ PrettyPrinter::print(freed_mem, TUnit::BYTES),
watch.elapsed_time() / 1000,
+ ss.str());
}};
freed_mem +=
CacheManager::instance()->for_each_cache_prune_stale(profile.get());
@@ -137,14 +138,14 @@ bool MemInfo::process_minor_gc() {
}
RuntimeProfile* tg_profile = profile->create_child("WorkloadGroup", true,
true);
- freed_mem += tg_soft_memory_limit_gc(_s_process_minor_gc_size - freed_mem,
tg_profile);
+ freed_mem += tg_enable_overcommit_group_gc(_s_process_minor_gc_size -
freed_mem, tg_profile);
if (freed_mem > _s_process_minor_gc_size) {
return true;
}
if (config::enable_query_memory_overcommit) {
VLOG_NOTICE << MemTrackerLimiter::type_detail_usage(
- "Before free top memory overcommit query in Minor GC",
+ "[MemoryGC] before free top memory overcommit query in minor
GC",
MemTrackerLimiter::Type::QUERY);
RuntimeProfile* toq_profile =
profile->create_child("FreeTopOvercommitMemoryQuery", true,
true);
@@ -175,9 +176,10 @@ bool MemInfo::process_full_gc() {
je_purge_all_arena_dirty_pages();
std::stringstream ss;
profile->pretty_print(&ss);
- LOG(INFO) << fmt::format("End Full GC, Free Memory {}. cost(us): {},
details: {}",
- PrettyPrinter::print(freed_mem, TUnit::BYTES),
- watch.elapsed_time() / 1000, ss.str());
+ LOG(INFO) << fmt::format(
+ "[MemoryGC] end full GC, free Memory {}. cost(us): {},
details: {}",
+ PrettyPrinter::print(freed_mem, TUnit::BYTES),
watch.elapsed_time() / 1000,
+ ss.str());
}};
freed_mem +=
CacheManager::instance()->for_each_cache_prune_all(profile.get());
@@ -187,13 +189,13 @@ bool MemInfo::process_full_gc() {
}
RuntimeProfile* tg_profile = profile->create_child("WorkloadGroup", true,
true);
- freed_mem += tg_soft_memory_limit_gc(_s_process_full_gc_size - freed_mem,
tg_profile);
+ freed_mem += tg_enable_overcommit_group_gc(_s_process_full_gc_size -
freed_mem, tg_profile);
if (freed_mem > _s_process_full_gc_size) {
return true;
}
- VLOG_NOTICE << MemTrackerLimiter::type_detail_usage("Before free top
memory query in Full GC",
-
MemTrackerLimiter::Type::QUERY);
+ VLOG_NOTICE << MemTrackerLimiter::type_detail_usage(
+ "[MemoryGC] before free top memory query in full GC",
MemTrackerLimiter::Type::QUERY);
RuntimeProfile* tmq_profile = profile->create_child("FreeTopMemoryQuery",
true, true);
freed_mem += MemTrackerLimiter::free_top_memory_query(
_s_process_full_gc_size - freed_mem, pre_vm_rss,
pre_sys_mem_available, tmq_profile);
@@ -203,7 +205,8 @@ bool MemInfo::process_full_gc() {
if (config::enable_query_memory_overcommit) {
VLOG_NOTICE << MemTrackerLimiter::type_detail_usage(
- "Before free top memory overcommit load in Full GC",
MemTrackerLimiter::Type::LOAD);
+ "[MemoryGC] before free top memory overcommit load in full GC",
+ MemTrackerLimiter::Type::LOAD);
RuntimeProfile* tol_profile =
profile->create_child("FreeTopMemoryOvercommitLoad", true,
true);
freed_mem += MemTrackerLimiter::free_top_overcommit_load(
@@ -214,8 +217,8 @@ bool MemInfo::process_full_gc() {
}
}
- VLOG_NOTICE << MemTrackerLimiter::type_detail_usage("Before free top
memory load in Full GC",
-
MemTrackerLimiter::Type::LOAD);
+ VLOG_NOTICE << MemTrackerLimiter::type_detail_usage(
+ "[MemoryGC] before free top memory load in full GC",
MemTrackerLimiter::Type::LOAD);
RuntimeProfile* tml_profile = profile->create_child("FreeTopMemoryLoad",
true, true);
freed_mem += MemTrackerLimiter::free_top_memory_load(
_s_process_full_gc_size - freed_mem, pre_vm_rss,
pre_sys_mem_available, tml_profile);
@@ -225,31 +228,39 @@ bool MemInfo::process_full_gc() {
return false;
}
-int64_t MemInfo::tg_hard_memory_limit_gc() {
+int64_t MemInfo::tg_not_enable_overcommit_group_gc() {
MonotonicStopWatch watch;
watch.start();
std::vector<taskgroup::TaskGroupPtr> task_groups;
std::unique_ptr<RuntimeProfile> tg_profile =
std::make_unique<RuntimeProfile>("WorkloadGroup");
int64_t total_free_memory = 0;
+ taskgroup::TaskGroupManager::instance()->get_resource_groups(
+ [](const taskgroup::TaskGroupPtr& task_group) {
+ return !task_group->enable_memory_overcommit();
+ },
+ &task_groups);
+ if (task_groups.empty()) {
+ return 0;
+ }
+
+ LOG(INFO) << fmt::format(
+ "[MemoryGC] start GC work load group that not enable overcommit,
number of group: {}, "
+ "if it exceeds the limit, try free size = (group used - group
limit).",
+ task_groups.size());
+
Defer defer {[&]() {
if (total_free_memory > 0) {
std::stringstream ss;
tg_profile->pretty_print(&ss);
LOG(INFO) << fmt::format(
- "End Task Group Overcommit Memory GC, Free Memory {}.
cost(us): {}, "
- "details: {}",
- PrettyPrinter::print(total_free_memory, TUnit::BYTES),
+ "[MemoryGC] end GC work load group that not enable
overcommit, number of "
+ "group: {}, free memory {}. cost(us): {}, details: {}",
+ task_groups.size(),
PrettyPrinter::print(total_free_memory, TUnit::BYTES),
watch.elapsed_time() / 1000, ss.str());
}
}};
- taskgroup::TaskGroupManager::instance()->get_resource_groups(
- [](const taskgroup::TaskGroupPtr& task_group) {
- return !task_group->enable_memory_overcommit();
- },
- &task_groups);
-
for (const auto& task_group : task_groups) {
taskgroup::TaskGroupInfo tg_info;
task_group->task_group_info(&tg_info);
@@ -261,13 +272,19 @@ int64_t MemInfo::tg_hard_memory_limit_gc() {
return total_free_memory;
}
-int64_t MemInfo::tg_soft_memory_limit_gc(int64_t request_free_memory,
RuntimeProfile* profile) {
+int64_t MemInfo::tg_enable_overcommit_group_gc(int64_t request_free_memory,
+ RuntimeProfile* profile) {
+ MonotonicStopWatch watch;
+ watch.start();
std::vector<taskgroup::TaskGroupPtr> task_groups;
taskgroup::TaskGroupManager::instance()->get_resource_groups(
[](const taskgroup::TaskGroupPtr& task_group) {
return task_group->enable_memory_overcommit();
},
&task_groups);
+ if (task_groups.empty()) {
+ return 0;
+ }
int64_t total_exceeded_memory = 0;
std::vector<int64_t> used_memorys;
@@ -283,6 +300,33 @@ int64_t MemInfo::tg_soft_memory_limit_gc(int64_t
request_free_memory, RuntimePro
int64_t total_free_memory = 0;
bool gc_all_exceeded = request_free_memory >= total_exceeded_memory;
+ std::string log_prefix = fmt::format(
+ "work load group that enable overcommit, number of group: {},
request_free_memory:{}, "
+ "total_exceeded_memory:{}",
+ task_groups.size(), request_free_memory, total_exceeded_memory);
+ if (gc_all_exceeded) {
+ LOG(INFO) << fmt::format(
+ "[MemoryGC] start GC {}, request more than exceeded, try free
size = (group used - "
+ "group limit).",
+ log_prefix);
+ } else {
+ LOG(INFO) << fmt::format(
+ "[MemoryGC] start GC {}, request less than exceeded, try free
size = ((group used "
+ "- group limit) / all group total_exceeded_memory) *
request_free_memory.",
+ log_prefix);
+ }
+
+ Defer defer {[&]() {
+ if (total_free_memory > 0) {
+ std::stringstream ss;
+ profile->pretty_print(&ss);
+ LOG(INFO) << fmt::format(
+ "[MemoryGC] end GC {}, free memory {}. cost(us): {},
details: {}", log_prefix,
+ PrettyPrinter::print(total_free_memory, TUnit::BYTES),
+ watch.elapsed_time() / 1000, ss.str());
+ }
+ }};
+
for (int i = 0; i < task_groups.size(); ++i) {
if (exceeded_memorys[i] == 0) {
continue;
diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h
index 1ccd918c4a3..3691934b800 100644
--- a/be/src/util/mem_info.h
+++ b/be/src/util/mem_info.h
@@ -184,8 +184,9 @@ public:
static bool process_minor_gc();
static bool process_full_gc();
- static int64_t tg_hard_memory_limit_gc();
- static int64_t tg_soft_memory_limit_gc(int64_t request_free_memory,
RuntimeProfile* profile);
+ static int64_t tg_not_enable_overcommit_group_gc();
+ static int64_t tg_enable_overcommit_group_gc(int64_t request_free_memory,
+ RuntimeProfile* profile);
// It is only used after the memory limit is exceeded. When multiple
threads are waiting for the available memory of the process,
// avoid multiple threads starting at the same time and causing OOM.
diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp
index 089293b0f18..1c8085aac45 100644
--- a/be/src/vec/common/allocator.cpp
+++ b/be/src/vec/common/allocator.cpp
@@ -45,8 +45,10 @@ void Allocator<clear_memory_, mmap_populate,
use_mmap>::sys_memory_check(size_t
// Otherwise, if the external catch, directly throw bad::alloc.
auto err_msg = fmt::format(
"Allocator sys memory check failed: Cannot alloc:{}, consuming
"
- "tracker:<{}>, exec node:<{}>, {}.",
+ "tracker:<{}>, peak used {}, current used {}, exec node:<{}>,
{}.",
size, doris::thread_context()->thread_mem_tracker()->label(),
+
doris::thread_context()->thread_mem_tracker()->peak_consumption(),
+ doris::thread_context()->thread_mem_tracker()->consumption(),
doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(),
doris::MemTrackerLimiter::process_limit_exceeded_errmsg_str());
if (size > 1024l * 1024 * 1024 &&
!doris::enable_thread_catch_bad_alloc &&
@@ -121,21 +123,17 @@ void Allocator<clear_memory_, mmap_populate,
use_mmap>::memory_tracker_check(siz
if (doris::thread_context()->skip_memory_check) return;
auto st = doris::thread_context()->thread_mem_tracker()->check_limit(size);
if (!st) {
- auto err_msg =
-
doris::thread_context()->thread_mem_tracker()->query_tracker_limit_exceeded_str(
- st.to_string(),
-
doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(),
- "Allocator mem tracker check failed");
+ auto err_msg = fmt::format("Allocator mem tracker check failed, {}",
st.to_string());
doris::thread_context()->thread_mem_tracker()->print_log_usage(err_msg);
// If the external catch, throw bad::alloc first, let the query
actively cancel. Otherwise asynchronous cancel.
if
(doris::thread_context()->thread_mem_tracker_mgr->is_attach_query()) {
doris::thread_context()->thread_mem_tracker_mgr->disable_wait_gc();
if (!doris::enable_thread_catch_bad_alloc) {
- LOG(INFO) << fmt::format("Query:{} canceled asyn, {}.",
+ LOG(INFO) << fmt::format("query/load:{} canceled asyn, {}.",
print_id(doris::thread_context()->task_id()), err_msg);
doris::thread_context()->thread_mem_tracker_mgr->cancel_fragment(err_msg);
} else {
- LOG(INFO) << fmt::format("Query:{} throw exception, {}.",
+ LOG(INFO) << fmt::format("query/load:{} throw exception, {}.",
print_id(doris::thread_context()->task_id()), err_msg);
throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED,
err_msg);
}
@@ -170,7 +168,7 @@ void Allocator<clear_memory_, mmap_populate,
use_mmap>::throw_bad_alloc(
const std::string& err) const {
LOG(WARNING) << err
<< fmt::format(
- " OS physical memory {}. Process memory usage {},
Sys available memory "
+ " os physical memory {}. process memory used {},
sys available memory "
"{}, Stacktrace: {}",
doris::PrettyPrinter::print(doris::MemInfo::physical_mem(),
doris::TUnit::BYTES),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]