This is an automated email from the ASF dual-hosted git repository.
luozenglin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6c9c9e9765 [feature-wip](resource-group) Supports memory hard
isolation of resource group (#19526)
6c9c9e9765 is described below
commit 6c9c9e97658859168ac7f43d54aa293491a8d257
Author: luozenglin <[email protected]>
AuthorDate: Mon May 15 22:45:46 2023 +0800
[feature-wip](resource-group) Supports memory hard isolation of resource
group (#19526)
---
be/src/common/daemon.cpp | 19 ++-
be/src/pipeline/task_queue.cpp | 6 +-
be/src/pipeline/task_queue.h | 14 +-
be/src/pipeline/task_scheduler.cpp | 9 +-
be/src/pipeline/task_scheduler.h | 4 +-
be/src/runtime/fragment_mgr.cpp | 37 ++---
be/src/runtime/memory/mem_tracker.h | 6 +-
be/src/runtime/memory/mem_tracker_limiter.cpp | 180 +++++++++++++--------
be/src/runtime/memory/mem_tracker_limiter.h | 66 +++++++-
be/src/runtime/query_context.h | 3 +
be/src/runtime/task_group/task_group.cpp | 107 +++++++++---
be/src/runtime/task_group/task_group.h | 39 +++--
be/src/runtime/task_group/task_group_manager.cpp | 35 +++-
be/src/runtime/task_group/task_group_manager.h | 2 +
.../java/org/apache/doris/qe/StmtExecutor.java | 9 +-
.../resource/resourcegroup/ResourceGroup.java | 43 +++--
.../resource/resourcegroup/ResourceGroupMgr.java | 23 ++-
.../resourcegroup/ResourceGroupMgrTest.java | 6 +
.../resource/resourcegroup/ResourceGroupTest.java | 21 ++-
19 files changed, 440 insertions(+), 189 deletions(-)
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index 5c832cfd77..592442b920 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -47,6 +47,7 @@
#include "runtime/load_channel_mgr.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/memory/mem_tracker_limiter.h"
+#include "runtime/task_group/task_group_manager.h"
#include "runtime/user_function_cache.h"
#include "service/backend_options.h"
#include "util/cpu_info.h"
@@ -230,10 +231,16 @@ void Daemon::memory_gc_thread() {
if (!MemInfo::initialized() || !ExecEnv::GetInstance()->initialized())
{
continue;
}
+ auto sys_mem_available = doris::MemInfo::sys_mem_available();
+ auto proc_mem_no_allocator_cache =
doris::MemInfo::proc_mem_no_allocator_cache();
+
+ auto tg_free_mem =
taskgroup::TaskGroupManager::instance()->memory_limit_gc();
+ sys_mem_available += tg_free_mem;
+ proc_mem_no_allocator_cache -= tg_free_mem;
+
if (memory_full_gc_sleep_time_ms <= 0 &&
- (doris::MemInfo::sys_mem_available() <
- doris::MemInfo::sys_mem_available_low_water_mark() ||
- doris::MemInfo::proc_mem_no_allocator_cache() >=
doris::MemInfo::mem_limit())) {
+ (sys_mem_available <
doris::MemInfo::sys_mem_available_low_water_mark() ||
+ proc_mem_no_allocator_cache >= doris::MemInfo::mem_limit())) {
// No longer full gc and minor gc during sleep.
memory_full_gc_sleep_time_ms = config::memory_gc_sleep_time_s *
1000;
memory_minor_gc_sleep_time_ms = config::memory_gc_sleep_time_s *
1000;
@@ -243,10 +250,8 @@ void Daemon::memory_gc_thread() {
doris::MemTrackerLimiter::enable_print_log_process_usage();
}
} else if (memory_minor_gc_sleep_time_ms <= 0 &&
- (doris::MemInfo::sys_mem_available() <
-
doris::MemInfo::sys_mem_available_warning_water_mark() ||
- doris::MemInfo::proc_mem_no_allocator_cache() >=
- doris::MemInfo::soft_mem_limit())) {
+ (sys_mem_available <
doris::MemInfo::sys_mem_available_warning_water_mark() ||
+ proc_mem_no_allocator_cache >=
doris::MemInfo::soft_mem_limit())) {
// No minor gc during sleep, but full gc is possible.
memory_minor_gc_sleep_time_ms = config::memory_gc_sleep_time_s *
1000;
doris::MemTrackerLimiter::print_log_process_usage("process minor
gc", false);
diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp
index 8078d7b414..807a344cf9 100644
--- a/be/src/pipeline/task_queue.cpp
+++ b/be/src/pipeline/task_queue.cpp
@@ -350,8 +350,8 @@ void TaskGroupTaskQueue::update_statistics(PipelineTask*
task, int64_t time_spen
}
}
-void TaskGroupTaskQueue::update_task_group(const taskgroup::TaskGroupInfo&
task_group_info,
- taskgroup::TaskGroupPtr&
task_group) {
+void TaskGroupTaskQueue::update_tg_cpu_share(const taskgroup::TaskGroupInfo&
task_group_info,
+ taskgroup::TaskGroupPtr
task_group) {
std::unique_lock<std::mutex> lock(_rs_mutex);
auto* entity = task_group->task_entity();
bool is_in_queue = _group_entities.find(entity) != _group_entities.end();
@@ -359,7 +359,7 @@ void TaskGroupTaskQueue::update_task_group(const
taskgroup::TaskGroupInfo& task_
_group_entities.erase(entity);
_total_cpu_share -= entity->cpu_share();
}
- task_group->check_and_update(task_group_info);
+ task_group->update_cpu_share_unlock(task_group_info);
if (is_in_queue) {
_group_entities.emplace(entity);
_total_cpu_share += entity->cpu_share();
diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h
index 6b16b70f99..9956ba3cb9 100644
--- a/be/src/pipeline/task_queue.h
+++ b/be/src/pipeline/task_queue.h
@@ -53,8 +53,8 @@ public:
virtual void update_statistics(PipelineTask* task, int64_t time_spent) {}
- virtual void update_task_group(const taskgroup::TaskGroupInfo&
task_group_info,
- taskgroup::TaskGroupPtr& task_group) = 0;
+ virtual void update_tg_cpu_share(const taskgroup::TaskGroupInfo&
task_group_info,
+ taskgroup::TaskGroupPtr task_group) = 0;
int cores() const { return _core_size; }
@@ -154,9 +154,9 @@ public:
time_spent);
}
- void update_task_group(const taskgroup::TaskGroupInfo& task_group_info,
- taskgroup::TaskGroupPtr& task_group) override {
- LOG(FATAL) << "update_task_group not implemented";
+ void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
+ taskgroup::TaskGroupPtr task_group) override {
+ LOG(FATAL) << "update_tg_cpu_share not implemented";
}
private:
@@ -184,8 +184,8 @@ public:
void update_statistics(PipelineTask* task, int64_t time_spent) override;
- void update_task_group(const taskgroup::TaskGroupInfo& task_group_info,
- taskgroup::TaskGroupPtr& task_group) override;
+ void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
+ taskgroup::TaskGroupPtr task_group) override;
private:
template <bool from_executor>
diff --git a/be/src/pipeline/task_scheduler.cpp
b/be/src/pipeline/task_scheduler.cpp
index cd219a25e0..53e9a4d868 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -354,12 +354,9 @@ void TaskScheduler::shutdown() {
}
}
-void TaskScheduler::try_update_task_group(const taskgroup::TaskGroupInfo&
task_group_info,
- taskgroup::TaskGroupPtr& task_group)
{
- if (!task_group->check_version(task_group_info._version)) {
- return;
- }
- _task_queue->update_task_group(task_group_info, task_group);
+void TaskScheduler::update_tg_cpu_share(const taskgroup::TaskGroupInfo&
task_group_info,
+ taskgroup::TaskGroupPtr task_group) {
+ _task_queue->update_tg_cpu_share(task_group_info, task_group);
}
} // namespace doris::pipeline
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index b2a665f034..1fcbe2c068 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -91,8 +91,8 @@ public:
void shutdown();
- void try_update_task_group(const taskgroup::TaskGroupInfo& task_group_info,
- taskgroup::TaskGroupPtr& task_group);
+ void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
+ taskgroup::TaskGroupPtr task_group);
private:
std::unique_ptr<ThreadPool> _fix_thread_pool;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 0cca7fa10f..c8e6a8b25e 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -56,7 +56,6 @@
#include "io/fs/stream_load_pipe.h"
#include "opentelemetry/trace/scope.h"
#include "pipeline/pipeline_fragment_context.h"
-#include "pipeline/task_scheduler.h"
#include "runtime/client_cache.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
@@ -691,6 +690,25 @@ Status FragmentMgr::_get_query_ctx(const Params& params,
TUniqueId query_id, boo
query_ctx->query_mem_tracker->enable_print_log_usage();
}
+ if constexpr (std::is_same_v<TPipelineFragmentParams, Params>) {
+ if (params.__isset.resource_groups &&
!params.resource_groups.empty()) {
+ taskgroup::TaskGroupInfo task_group_info;
+ auto status =
taskgroup::TaskGroupInfo::parse_group_info(params.resource_groups[0],
+
&task_group_info);
+ if (status.ok()) {
+ auto tg =
taskgroup::TaskGroupManager::instance()->get_or_create_task_group(
+ task_group_info);
+ tg->add_mem_tracker_limiter(query_ctx->query_mem_tracker);
+ query_ctx->set_task_group(tg);
+ LOG(INFO) << "Query/load id: " <<
print_id(query_ctx->query_id)
+ << " use task group: " << tg->debug_string();
+ }
+ } else {
+ VLOG_DEBUG << "Query/load id: " <<
print_id(query_ctx->query_id)
+ << " does not use task group.";
+ }
+ }
+
{
// Find _query_ctx_map again, in case some other request has
already
// create the query fragments context.
@@ -803,23 +821,6 @@ Status FragmentMgr::exec_plan_fragment(const
TPipelineFragmentParams& params,
std::shared_ptr<QueryContext> query_ctx;
RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true, query_ctx));
- if (params.__isset.resource_groups && !params.resource_groups.empty()) {
- taskgroup::TaskGroupInfo task_group_info;
- auto status =
taskgroup::TaskGroupInfo::parse_group_info(params.resource_groups[0],
-
&task_group_info);
- if (status.ok()) {
- auto tg =
taskgroup::TaskGroupManager::instance()->get_or_create_task_group(
- task_group_info);
-
_exec_env->pipeline_task_group_scheduler()->try_update_task_group(task_group_info,
tg);
- query_ctx->set_task_group(tg);
- LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id)
- << " use task group: " << tg->debug_string();
- }
- } else {
- VLOG_DEBUG << "Query/load id: " << print_id(query_ctx->query_id)
- << " does not use task group.";
- }
-
for (size_t i = 0; i < params.local_params.size(); i++) {
const auto& local_params = params.local_params[i];
diff --git a/be/src/runtime/memory/mem_tracker.h
b/be/src/runtime/memory/mem_tracker.h
index 81f901879c..c21ef478e0 100644
--- a/be/src/runtime/memory/mem_tracker.h
+++ b/be/src/runtime/memory/mem_tracker.h
@@ -115,7 +115,7 @@ public:
// For MemTrackerLimiter
MemTracker() { _parent_group_num = -1; }
- ~MemTracker();
+ virtual ~MemTracker();
static std::string print_bytes(int64_t bytes) {
return bytes >= 0 ? PrettyPrinter::print(bytes, TUnit::BYTES)
@@ -154,13 +154,13 @@ public:
static void refresh_all_tracker_profile();
public:
- Snapshot make_snapshot() const;
+ virtual Snapshot make_snapshot() const;
// Specify group_num from mem_tracker_pool to generate snapshot.
static void make_group_snapshot(std::vector<Snapshot>* snapshots, int64_t
group_num,
std::string parent_label);
static std::string log_usage(MemTracker::Snapshot snapshot);
- std::string debug_string() {
+ virtual std::string debug_string() {
std::stringstream msg;
msg << "label: " << _label << "; "
<< "consumption: " << consumption() << "; "
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index d9974bef7f..076af90fba 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -29,6 +29,7 @@
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/load_channel_mgr.h"
+#include "runtime/task_group/task_group.h"
#include "service/backend_options.h"
#include "util/mem_info.h"
#include "util/perf_counters.h"
@@ -37,15 +38,10 @@
namespace doris {
-struct TrackerLimiterGroup {
- std::list<MemTrackerLimiter*> trackers;
- std::mutex group_lock;
-};
-
// Save all MemTrackerLimiters in use.
// Each group corresponds to several MemTrackerLimiters and has a lock.
// Multiple groups are used to reduce the impact of locks.
-static std::vector<TrackerLimiterGroup> mem_tracker_limiter_pool(1000);
+static std::vector<TrackerLimiterGroup>
mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM);
std::atomic<bool> MemTrackerLimiter::_enable_print_log_process_usage {true};
bool MemTrackerLimiter::_oom_avoidance {true};
@@ -94,7 +90,7 @@ MemTrackerLimiter::~MemTrackerLimiter() {
MemTracker::Snapshot MemTrackerLimiter::make_snapshot() const {
Snapshot snapshot;
- snapshot.type = TypeString[_type];
+ snapshot.type = type_string(_type);
snapshot.label = _label;
snapshot.limit = _limit;
snapshot.cur_consumption = _consumption->current_value();
@@ -131,7 +127,7 @@ void
MemTrackerLimiter::make_process_snapshots(std::vector<MemTracker::Snapshot>
int64_t process_mem_sum = 0;
Snapshot snapshot;
for (auto it : MemTrackerLimiter::TypeMemSum) {
- snapshot.type = TypeString[it.first];
+ snapshot.type = type_string(it.first);
snapshot.label = "";
snapshot.limit = -1;
snapshot.cur_consumption = it.second->current_value();
@@ -315,14 +311,35 @@ std::string
MemTrackerLimiter::tracker_limit_exceeded_str(int64_t bytes) {
int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem,
const std::string& vm_rss_str,
const std::string&
mem_available_str, Type type) {
- std::priority_queue<std::pair<int64_t, std::string>,
- std::vector<std::pair<int64_t, std::string>>,
- std::greater<std::pair<int64_t, std::string>>>
- min_pq;
+ return free_top_memory_query(
+ min_free_mem, type, mem_tracker_limiter_pool,
+ [&vm_rss_str, &mem_available_str, &type](int64_t mem_consumption,
+ const std::string& label)
{
+ return fmt::format(
+ "Process has no memory available, cancel top memory
usage {}: "
+ "{} memory tracker <{}> consumption {}, backend {} "
+ "process memory used {} exceed limit {} or sys mem
available {} "
+ "less than low water mark {}. Execute again after
enough memory, "
+ "details see be.INFO.",
+ type_string(type), type_string(type), label,
print_bytes(mem_consumption),
+ BackendOptions::get_localhost(), vm_rss_str,
MemInfo::mem_limit_str(),
+ mem_available_str,
+
print_bytes(MemInfo::sys_mem_available_low_water_mark()));
+ });
+}
+
+template <typename TrackerGroups>
+int64_t MemTrackerLimiter::free_top_memory_query(
+ int64_t min_free_mem, Type type, std::vector<TrackerGroups>&
tracker_groups,
+ const std::function<std::string(int64_t, const std::string&)>&
cancel_msg) {
+ using MemTrackerMinQueue = std::priority_queue<std::pair<int64_t,
std::string>,
+
std::vector<std::pair<int64_t, std::string>>,
+
std::greater<std::pair<int64_t, std::string>>>;
+ MemTrackerMinQueue min_pq;
// After greater than min_free_mem, will not be modified.
int64_t prepare_free_mem = 0;
- auto cancel_top_query = [&](auto min_pq) -> int64_t {
+ auto cancel_top_query = [&cancel_msg, type](auto& min_pq) -> int64_t {
std::vector<std::string> usage_strings;
int64_t freed_mem = 0;
while (!min_pq.empty()) {
@@ -333,15 +350,7 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t
min_free_mem,
}
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
cancelled_queryid,
PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
- fmt::format("Process has no memory available, cancel top
memory usage {}: "
- "{} memory tracker <{}> consumption {},
backend {} "
- "process memory used {} exceed limit {} or sys
mem available {} "
- "less than low water mark {}. Execute again
after enough memory, "
- "details see be.INFO.",
- TypeString[type], TypeString[type],
min_pq.top().second,
- print_bytes(min_pq.top().first),
BackendOptions::get_localhost(),
- vm_rss_str, MemInfo::mem_limit_str(),
mem_available_str,
-
print_bytes(MemInfo::sys_mem_available_low_water_mark())));
+ cancel_msg(min_pq.top().first, min_pq.top().second));
freed_mem += min_pq.top().first;
usage_strings.push_back(fmt::format("{} memory usage {} Bytes",
min_pq.top().second,
@@ -349,38 +358,34 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t
min_free_mem,
min_pq.pop();
}
if (!usage_strings.empty()) {
- LOG(INFO) << "Process GC Free Top Memory Usage " <<
TypeString[type] << ": "
+ LOG(INFO) << "Process GC Free Top Memory Usage " <<
type_string(type) << ": "
<< join(usage_strings, ",");
}
return freed_mem;
};
- for (unsigned i = 1; i < mem_tracker_limiter_pool.size(); ++i) {
- std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[i].group_lock);
- for (auto tracker : mem_tracker_limiter_pool[i].trackers) {
+ for (unsigned i = 1; i < tracker_groups.size(); ++i) {
+ std::lock_guard<std::mutex> l(tracker_groups[i].group_lock);
+ for (auto tracker : tracker_groups[i].trackers) {
if (tracker->type() == type) {
if (ExecEnv::GetInstance()->fragment_mgr()->query_is_canceled(
label_to_queryid(tracker->label()))) {
continue;
}
if (tracker->consumption() > min_free_mem) {
- std::priority_queue<std::pair<int64_t, std::string>,
- std::vector<std::pair<int64_t,
std::string>>,
- std::greater<std::pair<int64_t,
std::string>>>
- min_pq_null;
- std::swap(min_pq, min_pq_null);
- min_pq.push(std::pair<int64_t,
std::string>(tracker->consumption(),
-
tracker->label()));
- return cancel_top_query(min_pq);
+ MemTrackerMinQueue min_pq_single;
+ min_pq_single.emplace(tracker->consumption(),
tracker->label());
+ return cancel_top_query(min_pq_single);
} else if (tracker->consumption() + prepare_free_mem <
min_free_mem) {
- min_pq.push(std::pair<int64_t,
std::string>(tracker->consumption(),
-
tracker->label()));
+ min_pq.emplace(tracker->consumption(), tracker->label());
prepare_free_mem += tracker->consumption();
} else if (tracker->consumption() > min_pq.top().first) {
- // No need to modify prepare_free_mem, prepare_free_mem
will always be greater than min_free_mem.
- min_pq.push(std::pair<int64_t,
std::string>(tracker->consumption(),
-
tracker->label()));
- min_pq.pop();
+ min_pq.emplace(tracker->consumption(), tracker->label());
+ prepare_free_mem += tracker->consumption();
+ while (prepare_free_mem - min_pq.top().first >
min_free_mem) {
+ prepare_free_mem -= min_pq.top().first;
+ min_pq.pop();
+ }
}
}
}
@@ -392,15 +397,33 @@ int64_t
MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem,
const std::string&
vm_rss_str,
const std::string&
mem_available_str,
Type type) {
- std::priority_queue<std::pair<int64_t, std::string>,
- std::vector<std::pair<int64_t, std::string>>,
- std::greater<std::pair<int64_t, std::string>>>
- min_pq;
+ return free_top_overcommit_query(
+ min_free_mem, type, mem_tracker_limiter_pool,
+ [&vm_rss_str, &mem_available_str, &type](int64_t mem_consumption,
+ const std::string& label)
{
+ return fmt::format(
+ "Process has less memory, cancel top memory overcommit
{}: "
+ "{} memory tracker <{}> consumption {}, backend {} "
+ "process memory used {} exceed soft limit {} or sys
mem available {} "
+ "less than warning water mark {}. Execute again after
enough memory, "
+ "details see be.INFO.",
+ type_string(type), type_string(type), label,
print_bytes(mem_consumption),
+ BackendOptions::get_localhost(), vm_rss_str,
MemInfo::soft_mem_limit_str(),
+ mem_available_str,
+
print_bytes(MemInfo::sys_mem_available_warning_water_mark()));
+ });
+}
+
+template <typename TrackerGroups>
+int64_t MemTrackerLimiter::free_top_overcommit_query(
+ int64_t min_free_mem, Type type, std::vector<TrackerGroups>&
tracker_groups,
+ const std::function<std::string(int64_t, const std::string&)>&
cancel_msg) {
+ std::priority_queue<std::pair<int64_t, std::string>> max_pq;
std::unordered_map<std::string, int64_t> query_consumption;
- for (unsigned i = 1; i < mem_tracker_limiter_pool.size(); ++i) {
- std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[i].group_lock);
- for (auto tracker : mem_tracker_limiter_pool[i].trackers) {
+ for (unsigned i = 1; i < tracker_groups.size(); ++i) {
+ std::lock_guard<std::mutex> l(tracker_groups[i].group_lock);
+ for (auto tracker : tracker_groups[i].trackers) {
if (tracker->type() == type) {
if (tracker->consumption() <= 33554432) { // 32M small query
does not cancel
continue;
@@ -411,7 +434,7 @@ int64_t
MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem,
}
int64_t overcommit_ratio =
(static_cast<double>(tracker->consumption()) /
tracker->limit()) * 10000;
- min_pq.push(std::pair<int64_t, std::string>(overcommit_ratio,
tracker->label()));
+ max_pq.emplace(overcommit_ratio, tracker->label());
query_consumption[tracker->label()] = tracker->consumption();
}
}
@@ -422,13 +445,6 @@ int64_t
MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem,
return 0;
}
- std::priority_queue<std::pair<int64_t, std::string>> max_pq;
- // Min-heap to Max-heap.
- while (!min_pq.empty()) {
- max_pq.push(min_pq.top());
- min_pq.pop();
- }
-
std::vector<std::string> usage_strings;
int64_t freed_mem = 0;
while (!max_pq.empty()) {
@@ -440,15 +456,7 @@ int64_t
MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem,
int64_t query_mem = query_consumption[max_pq.top().second];
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
cancelled_queryid,
PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
- fmt::format("Process has less memory, cancel top memory
overcommit {}: "
- "{} memory tracker <{}> consumption {}, backend {}
"
- "process memory used {} exceed soft limit {} or
sys mem available {} "
- "less than warning water mark {}. Execute again
after enough memory, "
- "details see be.INFO.",
- TypeString[type], TypeString[type],
max_pq.top().second,
- print_bytes(query_mem),
BackendOptions::get_localhost(), vm_rss_str,
- MemInfo::soft_mem_limit_str(), mem_available_str,
-
print_bytes(MemInfo::sys_mem_available_warning_water_mark())));
+ cancel_msg(query_mem, max_pq.top().second));
usage_strings.push_back(fmt::format("{} memory usage {} Bytes,
overcommit ratio: {}",
max_pq.top().second, query_mem,
max_pq.top().first));
@@ -459,10 +467,52 @@ int64_t
MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem,
max_pq.pop();
}
if (!usage_strings.empty()) {
- LOG(INFO) << "Process GC Free Top Memory Overcommit " <<
TypeString[type] << ": "
+ LOG(INFO) << "Process GC Free Top Memory Overcommit " <<
type_string(type) << ": "
<< join(usage_strings, ",");
}
return freed_mem;
}
+int64_t MemTrackerLimiter::tg_memory_limit_gc(
+ uint64_t id, const std::string& name, int64_t memory_limit,
+ std::vector<taskgroup::TgTrackerLimiterGroup>& tracker_limiter_groups)
{
+ int64_t used_memory = 0;
+ for (auto& mem_tracker_group : tracker_limiter_groups) {
+ std::lock_guard<std::mutex> l(mem_tracker_group.group_lock);
+ for (const auto& tracker : mem_tracker_group.trackers) {
+ used_memory += tracker->consumption();
+ }
+ }
+
+ if (used_memory <= memory_limit) {
+ return 0;
+ }
+
+ int64_t need_free_mem = used_memory - memory_limit;
+ int64_t freed_mem = 0;
+ constexpr auto query_type = MemTrackerLimiter::Type::QUERY;
+ auto cancel_str = [id, &name, memory_limit, used_memory](int64_t
mem_consumption,
+ const
std::string& label) {
+ return fmt::format(
+ "Resource group id:{}, name:{} memory exceeded limit, cancel
top memory {}: "
+ "memory tracker <{}> consumption {}, backend {}, "
+ "resource group memory used {}, memory limit {}.",
+ id, name, MemTrackerLimiter::type_string(query_type), label,
+ MemTracker::print_bytes(mem_consumption),
BackendOptions::get_localhost(),
+ MemTracker::print_bytes(used_memory),
MemTracker::print_bytes(memory_limit));
+ };
+ if (config::enable_query_memroy_overcommit) {
+ freed_mem += MemTrackerLimiter::free_top_overcommit_query(
+ need_free_mem - freed_mem, query_type, tracker_limiter_groups,
cancel_str);
+ }
+ if (freed_mem < need_free_mem) {
+ freed_mem += MemTrackerLimiter::free_top_memory_query(need_free_mem -
freed_mem, query_type,
+
tracker_limiter_groups, cancel_str);
+ }
+ LOG(INFO) << fmt::format(
+ "task group {} finished gc, memory_limit: {}, used_memory: {},
freed_mem: {}.", name,
+ memory_limit, used_memory, freed_mem);
+ return freed_mem;
+}
+
} // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h
b/be/src/runtime/memory/mem_tracker_limiter.h
index 7665a029f8..3cc7108b27 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -28,6 +28,7 @@
#include <list>
#include <memory>
#include <ostream>
+#include <queue>
#include <string>
#include <unordered_map>
#include <vector>
@@ -41,6 +42,21 @@
namespace doris {
+constexpr auto MEM_TRACKER_GROUP_NUM = 1000;
+
+namespace taskgroup {
+struct TgTrackerLimiterGroup;
+class TaskGroup;
+using TaskGroupPtr = std::shared_ptr<TaskGroup>;
+} // namespace taskgroup
+
+class MemTrackerLimiter;
+
+struct TrackerLimiterGroup {
+ std::list<MemTrackerLimiter*> trackers;
+ std::mutex group_lock;
+};
+
// Track and limit the memory usage of process and query.
// Contains an limit, arranged into a tree structure.
//
@@ -49,7 +65,7 @@ namespace doris {
// will be recorded on this Query, otherwise it will be recorded in Orphan
Tracker by default.
class MemTrackerLimiter final : public MemTracker {
public:
- enum Type {
+ enum class Type {
GLOBAL = 0, // Life cycle is the same as the process, e.g.
Cache and default Orphan
QUERY = 1, // Count the memory consumption of all Query tasks.
LOAD = 2, // Count the memory consumption of all Load tasks.
@@ -76,9 +92,6 @@ public:
{Type::EXPERIMENTAL,
std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES)}};
- inline static const std::string TypeString[] = {
- "global", "query", "load", "compaction", "schema_change", "clone",
"experimental"};
-
public:
// byte_limit equal to -1 means no consumption limit, only participate in
process memory statistics.
MemTrackerLimiter(Type type, const std::string& label = std::string(),
int64_t byte_limit = -1,
@@ -87,6 +100,28 @@ public:
~MemTrackerLimiter();
+ static std::string type_string(Type type) {
+ switch (type) {
+ case Type::GLOBAL:
+ return "global";
+ case Type::QUERY:
+ return "query";
+ case Type::LOAD:
+ return "load";
+ case Type::COMPACTION:
+ return "compaction";
+ case Type::SCHEMA_CHANGE:
+ return "schema_change";
+ case Type::CLONE:
+ return "clone";
+ case Type::EXPERIMENTAL:
+ return "experimental";
+ default:
+ LOG(FATAL) << "not match type of mem tracker limiter :" <<
static_cast<int>(type);
+ }
+ __builtin_unreachable();
+ }
+
static bool sys_mem_exceed_limit_check(int64_t bytes);
void set_consumption() { LOG(FATAL) << "MemTrackerLimiter set_consumption
not supported"; }
@@ -118,7 +153,7 @@ public:
static void refresh_global_counter();
static void refresh_all_tracker_profile();
- Snapshot make_snapshot() const;
+ Snapshot make_snapshot() const override;
// Returns a list of all the valid tracker snapshots.
static void make_process_snapshots(std::vector<MemTracker::Snapshot>*
snapshots);
static void make_type_snapshots(std::vector<MemTracker::Snapshot>*
snapshots, Type type);
@@ -137,6 +172,12 @@ public:
static int64_t free_top_memory_query(int64_t min_free_mem, const
std::string& vm_rss_str,
const std::string& mem_available_str,
Type type = Type::QUERY);
+
+ template <typename TrackerGroups>
+ static int64_t free_top_memory_query(
+ int64_t min_free_mem, Type type, std::vector<TrackerGroups>&
tracker_groups,
+ const std::function<std::string(int64_t, const std::string&)>&
cancel_msg);
+
static int64_t free_top_memory_load(int64_t min_free_mem, const
std::string& vm_rss_str,
const std::string& mem_available_str) {
return free_top_memory_query(min_free_mem, vm_rss_str,
mem_available_str, Type::LOAD);
@@ -146,10 +187,21 @@ public:
static int64_t free_top_overcommit_query(int64_t min_free_mem, const
std::string& vm_rss_str,
const std::string&
mem_available_str,
Type type = Type::QUERY);
+
+ template <typename TrackerGroups>
+ static int64_t free_top_overcommit_query(
+ int64_t min_free_mem, Type type, std::vector<TrackerGroups>&
tracker_groups,
+ const std::function<std::string(int64_t, const std::string&)>&
cancel_msg);
+
static int64_t free_top_overcommit_load(int64_t min_free_mem, const
std::string& vm_rss_str,
const std::string&
mem_available_str) {
return free_top_overcommit_query(min_free_mem, vm_rss_str,
mem_available_str, Type::LOAD);
}
+
+ static int64_t tg_memory_limit_gc(
+ uint64_t id, const std::string& name, int64_t memory_limit,
+ std::vector<taskgroup::TgTrackerLimiterGroup>&
tracker_limiter_groups);
+
// only for Type::QUERY or Type::LOAD.
static TUniqueId label_to_queryid(const std::string& label) {
if (label.rfind("Query#Id=", 0) != 0 && label.rfind("Load#Id=", 0) !=
0) {
@@ -170,12 +222,12 @@ public:
std::string tracker_limit_exceeded_str();
std::string tracker_limit_exceeded_str(int64_t bytes);
- std::string debug_string() {
+ std::string debug_string() override {
std::stringstream msg;
msg << "limit: " << _limit << "; "
<< "consumption: " << _consumption->current_value() << "; "
<< "label: " << _label << "; "
- << "type: " << TypeString[_type] << "; ";
+ << "type: " << type_string(_type) << "; ";
return msg.str();
}
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 0909702c70..7158f8b054 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -73,6 +73,9 @@ public:
MemTracker::print_bytes(query_mem_tracker->consumption()),
MemTracker::print_bytes(query_mem_tracker->peak_consumption()));
}
+ if (_task_group) {
+ _task_group->remove_mem_tracker_limiter(query_mem_tracker);
+ }
}
// Notice. For load fragments, the fragment_num sent by FE has a small
probability of 0.
diff --git a/be/src/runtime/task_group/task_group.cpp
b/be/src/runtime/task_group/task_group.cpp
index ebe775a485..16a6356f30 100644
--- a/be/src/runtime/task_group/task_group.cpp
+++ b/be/src/runtime/task_group/task_group.cpp
@@ -27,10 +27,19 @@
#include <utility>
#include "common/logging.h"
+#include "pipeline/task_scheduler.h"
+#include "runtime/exec_env.h"
+#include "runtime/memory/mem_tracker_limiter.h"
+#include "service/backend_options.h"
+#include "util/mem_info.h"
+#include "util/parse_util.h"
namespace doris {
namespace taskgroup {
+const static std::string CPU_SHARE = "cpu_share";
+const static std::string MEMORY_LIMIT = "memory_limit";
+
pipeline::PipelineTask* TaskGroupEntity::take() {
if (_queue.empty()) {
return nullptr;
@@ -67,36 +76,76 @@ std::string TaskGroupEntity::debug_string() const {
cpu_share(), _queue.size(), _vruntime_ns);
}
-TaskGroup::TaskGroup(uint64_t id, std::string name, uint64_t cpu_share,
int64_t version)
- : _id(id), _name(name), _cpu_share(cpu_share), _task_entity(this),
_version(version) {}
+TaskGroup::TaskGroup(const TaskGroupInfo& tg_info)
+ : _id(tg_info.id),
+ _name(tg_info.name),
+ _cpu_share(tg_info.cpu_share),
+ _memory_limit(tg_info.memory_limit),
+ _version(tg_info.version),
+ _task_entity(this),
+ _mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM) {}
std::string TaskGroup::debug_string() const {
- std::shared_lock<std::shared_mutex> rl {mutex};
- return fmt::format("TG[id = {}, name = {}, cpu_share = {}, version = {}]",
_id, _name,
- cpu_share(), _version);
-}
-
-bool TaskGroup::check_version(int64_t version) const {
- std::shared_lock<std::shared_mutex> rl {mutex};
- return version > _version;
+ std::shared_lock<std::shared_mutex> rl {_mutex};
+ return fmt::format("TG[id = {}, name = {}, cpu_share = {}, memory_limit =
{}, version = {}]",
+ _id, _name, cpu_share(),
PrettyPrinter::print(_memory_limit, TUnit::BYTES),
+ _version);
}
void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) {
- if (tg_info._id != _id) {
+ if (UNLIKELY(tg_info.id != _id)) {
return;
}
+ {
+ std::shared_lock<std::shared_mutex> rl {_mutex};
+ if (LIKELY(tg_info.version <= _version)) {
+ return;
+ }
+ }
+
+ std::lock_guard<std::shared_mutex> wl {_mutex};
+ if (tg_info.version > _version) {
+ _name = tg_info.name;
+ _version = tg_info.version;
+ _memory_limit = tg_info.memory_limit;
+ if (_cpu_share != tg_info.cpu_share) {
+
ExecEnv::GetInstance()->pipeline_task_group_scheduler()->update_tg_cpu_share(
+ tg_info, shared_from_this());
+ }
+ }
+}
+
+void TaskGroup::update_cpu_share_unlock(const TaskGroupInfo& tg_info) {
+ _cpu_share = tg_info.cpu_share;
+}
- std::lock_guard<std::shared_mutex> wl {mutex};
- if (tg_info._version > _version) {
- _name = tg_info._name;
- _cpu_share = tg_info._cpu_share;
- _version = tg_info._version;
+void TaskGroup::add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter>
mem_tracker_ptr) {
+ auto group_num = mem_tracker_ptr->group_num();
+ std::lock_guard<std::mutex>
l(_mem_tracker_limiter_pool[group_num].group_lock);
+ _mem_tracker_limiter_pool[group_num].trackers.insert(mem_tracker_ptr);
+}
+
+void TaskGroup::remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter>
mem_tracker_ptr) {
+ auto group_num = mem_tracker_ptr->group_num();
+ std::lock_guard<std::mutex>
l(_mem_tracker_limiter_pool[group_num].group_lock);
+ _mem_tracker_limiter_pool[group_num].trackers.erase(mem_tracker_ptr);
+}
+
+int64_t TaskGroup::memory_limit_gc() {
+ std::string name;
+ int64_t memory_limit;
+ {
+ std::shared_lock<std::shared_mutex> rl {_mutex};
+ name = _name;
+ memory_limit = _memory_limit;
}
+ return MemTrackerLimiter::tg_memory_limit_gc(_id, name, memory_limit,
+ _mem_tracker_limiter_pool);
}
Status TaskGroupInfo::parse_group_info(const TPipelineResourceGroup&
resource_group,
TaskGroupInfo* task_group_info) {
- if (!check_group_info(resource_group)) {
+ if (UNLIKELY(!check_group_info(resource_group))) {
std::stringstream ss;
ss << "incomplete resource group parameters: ";
resource_group.printTo(ss);
@@ -108,17 +157,31 @@ Status TaskGroupInfo::parse_group_info(const
TPipelineResourceGroup& resource_gr
uint64_t share = 0;
std::from_chars(iter->second.c_str(), iter->second.c_str() +
iter->second.size(), share);
- task_group_info->_id = resource_group.id;
- task_group_info->_name = resource_group.name;
- task_group_info->_version = resource_group.version;
- task_group_info->_cpu_share = share;
+ task_group_info->id = resource_group.id;
+ task_group_info->name = resource_group.name;
+ task_group_info->version = resource_group.version;
+ task_group_info->cpu_share = share;
+
+ bool is_percent = true;
+ auto mem_limit_str = resource_group.properties.find(MEMORY_LIMIT)->second;
+ auto mem_limit =
+ ParseUtil::parse_mem_spec(mem_limit_str, -1, MemInfo::mem_limit(),
&is_percent);
+ if (UNLIKELY(mem_limit <= 0)) {
+ std::stringstream ss;
+ ss << "parse memory limit from TPipelineResourceGroup error, " <<
MEMORY_LIMIT << ": "
+ << mem_limit_str;
+ LOG(WARNING) << ss.str();
+ return Status::InternalError(ss.str());
+ }
+ task_group_info->memory_limit = mem_limit;
return Status::OK();
}
bool TaskGroupInfo::check_group_info(const TPipelineResourceGroup&
resource_group) {
return resource_group.__isset.id && resource_group.__isset.version &&
resource_group.__isset.name && resource_group.__isset.properties &&
- resource_group.properties.count(CPU_SHARE) > 0;
+ resource_group.properties.count(CPU_SHARE) > 0 &&
+ resource_group.properties.count(MEMORY_LIMIT) > 0;
}
} // namespace taskgroup
diff --git a/be/src/runtime/task_group/task_group.h
b/be/src/runtime/task_group/task_group.h
index e66ef2e1b0..a7854158d2 100644
--- a/be/src/runtime/task_group/task_group.h
+++ b/be/src/runtime/task_group/task_group.h
@@ -25,6 +25,7 @@
#include <queue>
#include <shared_mutex>
#include <string>
+#include <unordered_set>
#include "common/status.h"
@@ -35,14 +36,13 @@ class PipelineTask;
}
class TPipelineResourceGroup;
+class MemTrackerLimiter;
namespace taskgroup {
class TaskGroup;
struct TaskGroupInfo;
-const static std::string CPU_SHARE = "cpu_share";
-
class TaskGroupEntity {
public:
explicit TaskGroupEntity(taskgroup::TaskGroup* ts) : _tg(ts) {}
@@ -72,9 +72,14 @@ private:
using TGEntityPtr = TaskGroupEntity*;
-class TaskGroup {
+struct TgTrackerLimiterGroup {
+ std::unordered_set<std::shared_ptr<MemTrackerLimiter>> trackers;
+ std::mutex group_lock;
+};
+
+class TaskGroup : public std::enable_shared_from_this<TaskGroup> {
public:
- TaskGroup(uint64_t id, std::string name, uint64_t cpu_share, int64_t
version);
+ explicit TaskGroup(const TaskGroupInfo& tg_info);
TaskGroupEntity* task_entity() { return &_task_entity; }
@@ -84,26 +89,36 @@ public:
std::string debug_string() const;
- bool check_version(int64_t version) const;
-
void check_and_update(const TaskGroupInfo& tg_info);
+ void update_cpu_share_unlock(const TaskGroupInfo& tg_info);
+
+ void add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter>
mem_tracker_ptr);
+
+ void remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter>
mem_tracker_ptr);
+
+ int64_t memory_limit_gc();
+
private:
- mutable std::shared_mutex mutex;
+ mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share,
_memory_limit
const uint64_t _id;
std::string _name;
std::atomic<uint64_t> _cpu_share;
- TaskGroupEntity _task_entity;
+ int64_t _memory_limit; // bytes
int64_t _version;
+ TaskGroupEntity _task_entity;
+
+ std::vector<TgTrackerLimiterGroup> _mem_tracker_limiter_pool;
};
using TaskGroupPtr = std::shared_ptr<TaskGroup>;
struct TaskGroupInfo {
- uint64_t _id;
- std::string _name;
- uint64_t _cpu_share;
- int64_t _version;
+ uint64_t id;
+ std::string name;
+ uint64_t cpu_share;
+ int64_t version;
+ int64_t memory_limit;
static Status parse_group_info(const TPipelineResourceGroup&
resource_group,
TaskGroupInfo* task_group_info);
diff --git a/be/src/runtime/task_group/task_group_manager.cpp
b/be/src/runtime/task_group/task_group_manager.cpp
index d393264983..c741a0bffd 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -20,6 +20,7 @@
#include <memory>
#include <mutex>
+#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/task_group/task_group.h"
namespace doris::taskgroup {
@@ -35,20 +36,38 @@ TaskGroupManager* TaskGroupManager::instance() {
TaskGroupPtr TaskGroupManager::get_or_create_task_group(const TaskGroupInfo&
task_group_info) {
{
std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
- if (_task_groups.count(task_group_info._id)) {
- return _task_groups[task_group_info._id];
+ if (LIKELY(_task_groups.count(task_group_info.id))) {
+ auto task_group = _task_groups[task_group_info.id];
+ task_group->check_and_update(task_group_info);
+ return task_group;
}
}
- auto new_task_group =
- std::make_shared<TaskGroup>(task_group_info._id,
task_group_info._name,
- task_group_info._cpu_share,
task_group_info._version);
+ auto new_task_group = std::make_shared<TaskGroup>(task_group_info);
std::lock_guard<std::shared_mutex> w_lock(_group_mutex);
- if (_task_groups.count(task_group_info._id)) {
- return _task_groups[task_group_info._id];
+ if (_task_groups.count(task_group_info.id)) {
+ auto task_group = _task_groups[task_group_info.id];
+ task_group->check_and_update(task_group_info);
+ return task_group;
}
- _task_groups[task_group_info._id] = new_task_group;
+ _task_groups[task_group_info.id] = new_task_group;
return new_task_group;
}
+int64_t TaskGroupManager::memory_limit_gc() {
+ int64_t total_free_memory = 0;
+ std::vector<TaskGroupPtr> task_groups;
+ {
+ std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+ task_groups.reserve(_task_groups.size());
+ for (const auto& [id, task_group] : _task_groups) {
+ task_groups.push_back(task_group);
+ }
+ }
+ for (const auto& task_group : task_groups) {
+ total_free_memory += task_group->memory_limit_gc();
+ }
+ return total_free_memory;
+}
+
} // namespace doris::taskgroup
diff --git a/be/src/runtime/task_group/task_group_manager.h
b/be/src/runtime/task_group/task_group_manager.h
index 0f415498a0..c053a3a45d 100644
--- a/be/src/runtime/task_group/task_group_manager.h
+++ b/be/src/runtime/task_group/task_group_manager.h
@@ -33,6 +33,8 @@ public:
TaskGroupPtr get_or_create_task_group(const TaskGroupInfo&
task_group_info);
+ int64_t memory_limit_gc();
+
private:
std::shared_mutex _group_mutex;
std::unordered_map<uint64_t, TaskGroupPtr> _task_groups;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 3a02eb67c0..4531535983 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -871,10 +871,6 @@ public class StmtExecutor {
|| (parsedStmt instanceof InsertStmt && !((InsertStmt)
parsedStmt).needLoadManager())
|| parsedStmt instanceof CreateTableAsSelectStmt
|| parsedStmt instanceof InsertOverwriteTableStmt) {
- if (Config.enable_resource_group &&
context.sessionVariable.enablePipelineEngine()) {
-
analyzer.setResourceGroups(analyzer.getEnv().getResourceGroupMgr()
-
.getResourceGroup(context.sessionVariable.resourceGroup));
- }
Map<Long, TableIf> tableMap = Maps.newTreeMap();
QueryStmt queryStmt;
Set<String> parentViewNameSet = Sets.newHashSet();
@@ -1060,6 +1056,11 @@ public class StmtExecutor {
parsedStmt.setIsExplain(explainOptions);
}
}
+ if (parsedStmt instanceof QueryStmt && Config.enable_resource_group
+ && context.sessionVariable.enablePipelineEngine()) {
+
analyzer.setResourceGroups(analyzer.getEnv().getResourceGroupMgr()
+
.getResourceGroup(context.sessionVariable.resourceGroup));
+ }
}
profile.getSummaryProfile().setQueryAnalysisFinishTime();
planner = new OriginalPlanner(analyzer);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroup.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroup.java
index 538c064247..b859b95752 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroup.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroup.java
@@ -30,6 +30,8 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
@@ -38,14 +40,17 @@ import java.util.HashMap;
import java.util.Map;
public class ResourceGroup implements Writable {
+ private static final Logger LOG =
LogManager.getLogger(ResourceGroup.class);
public static final String CPU_SHARE = "cpu_share";
+ public static final String MEMORY_LIMIT = "memory_limit";
+
private static final ImmutableSet<String> REQUIRED_PROPERTIES_NAME = new
ImmutableSet.Builder<String>().add(
- CPU_SHARE).build();
+ CPU_SHARE).add(MEMORY_LIMIT).build();
private static final ImmutableSet<String> ALL_PROPERTIES_NAME = new
ImmutableSet.Builder<String>().add(
- CPU_SHARE).build();
+ CPU_SHARE).add(MEMORY_LIMIT).build();
@SerializedName(value = "id")
private long id;
@@ -60,11 +65,10 @@ public class ResourceGroup implements Writable {
@SerializedName(value = "version")
private long version;
+ private double memoryLimitPercent;
+
private ResourceGroup(long id, String name, Map<String, String>
properties) {
- this.id = id;
- this.name = name;
- this.properties = properties;
- this.version = 0;
+ this(id, name, properties, 0);
}
private ResourceGroup(long id, String name, Map<String, String>
properties, long version) {
@@ -72,6 +76,8 @@ public class ResourceGroup implements Writable {
this.name = name;
this.properties = properties;
this.version = version;
+ String memoryLimitString = properties.get(MEMORY_LIMIT);
+ this.memoryLimitPercent =
Double.parseDouble(memoryLimitString.substring(0, memoryLimitString.length() -
1));
}
public static ResourceGroup create(String name, Map<String, String>
properties) throws DdlException {
@@ -79,10 +85,9 @@ public class ResourceGroup implements Writable {
return new ResourceGroup(Env.getCurrentEnv().getNextId(), name,
properties);
}
- public static ResourceGroup create(ResourceGroup resourceGroup,
Map<String, String> updateProperties)
+ public static ResourceGroup copyAndUpdate(ResourceGroup resourceGroup,
Map<String, String> updateProperties)
throws DdlException {
- Map<String, String> newProperties = new HashMap<>();
- newProperties.putAll(resourceGroup.getProperties());
+ Map<String, String> newProperties = new
HashMap<>(resourceGroup.getProperties());
for (Map.Entry<String, String> kv : updateProperties.entrySet()) {
if (!Strings.isNullOrEmpty(kv.getValue())) {
newProperties.put(kv.getKey(), kv.getValue());
@@ -108,7 +113,21 @@ public class ResourceGroup implements Writable {
String cpuSchedulingWeight = properties.get(CPU_SHARE);
if (!StringUtils.isNumeric(cpuSchedulingWeight) ||
Long.parseLong(cpuSchedulingWeight) <= 0) {
- throw new DdlException(CPU_SHARE + " requires a positive
integer.");
+ throw new DdlException(CPU_SHARE + " " + cpuSchedulingWeight + "
requires a positive integer.");
+ }
+
+ String memoryLimit = properties.get(MEMORY_LIMIT);
+ if (!memoryLimit.endsWith("%")) {
+ throw new DdlException(MEMORY_LIMIT + " " + memoryLimit + "
requires a percentage and ends with a '%'");
+ }
+ String memLimitErr = MEMORY_LIMIT + " " + memoryLimit + " requires a
positive floating point number.";
+ try {
+ if (Double.parseDouble(memoryLimit.substring(0,
memoryLimit.length() - 1)) <= 0) {
+ throw new DdlException(memLimitErr);
+ }
+ } catch (NumberFormatException e) {
+ LOG.debug(memLimitErr, e);
+ throw new DdlException(memLimitErr);
}
}
@@ -128,6 +147,10 @@ public class ResourceGroup implements Writable {
return version;
}
+ public double getMemoryLimitPercent() {
+ return memoryLimitPercent;
+ }
+
public void getProcNodeData(BaseProcResult result) {
for (Map.Entry<String, String> entry : properties.entrySet()) {
result.addRow(Lists.newArrayList(String.valueOf(id), name,
entry.getKey(), entry.getValue()));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgr.java
index d4e824365c..d4d22790d7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgr.java
@@ -46,6 +46,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ResourceGroupMgr implements Writable, GsonPostProcessable {
@@ -122,6 +123,7 @@ public class ResourceGroupMgr implements Writable,
GsonPostProcessable {
}
Map<String, String> properties = Maps.newHashMap();
properties.put(ResourceGroup.CPU_SHARE, "10");
+ properties.put(ResourceGroup.MEMORY_LIMIT, "100%");
defaultResourceGroup = ResourceGroup.create(DEFAULT_GROUP_NAME,
properties);
nameToResourceGroup.put(DEFAULT_GROUP_NAME, defaultResourceGroup);
idToResourceGroup.put(defaultResourceGroup.getId(),
defaultResourceGroup);
@@ -141,12 +143,14 @@ public class ResourceGroupMgr implements Writable,
GsonPostProcessable {
String resourceGroupName = resourceGroup.getName();
writeLock();
try {
- if (nameToResourceGroup.putIfAbsent(resourceGroupName,
resourceGroup) != null) {
+ if (nameToResourceGroup.containsKey(resourceGroupName)) {
if (stmt.isIfNotExists()) {
return;
}
throw new DdlException("Resource group " + resourceGroupName +
" already exist");
}
+ checkGlobalUnlock(resourceGroup, null);
+ nameToResourceGroup.put(resourceGroupName, resourceGroup);
idToResourceGroup.put(resourceGroup.getId(), resourceGroup);
Env.getCurrentEnv().getEditLog().logCreateResourceGroup(resourceGroup);
} finally {
@@ -155,6 +159,18 @@ public class ResourceGroupMgr implements Writable,
GsonPostProcessable {
LOG.info("Create resource group success: {}", resourceGroup);
}
+ private void checkGlobalUnlock(ResourceGroup resourceGroup, ResourceGroup
old) throws DdlException {
+ double totalMemoryLimit =
idToResourceGroup.values().stream().mapToDouble(ResourceGroup::getMemoryLimitPercent)
+ .sum() + resourceGroup.getMemoryLimitPercent();
+ if (!Objects.isNull(old)) {
+ totalMemoryLimit -= old.getMemoryLimitPercent();
+ }
+ if (totalMemoryLimit > 100.0 + 1e-6) {
+ throw new DdlException(
+ "The sum of all resource group " +
ResourceGroup.MEMORY_LIMIT + " cannot be greater than 100.0%.");
+ }
+ }
+
public void alterResourceGroup(AlterResourceGroupStmt stmt) throws
DdlException {
checkResourceGroupEnabled();
@@ -167,7 +183,8 @@ public class ResourceGroupMgr implements Writable,
GsonPostProcessable {
throw new DdlException("Resource Group(" + resourceGroupName +
") does not exist.");
}
ResourceGroup resourceGroup =
nameToResourceGroup.get(resourceGroupName);
- newResourceGroup = ResourceGroup.create(resourceGroup, properties);
+ newResourceGroup = ResourceGroup.copyAndUpdate(resourceGroup,
properties);
+ checkGlobalUnlock(newResourceGroup, resourceGroup);
nameToResourceGroup.put(resourceGroupName, newResourceGroup);
idToResourceGroup.put(newResourceGroup.getId(), newResourceGroup);
Env.getCurrentEnv().getEditLog().logAlterResourceGroup(newResourceGroup);
@@ -181,7 +198,7 @@ public class ResourceGroupMgr implements Writable,
GsonPostProcessable {
checkResourceGroupEnabled();
String resourceGroupName = stmt.getResourceGroupName();
- if (resourceGroupName == DEFAULT_GROUP_NAME) {
+ if (DEFAULT_GROUP_NAME.equals(resourceGroupName)) {
throw new DdlException("Dropping default resource group " +
resourceGroupName + " is not allowed");
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgrTest.java
b/fe/fe-core/src/test/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgrTest.java
index 4a0f18914a..f6562f584a 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgrTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgrTest.java
@@ -82,6 +82,7 @@ public class ResourceGroupMgrTest {
ResourceGroupMgr resourceGroupMgr = new ResourceGroupMgr();
Map<String, String> properties1 = Maps.newHashMap();
properties1.put(ResourceGroup.CPU_SHARE, "10");
+ properties1.put(ResourceGroup.MEMORY_LIMIT, "30%");
String name1 = "g1";
CreateResourceGroupStmt stmt1 = new CreateResourceGroupStmt(false,
name1, properties1);
resourceGroupMgr.createResourceGroup(stmt1);
@@ -98,6 +99,7 @@ public class ResourceGroupMgrTest {
Map<String, String> properties2 = Maps.newHashMap();
properties2.put(ResourceGroup.CPU_SHARE, "20");
+ properties2.put(ResourceGroup.MEMORY_LIMIT, "30%");
String name2 = "g2";
CreateResourceGroupStmt stmt2 = new CreateResourceGroupStmt(false,
name2, properties2);
resourceGroupMgr.createResourceGroup(stmt2);
@@ -129,6 +131,7 @@ public class ResourceGroupMgrTest {
ResourceGroupMgr resourceGroupMgr = new ResourceGroupMgr();
Map<String, String> properties1 = Maps.newHashMap();
properties1.put(ResourceGroup.CPU_SHARE, "10");
+ properties1.put(ResourceGroup.MEMORY_LIMIT, "30%");
String name1 = "g1";
CreateResourceGroupStmt stmt1 = new CreateResourceGroupStmt(false,
name1, properties1);
resourceGroupMgr.createResourceGroup(stmt1);
@@ -152,6 +155,7 @@ public class ResourceGroupMgrTest {
ResourceGroupMgr resourceGroupMgr = new ResourceGroupMgr();
Map<String, String> properties = Maps.newHashMap();
properties.put(ResourceGroup.CPU_SHARE, "10");
+ properties.put(ResourceGroup.MEMORY_LIMIT, "30%");
String name = "g1";
CreateResourceGroupStmt createStmt = new
CreateResourceGroupStmt(false, name, properties);
resourceGroupMgr.createResourceGroup(createStmt);
@@ -188,11 +192,13 @@ public class ResourceGroupMgrTest {
}
properties.put(ResourceGroup.CPU_SHARE, "10");
+ properties.put(ResourceGroup.MEMORY_LIMIT, "30%");
CreateResourceGroupStmt createStmt = new
CreateResourceGroupStmt(false, name, properties);
resourceGroupMgr.createResourceGroup(createStmt);
Map<String, String> newProperties = Maps.newHashMap();
newProperties.put(ResourceGroup.CPU_SHARE, "5");
+ newProperties.put(ResourceGroup.MEMORY_LIMIT, "30%");
AlterResourceGroupStmt stmt2 = new AlterResourceGroupStmt(name,
newProperties);
resourceGroupMgr.alterResourceGroup(stmt2);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/resource/resourcegroup/ResourceGroupTest.java
b/fe/fe-core/src/test/java/org/apache/doris/resource/resourcegroup/ResourceGroupTest.java
index 978d28b8f1..9f174e201c 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/resource/resourcegroup/ResourceGroupTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/resource/resourcegroup/ResourceGroupTest.java
@@ -20,7 +20,6 @@ package org.apache.doris.resource.resourcegroup;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.proc.BaseProcResult;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.junit.Assert;
import org.junit.Test;
@@ -34,17 +33,20 @@ public class ResourceGroupTest {
public void testCreateNormal() throws DdlException {
Map<String, String> properties1 = Maps.newHashMap();
properties1.put(ResourceGroup.CPU_SHARE, "10");
+ properties1.put(ResourceGroup.MEMORY_LIMIT, "30%");
String name1 = "g1";
ResourceGroup group1 = ResourceGroup.create(name1, properties1);
Assert.assertEquals(name1, group1.getName());
- Assert.assertEquals(1, group1.getProperties().size());
+ Assert.assertEquals(2, group1.getProperties().size());
Assert.assertTrue(group1.getProperties().containsKey(ResourceGroup.CPU_SHARE));
+ Assert.assertTrue(Math.abs(group1.getMemoryLimitPercent() - 30) <
1e-6);
}
@Test(expected = DdlException.class)
public void testNotSupportProperty() throws DdlException {
Map<String, String> properties1 = Maps.newHashMap();
properties1.put(ResourceGroup.CPU_SHARE, "10");
+ properties1.put(ResourceGroup.MEMORY_LIMIT, "30%");
properties1.put("share", "10");
String name1 = "g1";
ResourceGroup.create(name1, properties1);
@@ -61,12 +63,13 @@ public class ResourceGroupTest {
public void testCpuShareValue() {
Map<String, String> properties1 = Maps.newHashMap();
properties1.put(ResourceGroup.CPU_SHARE, "0");
+ properties1.put(ResourceGroup.MEMORY_LIMIT, "30%");
String name1 = "g1";
try {
ResourceGroup.create(name1, properties1);
Assert.fail();
} catch (DdlException e) {
- Assert.assertTrue(e.getMessage().contains(ResourceGroup.CPU_SHARE
+ " requires a positive integer."));
+ Assert.assertTrue(e.getMessage().contains("requires a positive
integer."));
}
properties1.put(ResourceGroup.CPU_SHARE, "cpu");
@@ -74,7 +77,7 @@ public class ResourceGroupTest {
ResourceGroup.create(name1, properties1);
Assert.fail();
} catch (DdlException e) {
- Assert.assertTrue(e.getMessage().contains(ResourceGroup.CPU_SHARE
+ " requires a positive integer."));
+ Assert.assertTrue(e.getMessage().contains("requires a positive
integer."));
}
}
@@ -82,19 +85,13 @@ public class ResourceGroupTest {
public void testGetProcNodeData() throws DdlException {
Map<String, String> properties1 = Maps.newHashMap();
properties1.put(ResourceGroup.CPU_SHARE, "10");
+ properties1.put(ResourceGroup.MEMORY_LIMIT, "30%");
String name1 = "g1";
ResourceGroup group1 = ResourceGroup.create(name1, properties1);
BaseProcResult result = new BaseProcResult();
group1.getProcNodeData(result);
List<List<String>> rows = result.getRows();
- Assert.assertEquals(1, rows.size());
- List<List<String>> expectedRows = Lists.newArrayList();
- expectedRows.add(Lists.newArrayList(String.valueOf(group1.getId()),
name1, ResourceGroup.CPU_SHARE, "10"));
- for (int i = 0; i < expectedRows.size(); ++i) {
- for (int j = 0; j < expectedRows.get(i).size(); ++j) {
- Assert.assertEquals(expectedRows.get(i).get(j),
rows.get(i).get(j));
- }
- }
+ Assert.assertEquals(2, rows.size());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]