This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 550ed2b6e75 [Fix](executor)Fix when Fe send empty wg list to be may
cause query failed. (#34074)
550ed2b6e75 is described below
commit 550ed2b6e75a6c4b698f5f6b1fa9027a218e2cdb
Author: wangbo <[email protected]>
AuthorDate: Wed Apr 24 23:36:17 2024 +0800
[Fix](executor)Fix when Fe send empty wg list to be may cause query failed.
(#34074)
---
be/src/agent/topic_subscriber.cpp | 6 +-
be/src/agent/workload_group_listener.cpp | 27 +++++++--
be/src/runtime/workload_group/workload_group.cpp | 4 +-
be/src/runtime/workload_group/workload_group.h | 7 ++-
.../workload_group/workload_group_manager.cpp | 67 ++++++++++++----------
.../main/java/org/apache/doris/catalog/Env.java | 22 +++----
.../doris/common/publish/TopicPublisherThread.java | 24 ++++++--
.../common/publish/WorkloadGroupPublisher.java | 14 ++++-
8 files changed, 114 insertions(+), 57 deletions(-)
diff --git a/be/src/agent/topic_subscriber.cpp
b/be/src/agent/topic_subscriber.cpp
index 7f7cffd8840..f62bdaef099 100644
--- a/be/src/agent/topic_subscriber.cpp
+++ b/be/src/agent/topic_subscriber.cpp
@@ -40,12 +40,14 @@ void TopicSubscriber::handle_topic_info(const
TPublishTopicRequest& topic_reques
// eg, update workload info may delay other listener, then we need add a
thread here
// to handle_topic_info asynchronous
std::shared_lock lock(_listener_mtx);
- LOG(INFO) << "begin handle topic info";
+ LOG(INFO) << "[topic_publish]begin handle topic info";
for (auto& listener_pair : _registered_listeners) {
if (topic_request.topic_map.find(listener_pair.first) !=
topic_request.topic_map.end()) {
+ LOG(INFO) << "[topic_publish]begin handle topic " <<
listener_pair.first
+ << ", size=" <<
topic_request.topic_map.at(listener_pair.first).size();
listener_pair.second->handle_topic_info(
topic_request.topic_map.at(listener_pair.first));
- LOG(INFO) << "handle topic " << listener_pair.first << "
successfully";
+ LOG(INFO) << "[topic_publish]finish handle topic " <<
listener_pair.first;
}
}
}
diff --git a/be/src/agent/workload_group_listener.cpp
b/be/src/agent/workload_group_listener.cpp
index f98315fa433..822e3c692f7 100644
--- a/be/src/agent/workload_group_listener.cpp
+++ b/be/src/agent/workload_group_listener.cpp
@@ -26,21 +26,27 @@ namespace doris {
void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>&
topic_info_list) {
std::set<uint64_t> current_wg_ids;
+ bool is_set_workload_group_info = false;
+ int list_size = topic_info_list.size();
for (const TopicInfo& topic_info : topic_info_list) {
if (!topic_info.__isset.workload_group_info) {
continue;
}
+ is_set_workload_group_info = true;
// 1 parse topicinfo to group info
WorkloadGroupInfo workload_group_info;
Status ret =
WorkloadGroupInfo::parse_topic_info(topic_info.workload_group_info,
&workload_group_info);
+ // it means FE has this wg, but parsing may have failed, so we should
not delete it.
+ if (workload_group_info.id != 0) {
+ current_wg_ids.insert(workload_group_info.id);
+ }
if (!ret.ok()) {
- LOG(INFO) << "parse topic info failed, tg_id=" <<
workload_group_info.id
- << ", reason:" << ret.to_string();
+ LOG(INFO) << "[topic_publish_wg]parse topic info failed, tg_id="
+ << workload_group_info.id << ", reason:" <<
ret.to_string();
continue;
}
- current_wg_ids.insert(workload_group_info.id);
// 2 update workload group
auto tg =
@@ -53,16 +59,25 @@ void WorkloadGroupListener::handle_topic_info(const
std::vector<TopicInfo>& topi
// 4 create and update task scheduler
tg->upsert_task_scheduler(&workload_group_info, _exec_env);
- LOG(INFO) << "update workload group finish, tg info=" <<
tg->debug_string()
- << ", enable_cpu_hard_limit="
+ LOG(INFO) << "[topic_publish_wg]update workload group finish, tg info="
+ << tg->debug_string() << ", enable_cpu_hard_limit="
<< (_exec_env->workload_group_mgr()->enable_cpu_hard_limit()
? "true" : "false")
<< ", cgroup cpu_shares=" <<
workload_group_info.cgroup_cpu_shares
<< ", cgroup cpu_hard_limit=" <<
workload_group_info.cgroup_cpu_hard_limit
<< ", enable_cgroup_cpu_soft_limit="
<< (config::enable_cgroup_cpu_soft_limit ? "true" : "false")
- << ", cgroup home path=" << config::doris_cgroup_cpu_path;
+ << ", cgroup home path=" << config::doris_cgroup_cpu_path
+ << ", list size=" << list_size;
}
+ // NOTE(wb) when is_set_workload_group_info=false, it means FE sent an
empty workload group list
+ // this should not happen, because FE should have at least one normal
group.
+ // just log it if that happens
+ if (!is_set_workload_group_info) {
+ LOG(INFO) << "[topic_publish_wg]unexpected error happens, no workload
group info is "
+ "set, list size="
+ << list_size;
+ }
_exec_env->workload_group_mgr()->delete_workload_group_by_ids(current_wg_ids);
}
} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/workload_group/workload_group.cpp
b/be/src/runtime/workload_group/workload_group.cpp
index 1b15e89b08e..673263f1a17 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -68,11 +68,11 @@ std::string WorkloadGroup::debug_string() const {
"TG[id = {}, name = {}, cpu_share = {}, memory_limit = {},
enable_memory_overcommit = "
"{}, version = {}, cpu_hard_limit = {}, scan_thread_num = "
"{}, max_remote_scan_thread_num = {}, min_remote_scan_thread_num =
{}, "
- "spill_low_watermark={}, spill_high_watermark={}]",
+ "spill_low_watermark={}, spill_high_watermark={}, is_shutdown={},
query_num={}]",
_id, _name, cpu_share(), PrettyPrinter::print(_memory_limit,
TUnit::BYTES),
_enable_memory_overcommit ? "true" : "false", _version,
cpu_hard_limit(),
_scan_thread_num, _max_remote_scan_thread_num,
_min_remote_scan_thread_num,
- _spill_low_watermark, _spill_high_watermark);
+ _spill_low_watermark, _spill_high_watermark, _is_shutdown,
_query_ctxs.size());
}
void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) {
diff --git a/be/src/runtime/workload_group/workload_group.h
b/be/src/runtime/workload_group/workload_group.h
index 49bcd841a0f..d4ef689766a 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -119,7 +119,7 @@ public:
// If the workload group is set shutdown, then should not run any
more,
// because the scheduler pool and other pointer may be released.
return Status::InternalError(
- "Failed add query to workload group, the workload group is
shutdown. host: {}",
+ "Failed add query to wg {}, the workload group is
shutdown. host: {}", _id,
BackendOptions::get_localhost());
}
_query_ctxs.insert({query_id, query_ctx});
@@ -136,6 +136,11 @@ public:
_is_shutdown = true;
}
+ bool can_be_dropped() {
+ std::shared_lock<std::shared_mutex> r_lock(_mutex);
+ return _is_shutdown && _query_ctxs.size() == 0;
+ }
+
int query_num() {
std::shared_lock<std::shared_mutex> r_lock(_mutex);
return _query_ctxs.size();
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp
b/be/src/runtime/workload_group/workload_group_manager.cpp
index e336c9f80a8..a0e0de75f36 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -76,35 +76,40 @@ void
WorkloadGroupMgr::delete_workload_group_by_ids(std::set<uint64_t> used_wg_i
int64_t begin_time = MonotonicMillis();
// 1 get delete group without running queries
std::vector<WorkloadGroupPtr> deleted_task_groups;
+ int old_wg_size = 0;
+ int new_wg_size = 0;
{
std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
+ old_wg_size = _workload_groups.size();
for (auto iter = _workload_groups.begin(); iter !=
_workload_groups.end(); iter++) {
- uint64_t tg_id = iter->first;
+ uint64_t wg_id = iter->first;
auto workload_group_ptr = iter->second;
- if (used_wg_id.find(tg_id) == used_wg_id.end()) {
+ if (used_wg_id.find(wg_id) == used_wg_id.end()) {
workload_group_ptr->shutdown();
- // only when no query running in workload group, its resource
can be released in BE
- if (workload_group_ptr->query_num() == 0) {
- LOG(INFO) << "There is no query in wg " << tg_id << ",
delete it.";
- deleted_task_groups.push_back(workload_group_ptr);
- }
+ LOG(INFO) << "[topic_publish_wg] shutdown wg:" << wg_id;
+ }
+ // wg is shutdown and running num = 0, its resource can be
released in BE
+ if (workload_group_ptr->can_be_dropped()) {
+ LOG(INFO) << "[topic_publish_wg]There is no query in wg" <<
wg_id << ", delete it.";
+ deleted_task_groups.push_back(workload_group_ptr);
}
}
}
// 2 stop active thread
- for (auto& tg : deleted_task_groups) {
+ for (auto& wg : deleted_task_groups) {
// There is not lock here, but the tg may be released by another
- // thread, so that we should use shared ptr here, not use tg_id
- tg->try_stop_schedulers();
+ // thread, so that we should use shared ptr here, not use wg_id
+ wg->try_stop_schedulers();
}
// 3 release resource in memory
{
std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
- for (auto& tg : deleted_task_groups) {
- _workload_groups.erase(tg->id());
+ for (auto& wg : deleted_task_groups) {
+ _workload_groups.erase(wg->id());
}
+ new_wg_size = _workload_groups.size();
}
// 4 clear cgroup dir
@@ -113,28 +118,32 @@ void
WorkloadGroupMgr::delete_workload_group_by_ids(std::set<uint64_t> used_wg_i
// So the first time to rmdir a cgroup path may failed.
// Using cgdelete has no such issue.
{
- std::lock_guard<std::shared_mutex> write_lock(_init_cg_ctl_lock);
- if (!_cg_cpu_ctl) {
- _cg_cpu_ctl = std::make_unique<CgroupV1CpuCtl>();
- }
- if (!_is_init_succ) {
- Status ret = _cg_cpu_ctl->init();
- if (ret.ok()) {
- _is_init_succ = true;
- } else {
- LOG(INFO) << "init workload group mgr cpu ctl failed, " <<
ret.to_string();
+ if (config::doris_cgroup_cpu_path != "") {
+ std::lock_guard<std::shared_mutex> write_lock(_init_cg_ctl_lock);
+ if (!_cg_cpu_ctl) {
+ _cg_cpu_ctl = std::make_unique<CgroupV1CpuCtl>();
}
- }
- if (_is_init_succ) {
- Status ret = _cg_cpu_ctl->delete_unused_cgroup_path(used_wg_id);
- if (!ret.ok()) {
- LOG(WARNING) << ret.to_string();
+ if (!_is_init_succ) {
+ Status ret = _cg_cpu_ctl->init();
+ if (ret.ok()) {
+ _is_init_succ = true;
+ } else {
+ LOG(INFO) << "[topic_publish_wg]init workload group mgr
cpu ctl failed, "
+ << ret.to_string();
+ }
+ }
+ if (_is_init_succ) {
+ Status ret =
_cg_cpu_ctl->delete_unused_cgroup_path(used_wg_id);
+ if (!ret.ok()) {
+ LOG(WARNING) << "[topic_publish_wg]" << ret.to_string();
+ }
}
}
}
int64_t time_cost_ms = MonotonicMillis() - begin_time;
- LOG(INFO) << "finish clear unused workload group, time cost: " <<
time_cost_ms
- << "ms, deleted group size:" << deleted_task_groups.size();
+ LOG(INFO) << "[topic_publish_wg]finish clear unused workload group, time
cost: " << time_cost_ms
+ << "ms, deleted group size:" << deleted_task_groups.size()
+ << ", before wg size=" << old_wg_size << ", after wg size=" <<
new_wg_size;
}
struct WorkloadGroupMemInfo {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 90d55931af5..0514f22aed3 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -1066,16 +1066,6 @@ public class Env {
}
queryCancelWorker.start();
-
- TopicPublisher wgPublisher = new WorkloadGroupPublisher(this);
- topicPublisherThread.addToTopicPublisherList(wgPublisher);
- WorkloadSchedPolicyPublisher wpPublisher = new
WorkloadSchedPolicyPublisher(this);
- topicPublisherThread.addToTopicPublisherList(wpPublisher);
- topicPublisherThread.start();
-
- workloadGroupMgr.startUpdateThread();
- workloadSchedPolicyMgr.start();
- workloadRuntimeStatusMgr.start();
}
// wait until FE is ready.
@@ -1718,6 +1708,13 @@ public class Env {
binlogGcer.start();
columnIdFlusher.start();
insertOverwriteManager.start();
+
+ TopicPublisher wgPublisher = new WorkloadGroupPublisher(this);
+ topicPublisherThread.addToTopicPublisherList(wgPublisher);
+ WorkloadSchedPolicyPublisher wpPublisher = new
WorkloadSchedPolicyPublisher(this);
+ topicPublisherThread.addToTopicPublisherList(wpPublisher);
+ topicPublisherThread.start();
+
}
// start threads that should run on all FE
@@ -1739,6 +1736,11 @@ public class Env {
}
dnsCache.start();
+
+ workloadGroupMgr.startUpdateThread();
+ workloadSchedPolicyMgr.start();
+ workloadRuntimeStatusMgr.start();
+
}
private void transferToNonMaster(FrontendNodeType newType) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java
index 9af6bd392cb..dde45e44e29 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java
@@ -26,6 +26,8 @@ import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPublishTopicRequest;
+import org.apache.doris.thrift.TTopicInfoType;
+import org.apache.doris.thrift.TopicInfo;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -34,6 +36,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
public class TopicPublisherThread extends MasterDaemon {
@@ -59,7 +62,7 @@ public class TopicPublisherThread extends MasterDaemon {
@Override
protected void runAfterCatalogReady() {
- LOG.info("begin publish topic info");
+ LOG.info("[topic_publish]begin publish topic info");
// step 1: get all publish topic info
TPublishTopicRequest request = new TPublishTopicRequest();
for (TopicPublisher topicPublisher : topicPublisherList) {
@@ -106,16 +109,24 @@ public class TopicPublisherThread extends MasterDaemon {
BackendService.Client client = null;
TNetworkAddress address = null;
boolean ok = false;
+ String logStr = "";
+ try {
+ for (Map.Entry<TTopicInfoType, List<TopicInfo>> entry :
request.getTopicMap().entrySet()) {
+ logStr += " " + entry.getKey() + "=" +
entry.getValue().size() + " ";
+ }
+ } catch (Exception e) {
+ LOG.warn("[topic_publish]make log detail for publish failed:",
e);
+ }
try {
address = new TNetworkAddress(be.getHost(), be.getBePort());
client = ClientPool.backendPool.borrowObject(address);
client.publishTopicInfo(request);
ok = true;
- LOG.info("publish topic info to be {} success, time cost={}
ms",
- be.getHost(), (System.currentTimeMillis() -
beginTime));
+ LOG.info("[topic_publish]publish topic info to be {} success,
time cost={} ms, details:{}",
+ be.getHost(), (System.currentTimeMillis() -
beginTime), logStr);
} catch (Exception e) {
- LOG.warn("publish topic info to be {} error happens: , time
cost={} ms",
- be.getHost(), (System.currentTimeMillis() -
beginTime), e);
+ LOG.warn("[topic_publish]publish topic info to be {} error
happens: , time cost={} ms, details:{}",
+ be.getHost(), (System.currentTimeMillis() -
beginTime), logStr, e);
} finally {
try {
if (ok) {
@@ -124,7 +135,8 @@ public class TopicPublisherThread extends MasterDaemon {
ClientPool.backendPool.invalidateObject(address,
client);
}
} catch (Throwable e) {
- LOG.warn("recycle topic publish client failed. related
backend[{}]", be.getHost(), e);
+ LOG.warn("[topic_publish]recycle topic publish client
failed. related backend[{}]", be.getHost(),
+ e);
}
handler.onResponse(be);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
index 45b3664631e..ea8ac9256e4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
@@ -18,14 +18,20 @@
package org.apache.doris.common.publish;
import org.apache.doris.catalog.Env;
+import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
import org.apache.doris.thrift.TPublishTopicRequest;
import org.apache.doris.thrift.TTopicInfoType;
import org.apache.doris.thrift.TopicInfo;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
import java.util.List;
public class WorkloadGroupPublisher implements TopicPublisher {
+ private static final Logger LOG =
LogManager.getLogger(WorkloadGroupPublisher.class);
+
private Env env;
public WorkloadGroupPublisher(Env env) {
@@ -35,6 +41,12 @@ public class WorkloadGroupPublisher implements
TopicPublisher {
@Override
public void getTopicInfo(TPublishTopicRequest req) {
List<TopicInfo> list = env.getWorkloadGroupMgr().getPublishTopicInfo();
- req.putToTopicMap(TTopicInfoType.WORKLOAD_GROUP, list);
+ if (list.size() == 0) {
+ LOG.warn("[topic_publish]currently, doris at least has one
workload group named "
+ + WorkloadGroupMgr.DEFAULT_GROUP_NAME
+ + ", so get a size 0 here is an error, should check it.");
+ } else {
+ req.putToTopicMap(TTopicInfoType.WORKLOAD_GROUP, list);
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]