This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 384b78fa4e5 [Fix]delete internal group (#46351)
384b78fa4e5 is described below
commit 384b78fa4e5dda385812ea7addc27f028fe5797a
Author: wangbo <[email protected]>
AuthorDate: Fri Jan 3 21:59:36 2025 +0800
[Fix]delete internal group (#46351)
---
be/src/runtime/exec_env_init.cpp | 4 +-
.../workload_group/workload_group_manager.cpp | 23 ---------
.../workload_group/workload_group_manager.h | 12 -----
.../doris/analysis/AlterWorkloadGroupStmt.java | 9 +---
.../doris/analysis/CreateWorkloadGroupStmt.java | 10 ++--
.../main/java/org/apache/doris/catalog/Env.java | 2 -
.../CreateInternalWorkloadGroupThread.java | 55 ----------------------
.../resource/workloadgroup/WorkloadGroup.java | 39 +--------------
.../resource/workloadgroup/WorkloadGroupMgr.java | 27 +----------
gensrc/thrift/BackendService.thrift | 3 --
.../workload_manager_p0/test_curd_wlg.groovy | 24 ----------
11 files changed, 9 insertions(+), 199 deletions(-)
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index b8cde1db108..45c229cd714 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -223,7 +223,6 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths,
_pipeline_tracer_ctx =
std::make_unique<pipeline::PipelineTracerContext>(); // before query
RETURN_IF_ERROR(init_pipeline_task_scheduler());
_workload_group_manager = new WorkloadGroupMgr();
- _workload_group_manager->init_internal_workload_group();
_scanner_scheduler = new doris::vectorized::ScannerScheduler();
_fragment_mgr = new FragmentMgr(this);
_result_cache = new ResultCache(config::query_cache_max_size_mb,
@@ -297,8 +296,7 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths,
return st;
}
_storage_engine->set_heartbeat_flags(this->heartbeat_flags());
- WorkloadGroupPtr internal_wg = _workload_group_manager->get_internal_wg();
- if (st = _storage_engine->start_bg_threads(internal_wg); !st.ok()) {
+ if (st = _storage_engine->start_bg_threads(nullptr); !st.ok()) {
LOG(ERROR) << "Failed to starge bg threads of storage engine, res=" <<
st;
return st;
}
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index 1261f414f92..560e83c5258 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -34,25 +34,6 @@
namespace doris {
-void WorkloadGroupMgr::init_internal_workload_group() {
- WorkloadGroupPtr internal_wg = nullptr;
- {
- std::lock_guard<std::shared_mutex> w_lock(_group_mutex);
- if (_workload_groups.find(INTERNAL_WORKLOAD_GROUP_ID) ==
_workload_groups.end()) {
- WorkloadGroupInfo internal_wg_info {
- .id = INTERNAL_WORKLOAD_GROUP_ID,
- .name = INTERNAL_WORKLOAD_GROUP_NAME,
- .cpu_share = CgroupCpuCtl::cpu_soft_limit_default_value()};
- internal_wg = std::make_shared<WorkloadGroup>(internal_wg_info,
false);
- _workload_groups[internal_wg_info.id] = internal_wg;
- }
- }
- DCHECK(internal_wg != nullptr);
- if (internal_wg) {
- internal_wg->create_cgroup_cpu_ctl();
- }
-}
-
WorkloadGroupPtr WorkloadGroupMgr::get_or_create_workload_group(
const WorkloadGroupInfo& workload_group_info) {
{
@@ -105,10 +86,6 @@ void
WorkloadGroupMgr::delete_workload_group_by_ids(std::set<uint64_t> used_wg_i
old_wg_size = _workload_groups.size();
for (auto iter = _workload_groups.begin(); iter !=
_workload_groups.end(); iter++) {
uint64_t wg_id = iter->first;
- // internal workload group created by BE can not be dropped
- if (wg_id == INTERNAL_WORKLOAD_GROUP_ID) {
- continue;
- }
auto workload_group_ptr = iter->second;
if (used_wg_id.find(wg_id) == used_wg_id.end()) {
workload_group_ptr->shutdown();
diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h
index 0d281031972..80bb44cce6d 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -36,18 +36,11 @@ class TaskScheduler;
class MultiCoreTaskQueue;
} // namespace pipeline
-// internal_group is used for doris internal workload, currently is mainly
compaction
-const static uint64_t INTERNAL_WORKLOAD_GROUP_ID =
- static_cast<uint64_t>(TWorkloadType::type::INTERNAL);
-const static std::string INTERNAL_WORKLOAD_GROUP_NAME = "_internal";
-
class WorkloadGroupMgr {
public:
WorkloadGroupMgr() = default;
~WorkloadGroupMgr() = default;
- void init_internal_workload_group();
-
WorkloadGroupPtr get_or_create_workload_group(const WorkloadGroupInfo&
workload_group_info);
void get_related_workload_groups(const std::function<bool(const
WorkloadGroupPtr& ptr)>& pred,
@@ -69,11 +62,6 @@ public:
void get_wg_resource_usage(vectorized::Block* block);
- WorkloadGroupPtr get_internal_wg() {
- std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
- return _workload_groups[INTERNAL_WORKLOAD_GROUP_ID];
- }
-
void refresh_workload_group_metrics();
private:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterWorkloadGroupStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterWorkloadGroupStmt.java
index becca898b64..8218ce304a9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterWorkloadGroupStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterWorkloadGroupStmt.java
@@ -62,15 +62,10 @@ public class AlterWorkloadGroupStmt extends DdlStmt {
throw new AnalysisException("Workload Group properties can't be
empty");
}
- if (properties.containsKey(WorkloadGroup.INTERNAL_TYPE)) {
- throw new AnalysisException(WorkloadGroup.INTERNAL_TYPE + " can
not be create or modified ");
- }
-
String tagStr = properties.get(WorkloadGroup.TAG);
- if (!StringUtils.isEmpty(tagStr) &&
(WorkloadGroupMgr.DEFAULT_GROUP_NAME.equals(workloadGroupName)
- ||
WorkloadGroupMgr.INTERNAL_GROUP_NAME.equals(workloadGroupName))) {
+ if (!StringUtils.isEmpty(tagStr) &&
(WorkloadGroupMgr.DEFAULT_GROUP_NAME.equals(workloadGroupName))) {
throw new AnalysisException(
- WorkloadGroupMgr.INTERNAL_GROUP_NAME + " and " +
WorkloadGroupMgr.DEFAULT_GROUP_NAME
+ WorkloadGroupMgr.DEFAULT_GROUP_NAME
+ " group can not set tag");
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadGroupStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadGroupStmt.java
index 4c0c675ea00..e3ac9ef6fca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadGroupStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadGroupStmt.java
@@ -74,15 +74,11 @@ public class CreateWorkloadGroupStmt extends DdlStmt {
throw new AnalysisException("Workload Group properties can't be
empty");
}
- if (properties.containsKey(WorkloadGroup.INTERNAL_TYPE)) {
- throw new AnalysisException(WorkloadGroup.INTERNAL_TYPE + " can
not be create or modified ");
- }
-
String tagStr = properties.get(WorkloadGroup.TAG);
- if (!StringUtils.isEmpty(tagStr) &&
(WorkloadGroupMgr.DEFAULT_GROUP_NAME.equals(workloadGroupName)
- ||
WorkloadGroupMgr.INTERNAL_GROUP_NAME.equals(workloadGroupName))) {
+ if (!StringUtils.isEmpty(tagStr)
+ &&
(WorkloadGroupMgr.DEFAULT_GROUP_NAME.equals(workloadGroupName))) {
throw new AnalysisException(
- WorkloadGroupMgr.INTERNAL_GROUP_NAME + " and " +
WorkloadGroupMgr.DEFAULT_GROUP_NAME
+ WorkloadGroupMgr.DEFAULT_GROUP_NAME
+ " group can not set tag");
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 3e3de2e6676..70e4a0f57d5 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -246,7 +246,6 @@ import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.resource.AdmissionControl;
import org.apache.doris.resource.Tag;
-import
org.apache.doris.resource.workloadgroup.CreateInternalWorkloadGroupThread;
import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
import org.apache.doris.resource.workloadschedpolicy.WorkloadRuntimeStatusMgr;
import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyMgr;
@@ -1758,7 +1757,6 @@ public class Env {
WorkloadSchedPolicyPublisher wpPublisher = new
WorkloadSchedPolicyPublisher(this);
topicPublisherThread.addToTopicPublisherList(wpPublisher);
topicPublisherThread.start();
- new CreateInternalWorkloadGroupThread().start();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/CreateInternalWorkloadGroupThread.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/CreateInternalWorkloadGroupThread.java
deleted file mode 100644
index 7c6d0e3a080..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/CreateInternalWorkloadGroupThread.java
+++ /dev/null
@@ -1,55 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.resource.workloadgroup;
-
-import org.apache.doris.catalog.Env;
-import org.apache.doris.common.FeConstants;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public class CreateInternalWorkloadGroupThread extends Thread {
-
- private static final Logger LOG =
LogManager.getLogger(CreateInternalWorkloadGroupThread.class);
-
- public CreateInternalWorkloadGroupThread() {
- super("CreateInternalWorkloadGroupThread");
- }
-
- public void run() {
- if (!FeConstants.shouldCreateInternalWorkloadGroup) {
- return;
- }
- try {
- Env env = Env.getCurrentEnv();
- while (!env.isReady()) {
- Thread.sleep(5000);
- }
- if (!env.getWorkloadGroupMgr()
-
.isWorkloadGroupExists(WorkloadGroupMgr.INTERNAL_GROUP_NAME)) {
- env.getWorkloadGroupMgr().createInternalWorkloadGroup();
- LOG.info("create internal workload group succ");
- } else {
- LOG.info("internal workload group already exists.");
- }
- } catch (Throwable t) {
- LOG.warn("create internal workload group failed. ", t);
- }
- }
-
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
index 5f8ff7829d5..c38a9379c63 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
@@ -30,10 +30,8 @@ import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TWorkloadGroupInfo;
-import org.apache.doris.thrift.TWorkloadType;
import org.apache.doris.thrift.TopicInfo;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
@@ -85,11 +83,6 @@ public class WorkloadGroup implements Writable,
GsonPostProcessable {
public static final String REMOTE_READ_BYTES_PER_SECOND =
"remote_read_bytes_per_second";
- // it's used to define Doris's internal workload group,
- // currently it is internal, only contains compaction
- // later more type and workload may be included in the future.
- public static final String INTERNAL_TYPE = "internal_type";
-
// NOTE(wb): all property is not required, some properties default value
is set in be
// default value is as followed
// cpu_share=1024, memory_limit=0%(0 means not limit),
enable_memory_overcommit=true
@@ -98,10 +91,7 @@ public class WorkloadGroup implements Writable,
GsonPostProcessable {
.add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).add(SCAN_THREAD_NUM)
.add(MAX_REMOTE_SCAN_THREAD_NUM).add(MIN_REMOTE_SCAN_THREAD_NUM)
.add(SPILL_THRESHOLD_LOW_WATERMARK).add(SPILL_THRESHOLD_HIGH_WATERMARK)
-
.add(TAG).add(READ_BYTES_PER_SECOND).add(REMOTE_READ_BYTES_PER_SECOND).add(INTERNAL_TYPE).build();
-
- public static final ImmutableMap<String, Integer> WORKLOAD_TYPE_MAP = new
ImmutableMap.Builder<String, Integer>()
- .put(TWorkloadType.INTERNAL.toString().toLowerCase(),
TWorkloadType.INTERNAL.getValue()).build();
+
.add(TAG).add(READ_BYTES_PER_SECOND).add(REMOTE_READ_BYTES_PER_SECOND).build();
public static final int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50;
public static final int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80;
@@ -487,25 +477,6 @@ public class WorkloadGroup implements Writable,
GsonPostProcessable {
}
}
- // internal workload group is usually created by Doris.
- // If exception happens here, it means thrift not match
WORKLOAD_TYPE_MAP.
- String interTypeId = properties.get(WorkloadGroup.INTERNAL_TYPE);
- if (!StringUtils.isEmpty(interTypeId)) {
- int wid = Integer.valueOf(interTypeId);
- if (TWorkloadType.findByValue(wid) == null) {
- throw new DdlException("error internal type id: " + wid + ",
current id map:" + WORKLOAD_TYPE_MAP);
- }
- }
-
- }
-
-
- Optional<Integer> getInternalTypeId() {
- String typeIdStr = this.properties.get(INTERNAL_TYPE);
- if (StringUtils.isEmpty(typeIdStr)) {
- return Optional.empty();
- }
- return Optional.of(Integer.valueOf(typeIdStr));
}
public long getId() {
@@ -601,13 +572,7 @@ public class WorkloadGroup implements Writable,
GsonPostProcessable {
public TopicInfo toTopicInfo() {
TWorkloadGroupInfo tWorkloadGroupInfo = new TWorkloadGroupInfo();
-
- long wgId = this.id;
- Optional<Integer> internalTypeId = getInternalTypeId();
- if (internalTypeId.isPresent()) {
- wgId = internalTypeId.get();
- }
- tWorkloadGroupInfo.setId(wgId);
+ tWorkloadGroupInfo.setId(this.id);
tWorkloadGroupInfo.setName(name);
tWorkloadGroupInfo.setVersion(version);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
index 13be35ce4f0..dc99692d32e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
@@ -42,7 +42,6 @@ import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TUserIdentity;
-import org.apache.doris.thrift.TWorkloadType;
import org.apache.doris.thrift.TopicInfo;
import com.google.common.base.Strings;
@@ -72,12 +71,6 @@ public class WorkloadGroupMgr extends MasterDaemon
implements Writable, GsonPost
public static final Long DEFAULT_GROUP_ID = 1L;
- public static final String INTERNAL_GROUP_NAME = "_internal";
-
- // internal_type_id could be converted to workload group id when Workload
published to BE
- // refer WorkloadGroup.toTopicInfo
- public static final Long INTERNAL_TYPE_ID =
Long.valueOf(TWorkloadType.INTERNAL.getValue());
-
public static final ImmutableList<String>
WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES = new ImmutableList.Builder<String>()
.add("Id").add("Name").add(WorkloadGroup.CPU_SHARE).add(WorkloadGroup.MEMORY_LIMIT)
.add(WorkloadGroup.ENABLE_MEMORY_OVERCOMMIT)
@@ -374,24 +367,6 @@ public class WorkloadGroupMgr extends MasterDaemon
implements Writable, GsonPost
LOG.info("Create workload group success: {}", workloadGroup);
}
- public void createInternalWorkloadGroup() {
- Map<String, String> properties = Maps.newHashMap();
- // 100 is cgroup v2 default cpu_share value
- properties.put(WorkloadGroup.CPU_SHARE, "100");
- properties.put(WorkloadGroup.INTERNAL_TYPE,
String.valueOf(INTERNAL_TYPE_ID));
- WorkloadGroup wg = new WorkloadGroup(Env.getCurrentEnv().getNextId(),
INTERNAL_GROUP_NAME, properties);
- writeLock();
- try {
- if (!nameToWorkloadGroup.containsKey(wg.getName())) {
- nameToWorkloadGroup.put(wg.getName(), wg);
- idToWorkloadGroup.put(wg.getId(), wg);
- Env.getCurrentEnv().getEditLog().logCreateWorkloadGroup(wg);
- }
- } finally {
- writeUnlock();
- }
- }
-
// NOTE: used for checking sum value of 100% for cpu_hard_limit and
memory_limit
// when create/alter workload group with same tag.
// when oldWg is null it means caller is an alter stmt.
@@ -485,7 +460,7 @@ public class WorkloadGroupMgr extends MasterDaemon
implements Writable, GsonPost
public void dropWorkloadGroup(DropWorkloadGroupStmt stmt) throws
DdlException {
String workloadGroupName = stmt.getWorkloadGroupName();
- if (DEFAULT_GROUP_NAME.equals(workloadGroupName) ||
INTERNAL_GROUP_NAME.equals(workloadGroupName)) {
+ if (DEFAULT_GROUP_NAME.equals(workloadGroupName)) {
throw new DdlException("Dropping workload group " +
workloadGroupName + " is not allowed");
}
diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift
index e2cd9f3572d..6a5e4035066 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -243,9 +243,6 @@ struct TPublishTopicResult {
1: required Status.TStatus status
}
-enum TWorkloadType {
- INTERNAL = 2
-}
service BackendService {
// Called by coord to start asynchronous execution of plan fragment in
backend.
diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
index 03ce18e1751..70d36369424 100644
--- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
+++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
@@ -169,30 +169,6 @@ suite("test_crud_wlg") {
exception "can not be greater than 100%"
}
- // test alter tag and type
- test {
- sql "alter workload group test_group properties ( 'internal_type'='13'
);"
-
- exception "internal_type can not be create or modified"
- }
-
- test {
- sql "create workload group inter_wg properties('internal_type'='123');"
- exception "internal_type can not be create or modified"
- }
-
- test {
- sql "alter workload group normal properties ('tag'='123')"
-
- exception "_internal and normal group can not set tag"
- }
-
- test {
- sql "alter workload group _internal properties ('tag'='123')"
-
- exception "_internal and normal group can not set tag"
- }
-
sql "alter workload group test_group properties ( 'cpu_hard_limit'='20%'
);"
qt_cpu_hard_limit_1 """ select count(1) from ${table_name} """
qt_cpu_hard_limit_2 "select
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num
from information_schema.workload_groups where name in ('normal','test_group')
order by name;"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]