This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 84b97860a1 [fix](memory) Fix memory exceed limit and query has been canceled, Allocator will block 100ms (#20959)
84b97860a1 is described below
commit 84b97860a103f684cf892c7151fa3879ef9a7fbb
Author: Xinyi Zou <[email protected]>
AuthorDate: Wed Jun 21 17:35:19 2023 +0800
[fix](memory) Fix memory exceed limit and query has been canceled, Allocator will block 100ms (#20959)
---
be/src/runtime/memory/mem_tracker_limiter.cpp | 4 ++-
be/src/vec/common/allocator.cpp | 39 ++++++++++++++++++---------
2 files changed, 30 insertions(+), 13 deletions(-)
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 24a79bc1b6..ded5b0c0b8 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -437,7 +437,9 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(
std::lock_guard<std::mutex> l(tracker_groups[i].group_lock);
for (auto tracker : tracker_groups[i].trackers) {
if (tracker->type() == type) {
- if (tracker->consumption() <= 33554432) { // 32M small query does not cancel
+ // 32M small query does not cancel
+ if (tracker->consumption() <= 33554432 ||
+ tracker->consumption() < tracker->limit()) {
continue;
}
if (tracker->is_query_cancelled()) {
diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp
index b74ed398d4..366fc48ce2 100644
--- a/be/src/vec/common/allocator.cpp
+++ b/be/src/vec/common/allocator.cpp
@@ -31,6 +31,7 @@
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/memory/thread_mem_tracker_mgr.h"
#include "runtime/thread_context.h"
+#include "util/defer_op.h"
#include "util/mem_info.h"
#include "util/uid_util.h"
@@ -47,12 +48,22 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::sys_memory_check(size_t
size, doris::thread_context()->thread_mem_tracker()->label(),
doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(),
doris::MemTrackerLimiter::process_limit_exceeded_errmsg_str());
+
+ // TODO, Save the query context in the thread context, instead of finding whether the query id is canceled in fragment_mgr.
+ if (doris::ExecEnv::GetInstance()->fragment_mgr()->query_is_canceled(
+ doris::thread_context()->task_id())) {
+ if (doris::enable_thread_catch_bad_alloc) {
+ throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg);
+ }
+ return;
+ }
if (doris::thread_context()->thread_mem_tracker_mgr->is_attach_query() &&
doris::thread_context()->thread_mem_tracker_mgr->wait_gc()) {
- int64_t wait_milliseconds = doris::config::thread_wait_gc_max_milliseconds;
- LOG(INFO) << fmt::format("Query:{} waiting for enough memory, maximum 5s, {}.",
- print_id(doris::thread_context()->task_id()), err_msg);
- while (wait_milliseconds > 0) {
+ int64_t wait_milliseconds = 0;
+ LOG(INFO) << fmt::format("Query:{} waiting for enough memory, maximum {}ms, {}.",
+ print_id(doris::thread_context()->task_id()),
+ doris::config::thread_wait_gc_max_milliseconds, err_msg);
+ while (wait_milliseconds < doris::config::thread_wait_gc_max_milliseconds) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
if (!doris::MemTrackerLimiter::sys_mem_exceed_limit_check(size)) {
doris::MemInfo::refresh_interval_memory_growth += size;
@@ -60,25 +71,29 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::sys_memory_check(size_t
}
if (doris::ExecEnv::GetInstance()->fragment_mgr()->query_is_canceled(
doris::thread_context()->task_id())) {
- wait_milliseconds = 0;
- break;
+ if (doris::enable_thread_catch_bad_alloc) {
+ throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg);
+ }
+ return;
}
- wait_milliseconds -= 100;
+ wait_milliseconds += 100;
}
- if (wait_milliseconds <= 0) {
+ if (wait_milliseconds >= doris::config::thread_wait_gc_max_milliseconds) {
// Make sure to completely wait thread_wait_gc_max_milliseconds only once.
doris::thread_context()->thread_mem_tracker_mgr->disable_wait_gc();
doris::MemTrackerLimiter::print_log_process_usage(err_msg);
// If the external catch, throw bad::alloc first, let the query actively cancel. Otherwise asynchronous cancel.
if (!doris::enable_thread_catch_bad_alloc) {
LOG(INFO) << fmt::format(
- "Query:{} canceled asyn, after waiting for memory 5s, {}.",
- print_id(doris::thread_context()->task_id()), err_msg);
+ "Query:{} canceled asyn, after waiting for memory {}ms, {}.",
+ print_id(doris::thread_context()->task_id()), wait_milliseconds,
+ err_msg);
doris::thread_context()->thread_mem_tracker_mgr->cancel_fragment(err_msg);
} else {
LOG(INFO) << fmt::format(
- "Query:{} throw exception, after waiting for memory 5s, {}.",
- print_id(doris::thread_context()->task_id()), err_msg);
+ "Query:{} throw exception, after waiting for memory {}ms, {}.",
+ print_id(doris::thread_context()->task_id()), wait_milliseconds,
+ err_msg);
throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]