This is an automated email from the ASF dual-hosted git repository.
wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c8c3cb8dddf [Refactor]refactor workload group cpu hard limit (#51278)
c8c3cb8dddf is described below
commit c8c3cb8dddf32ac32e659dce3de937febd654c25
Author: wangbo <[email protected]>
AuthorDate: Thu May 29 14:08:14 2025 +0800
[Refactor]refactor workload group cpu hard limit (#51278)
---
be/src/agent/workload_group_listener.cpp | 26 ++++++++----------
be/src/runtime/workload_group/workload_group.cpp | 28 +++----------------
be/src/runtime/workload_group/workload_group.h | 1 -
.../workload_group/workload_group_manager.cpp | 8 +++---
.../workload_group/workload_group_manager.h | 6 -----
.../resource/workloadgroup/WorkloadGroup.java | 12 ---------
.../resource/workloadgroup/WorkloadGroupMgr.java | 30 ++++++++++-----------
.../workloadgroup/WorkloadGroupMgrTest.java | 2 +-
.../workload_manager_p0/test_curd_wlg.groovy | 31 ----------------------
9 files changed, 34 insertions(+), 110 deletions(-)
diff --git a/be/src/agent/workload_group_listener.cpp
b/be/src/agent/workload_group_listener.cpp
index 0ef32d1ec4e..303726a71d1 100644
--- a/be/src/agent/workload_group_listener.cpp
+++ b/be/src/agent/workload_group_listener.cpp
@@ -35,8 +35,7 @@ void WorkloadGroupListener::handle_topic_info(const
std::vector<TopicInfo>& topi
if (!topic_info.__isset.workload_group_info) {
continue;
}
- VLOG_DEBUG << "Received publish workload group info request: "
- << apache::thrift::ThriftDebugString(topic_info).c_str();
+
is_set_workload_group_info = true;
// 1 parse topic info to group info
@@ -59,23 +58,20 @@ void WorkloadGroupListener::handle_topic_info(const
std::vector<TopicInfo>& topi
auto wg =
_exec_env->workload_group_mgr()->get_or_create_workload_group(workload_group_info);
- // 3 set cpu soft hard limit switch
- _exec_env->workload_group_mgr()->_enable_cpu_hard_limit.store(
- workload_group_info.enable_cpu_hard_limit);
-
- // 4 create and update task scheduler
+ // 3 create and update task scheduler
wg->upsert_task_scheduler(&workload_group_info);
- // 5 upsert io throttle
+ // 4 upsert io throttle
wg->upsert_scan_io_throttle(&workload_group_info);
- VLOG_DEBUG << "[topic_publish_wg]update workload group finish, wg
info="
- << wg->debug_string() << ", enable_cpu_hard_limit="
- <<
(_exec_env->workload_group_mgr()->enable_cpu_hard_limit() ? "true" : "false")
- << ", cgroup cpu_shares=" <<
workload_group_info.cgroup_cpu_shares
- << ", cgroup cpu_hard_limit=" <<
workload_group_info.cgroup_cpu_hard_limit
- << ", cgroup home path=" << config::doris_cgroup_cpu_path
- << ", list size=" << list_size << ", thread info=" <<
wg->thread_debug_info();
+ LOG_EVERY_T(INFO, 180) << "[topic_publish_wg]update workload group
finish, wg info="
+ << wg->debug_string()
+ << ", cgroup cpu_shares=" <<
workload_group_info.cgroup_cpu_shares
+ << ", cgroup cpu_hard_limit="
+ << workload_group_info.cgroup_cpu_hard_limit
+ << ", cgroup home path=" <<
config::doris_cgroup_cpu_path
+ << ", list size=" << list_size
+ << ", thread info=" << wg->thread_debug_info();
}
// NOTE(wb) when is_set_workload_group_info=false, it means FE sent an
empty workload group list
diff --git a/be/src/runtime/workload_group/workload_group.cpp
b/be/src/runtime/workload_group/workload_group.cpp
index 83690a79eb0..6296a2dbd5b 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -375,12 +375,6 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
enable_memory_overcommit =
tworkload_group_info.enable_memory_overcommit;
}
- // 8 cpu soft limit or hard limit
- bool enable_cpu_hard_limit = false;
- if (tworkload_group_info.__isset.enable_cpu_hard_limit) {
- enable_cpu_hard_limit = tworkload_group_info.enable_cpu_hard_limit;
- }
-
// 9 scan thread num
int scan_thread_num = config::doris_scanner_thread_pool_thread_num;
if (tworkload_group_info.__isset.scan_thread_num &&
tworkload_group_info.scan_thread_num > 0) {
@@ -452,7 +446,6 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
.enable_memory_overcommit = enable_memory_overcommit,
.version = version,
.cpu_hard_limit = cpu_hard_limit,
- .enable_cpu_hard_limit = enable_cpu_hard_limit,
.scan_thread_num = scan_thread_num,
.max_remote_scan_thread_num = max_remote_scan_thread_num,
.min_remote_scan_thread_num = min_remote_scan_thread_num,
@@ -589,28 +582,13 @@ void
WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
}
void WorkloadGroup::upsert_cgroup_cpu_ctl_no_lock(WorkloadGroupInfo* wg_info) {
- uint64_t wg_id = wg_info->id;
int cpu_hard_limit = wg_info->cpu_hard_limit;
- uint64_t cpu_shares = wg_info->cpu_share;
- bool enable_cpu_hard_limit = wg_info->enable_cpu_hard_limit;
+ uint64_t cpu_share = wg_info->cpu_share;
create_cgroup_cpu_ctl_no_lock();
if (_cgroup_cpu_ctl) {
- if (enable_cpu_hard_limit) {
- if (cpu_hard_limit > 0) {
- _cgroup_cpu_ctl->update_cpu_hard_limit(cpu_hard_limit);
- _cgroup_cpu_ctl->update_cpu_soft_limit(
- CgroupCpuCtl::cpu_soft_limit_default_value());
- } else {
- LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit
but value is "
- "illegal: "
- << cpu_hard_limit << ", gid=" << wg_id;
- }
- } else {
- _cgroup_cpu_ctl->update_cpu_soft_limit(cpu_shares);
- _cgroup_cpu_ctl->update_cpu_hard_limit(
- CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard limit
- }
+ _cgroup_cpu_ctl->update_cpu_hard_limit(cpu_hard_limit);
+ _cgroup_cpu_ctl->update_cpu_soft_limit(cpu_share);
_cgroup_cpu_ctl->get_cgroup_cpu_info(&(wg_info->cgroup_cpu_shares),
&(wg_info->cgroup_cpu_hard_limit));
}
diff --git a/be/src/runtime/workload_group/workload_group.h
b/be/src/runtime/workload_group/workload_group.h
index 3d61b463810..aa098b67c4b 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -302,7 +302,6 @@ struct WorkloadGroupInfo {
const bool enable_memory_overcommit = false;
const int64_t version = 0;
const int cpu_hard_limit = 0;
- const bool enable_cpu_hard_limit = false;
const int scan_thread_num = 0;
const int max_remote_scan_thread_num = 0;
const int min_remote_scan_thread_num = 0;
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp
b/be/src/runtime/workload_group/workload_group_manager.cpp
index 5b47535e464..1228423eb80 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -147,9 +147,11 @@ void
WorkloadGroupMgr::delete_workload_group_by_ids(std::set<uint64_t> used_wg_i
}
}
int64_t time_cost_ms = MonotonicMillis() - begin_time;
- LOG(INFO) << "[topic_publish_wg]finish clear unused workload group, time
cost: " << time_cost_ms
- << " ms, deleted group size:" << deleted_task_groups.size()
- << ", before wg size=" << old_wg_size << ", after wg size=" <<
new_wg_size;
+ if (deleted_task_groups.size() > 0) {
+ LOG(INFO) << "[topic_publish_wg]finish clear unused workload group,
time cost: "
+ << time_cost_ms << " ms, deleted group size:" <<
deleted_task_groups.size()
+ << ", before wg size=" << old_wg_size << ", after wg size="
<< new_wg_size;
+ }
}
void WorkloadGroupMgr::do_sweep() {
diff --git a/be/src/runtime/workload_group/workload_group_manager.h
b/be/src/runtime/workload_group/workload_group_manager.h
index b61f0c11bf0..e1abbe57c22 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -81,12 +81,6 @@ public:
void stop();
- std::atomic<bool> _enable_cpu_hard_limit = false;
-
- bool enable_cpu_soft_limit() const { return
!_enable_cpu_hard_limit.load(); }
-
- bool enable_cpu_hard_limit() const { return _enable_cpu_hard_limit.load();
}
-
void refresh_wg_weighted_memory_limit();
void get_wg_resource_usage(vectorized::Block* block);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
index e7d2c271930..134edb91f2f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
@@ -606,10 +606,6 @@ public class WorkloadGroup implements Writable,
GsonPostProcessable {
result.addRow(row);
}
- public int getCpuHardLimitWhenCalSum() {
- return cpuHardLimit == -1 ? 0 : cpuHardLimit;
- }
-
public double getMemoryLimitPercentWhenCalSum() {
return memoryLimitPercent == -1 ? 0 : memoryLimitPercent;
}
@@ -677,14 +673,6 @@ public class WorkloadGroup implements Writable,
GsonPostProcessable {
if (memOvercommitStr != null) {
tWorkloadGroupInfo.setEnableMemoryOvercommit(Boolean.valueOf(memOvercommitStr));
}
- // enable_cpu_hard_limit = true, using cpu hard limit
- // enable_cpu_hard_limit = false, using cpu soft limit
- tWorkloadGroupInfo.setEnableCpuHardLimit(Config.enable_cpu_hard_limit);
-
- if (Config.enable_cpu_hard_limit && cpuHardLimit <= 0) {
- LOG.warn("enable_cpu_hard_limit=true but cpuHardLimit value not
illegal,"
- + "id=" + id + ",name=" + name);
- }
String scanThreadNumStr = properties.get(SCAN_THREAD_NUM);
if (scanThreadNumStr != null) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
index fe0ad3acbcb..3c2bda93366 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
@@ -289,10 +289,9 @@ public class WorkloadGroupMgr extends MasterDaemon
implements Writable, GsonPost
public void createWorkloadGroup(String computeGroup, WorkloadGroup
workloadGroup, boolean isIfNotExists)
throws DdlException {
- String workloadGroupName = workloadGroup.getName();
+ WorkloadGroupKey wgKey = WorkloadGroupKey.get(computeGroup,
workloadGroup.getName());
writeLock();
try {
- WorkloadGroupKey wgKey = WorkloadGroupKey.get(computeGroup,
workloadGroupName);
if (keyToWorkloadGroup.containsKey(wgKey)) {
if (isIfNotExists) {
return;
@@ -301,10 +300,6 @@ public class WorkloadGroupMgr extends MasterDaemon
implements Writable, GsonPost
"Compute group " + wgKey.getComputeGroup() + " already
has workload group "
+ wgKey.getWorkloadGroupName() + ".");
}
- if (idToWorkloadGroup.size() >= Config.workload_group_max_num) {
- throw new DdlException(
- "Workload group number can not be exceed " +
Config.workload_group_max_num);
- }
checkGlobalUnlock(workloadGroup, null);
keyToWorkloadGroup.put(wgKey, workloadGroup);
idToWorkloadGroup.put(workloadGroup.getId(), workloadGroup);
@@ -321,19 +316,25 @@ public class WorkloadGroupMgr extends MasterDaemon
implements Writable, GsonPost
// NOTE: used for checking sum value of 100% for memory_limit and the
workload group number limit
// when create/alter workload group with same tag.
- // when oldWg is null it means caller is an alter stmt.
+ // when oldWg is not null it means caller is an alter stmt.
private void checkGlobalUnlock(WorkloadGroup newWg, WorkloadGroup oldWg)
throws DdlException {
String newWgCg = newWg.getComputeGroup();
double sumOfAllMemLimit = 0;
- int sumOfAllCpuHardLimit = 0;
+ int wgNumOfCurrentCg = 0;
+ boolean isAlterStmt = oldWg != null;
+ boolean isCreateStmt = !isAlterStmt;
// 1 get sum value of all wg which has same tag without current wg
for (Map.Entry<Long, WorkloadGroup> entry :
idToWorkloadGroup.entrySet()) {
WorkloadGroup wg = entry.getValue();
String curWgCg = wg.getComputeGroup();
- if (oldWg != null && entry.getKey() == oldWg.getId()) {
+ if (newWgCg.equals(entry.getValue().getComputeGroup())) {
+ wgNumOfCurrentCg++;
+ }
+
+ if (isAlterStmt && entry.getKey() == oldWg.getId()) {
continue;
}
@@ -341,9 +342,6 @@ public class WorkloadGroupMgr extends MasterDaemon
implements Writable, GsonPost
continue;
}
- if (wg.getCpuHardLimitWhenCalSum() > 0) {
- sumOfAllCpuHardLimit += wg.getCpuHardLimitWhenCalSum();
- }
if (wg.getMemoryLimitPercentWhenCalSum() > 0) {
sumOfAllMemLimit += wg.getMemoryLimitPercentWhenCalSum();
}
@@ -351,7 +349,6 @@ public class WorkloadGroupMgr extends MasterDaemon
implements Writable, GsonPost
// 2 sum current wg value
sumOfAllMemLimit += newWg.getMemoryLimitPercentWhenCalSum();
- sumOfAllCpuHardLimit += newWg.getCpuHardLimitWhenCalSum();
// 3 check total sum
if (sumOfAllMemLimit > 100.0 + 1e-6) {
@@ -361,10 +358,11 @@ public class WorkloadGroupMgr extends MasterDaemon
implements Writable, GsonPost
+ " can not be greater than 100.0%. current sum
val:" + sumOfAllMemLimit);
}
- if (sumOfAllCpuHardLimit > 100) {
+ // 4 check wg num
+ if (isCreateStmt && wgNumOfCurrentCg >= Config.workload_group_max_num)
{
throw new DdlException(
- "The sum of all workload group " +
WorkloadGroup.CPU_HARD_LIMIT + " within compute group " + newWgCg
- + " can not be greater than 100%. current sum
val:" + sumOfAllCpuHardLimit);
+ "Workload group number in Compute Group " + newWgCg + "can
not exceed "
+ + Config.workload_group_max_num);
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
index b47c1b801f4..dd543b16127 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
@@ -421,7 +421,7 @@ public class WorkloadGroupMgrTest {
@Test
public void testMultiTagCreateWorkloadGroup() throws UserException {
Config.enable_workload_group = true;
- String[] props = {WorkloadGroup.MEMORY_LIMIT,
WorkloadGroup.CPU_HARD_LIMIT};
+ String[] props = {WorkloadGroup.MEMORY_LIMIT};
for (String propName : props) {
WorkloadGroupMgr workloadGroupMgr = new WorkloadGroupMgr();
diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
index 3ba731d9612..0f2cecc5690 100644
--- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
+++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
@@ -185,12 +185,6 @@ suite("test_crud_wlg") {
sql "alter workload group test_group $forComputeGroupStr properties (
'cpu_hard_limit'='99%' );"
- test {
- sql "alter workload group normal $forComputeGroupStr properties (
'cpu_hard_limit'='2%' );"
-
- exception "can not be greater than 100%"
- }
-
sql "alter workload group test_group $forComputeGroupStr properties (
'cpu_hard_limit'='20%' );"
qt_cpu_hard_limit_1 """ select count(1) from ${table_name} """
qt_cpu_hard_limit_2 "select
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num
from information_schema.workload_groups where name in ('normal','test_group')
order by name;"
@@ -290,31 +284,6 @@ suite("test_crud_wlg") {
exception "must be true or false"
}
- // failed for cpu_hard_limit
- test {
- sql "create workload group if not exists test_group2
$forComputeGroupStr " +
- "properties ( " +
- " 'cpu_share'='10', " +
- " 'memory_limit'='3%', " +
- " 'enable_memory_overcommit'='true', " +
- " 'cpu_hard_limit'='120%' " +
- ");"
-
- exception "a positive integer between 1 and 100"
- }
-
- test {
- sql "create workload group if not exists test_group2
$forComputeGroupStr " +
- "properties ( " +
- " 'cpu_share'='10', " +
- " 'memory_limit'='3%', " +
- " 'enable_memory_overcommit'='true', " +
- " 'cpu_hard_limit'='99%' " +
- ");"
-
- exception "can not be greater than 100%"
- }
-
// test show workload groups
qt_select_tvf_1 "select
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num
from information_schema.workload_groups where name in ('normal','test_group')
order by name;"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]