This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ab83a6ae22c [chore](cpuresource) remove cpu resource limit in be
(#52460)
ab83a6ae22c is described below
commit ab83a6ae22ce11156b0fca4a7a6a96e0c9470888
Author: yiguolei <[email protected]>
AuthorDate: Tue Jul 1 10:40:32 2025 +0800
[chore](cpuresource) remove cpu resource limit in be (#52460)
This feature conflicts with workload groups and is no longer used.
The FE code is not removed in this PR because we may add another implementation
in BE later and may want to keep the FE interface unchanged.
---
be/src/runtime/fragment_mgr.cpp | 14 ------
be/src/runtime/fragment_mgr.h | 3 --
be/src/runtime/query_context.cpp | 12 -----
be/src/runtime/query_context.h | 16 ------
be/src/vec/exec/scan/scanner_context.cpp | 5 --
be/src/vec/exec/scan/scanner_context.h | 2 -
be/src/vec/exec/scan/scanner_scheduler.cpp | 80 ++++++++----------------------
be/src/vec/exec/scan/scanner_scheduler.h | 6 ---
8 files changed, 21 insertions(+), 117 deletions(-)
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index a609b2f7127..01637f2804e 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -739,8 +739,6 @@ Status FragmentMgr::_get_or_create_query_ctx(const
TPipelineFragmentParams& para
query_ctx->set_rsc_info = true;
}
- _set_scan_concurrency(params, query_ctx.get());
-
RETURN_IF_ERROR(query_ctx->set_workload_group(workload_group_ptr));
if (parent.__isset.runtime_filter_info) {
@@ -884,18 +882,6 @@ Status FragmentMgr::exec_plan_fragment(const
TPipelineFragmentParams& params,
return Status::OK();
}
-template <typename Param>
-void FragmentMgr::_set_scan_concurrency(const Param& params, QueryContext*
query_ctx) {
-#ifndef BE_TEST
- // If the token is set, the scan task will use limited_scan_pool in
scanner scheduler.
- // Otherwise, the scan task will use local/remote scan pool in scanner
scheduler
- if (params.query_options.__isset.resource_limit &&
- params.query_options.resource_limit.__isset.cpu_limit) {
-
query_ctx->set_thread_token(params.query_options.resource_limit.cpu_limit,
false);
- }
-#endif
-}
-
void FragmentMgr::cancel_query(const TUniqueId query_id, const Status reason) {
std::shared_ptr<QueryContext> query_ctx = nullptr;
{
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index e6f388f73e1..340ef1fc8a3 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -195,9 +195,6 @@ private:
std::vector<std::weak_ptr<QueryContext>> queries;
};
- template <typename Param>
- void _set_scan_concurrency(const Param& params, QueryContext* query_ctx);
-
Status _get_or_create_query_ctx(const TPipelineFragmentParams& params,
const TPipelineFragmentParamsList& parent,
QuerySource query_type,
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index becca5a6717..c9955c25420 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -217,18 +217,6 @@ QueryContext::~QueryContext() {
_report_query_profile();
}
- // Not release the the thread token in query context's dector method,
because the query
- // conext may be dectored in the thread token it self. It is very
dangerous and may core.
- // And also thread token need shutdown, it may take some time, may cause
the thread that
- // release the token hang, the thread maybe a pipeline task scheduler
thread.
- if (_thread_token) {
- Status submit_st =
ExecEnv::GetInstance()->lazy_release_obj_pool()->submit(
- DelayReleaseToken::create_shared(std::move(_thread_token)));
- if (!submit_st.ok()) {
- LOG(WARNING) << "Failed to release query context thread token,
query_id "
- << print_id(_query_id) << ", error status " <<
submit_st;
- }
- }
#ifndef BE_TEST
if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled())
[[unlikely]] {
try {
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index b2ec386ddab..b20366aef94 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -106,15 +106,6 @@ public:
return _query_watcher.elapsed_time_seconds(now) > _timeout_second;
}
- void set_thread_token(int concurrency, bool is_serial) {
- _thread_token =
_exec_env->scanner_scheduler()->new_limited_scan_pool_token(
- is_serial ? ThreadPool::ExecutionMode::SERIAL
- : ThreadPool::ExecutionMode::CONCURRENT,
- concurrency);
- }
-
- ThreadPoolToken* get_token() { return _thread_token.get(); }
-
void set_ready_to_execute(Status reason);
[[nodiscard]] bool is_cancelled() const { return !_exec_status.ok(); }
@@ -288,13 +279,6 @@ private:
std::shared_ptr<ResourceContext> _resource_ctx;
- // A token used to submit olap scanner to the "_limited_scan_thread_pool",
- // This thread pool token is created from "_limited_scan_thread_pool" from
exec env.
- // And will be shared by all instances of this query.
- // So that we can control the max thread that a query can be used to
execute.
- // If this token is not set, the scanner will be executed in
"_scan_thread_pool" in exec env.
- std::unique_ptr<ThreadPoolToken> _thread_token {nullptr};
-
void _init_resource_context();
void _init_query_mem_tracker();
diff --git a/be/src/vec/exec/scan/scanner_context.cpp
b/be/src/vec/exec/scan/scanner_context.cpp
index 15de374f222..a1618c30547 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -100,15 +100,10 @@ Status ScannerContext::init() {
print_id(_state->query_id()));
}
- thread_token = _state->get_query_ctx()->get_token();
-
if (_state->get_query_ctx()->get_scan_scheduler()) {
_should_reset_thread_name = false;
}
- _local_state->_runtime_profile->add_info_string("UseSpecificThreadToken",
- thread_token == nullptr ?
"False" : "True");
-
auto scanner = _all_scanners.front().lock();
DCHECK(scanner != nullptr);
diff --git a/be/src/vec/exec/scan/scanner_context.h
b/be/src/vec/exec/scan/scanner_context.h
index 711cdf6d4e4..5160cb38d86 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -41,7 +41,6 @@
namespace doris {
-class ThreadPoolToken;
class RuntimeState;
class TupleDescriptor;
class WorkloadGroup;
@@ -181,7 +180,6 @@ public:
// the unique id of this context
std::string ctx_id;
TUniqueId _query_id;
- ThreadPoolToken* thread_token = nullptr;
bool _should_reset_thread_name = true;
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index fe3fd5b0fe8..4911b0cecd0 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -61,18 +61,10 @@ void ScannerScheduler::stop() {
_is_closed = true;
- _limited_scan_thread_pool->shutdown();
- _limited_scan_thread_pool->wait();
-
LOG(INFO) << "ScannerScheduler stopped";
}
Status ScannerScheduler::init(ExecEnv* env) {
- RETURN_IF_ERROR(ThreadPoolBuilder("LimitedScanThreadPool")
-
.set_min_threads(config::doris_scanner_thread_pool_thread_num)
-
.set_max_threads(config::doris_scanner_thread_pool_thread_num)
-
.set_max_queue_size(config::doris_scanner_thread_pool_queue_size)
- .build(&_limited_scan_thread_pool));
_is_init = true;
return Status::OK();
}
@@ -88,15 +80,16 @@ Status
ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
<< " maybe finished";
return Status::OK();
}
+ std::shared_ptr<ScannerDelegate> scanner_delegate =
scan_task->scanner.lock();
+ if (scanner_delegate == nullptr) {
+ return Status::OK();
+ }
- if (ctx->thread_token != nullptr) {
- std::shared_ptr<ScannerDelegate> scanner_delegate =
scan_task->scanner.lock();
- if (scanner_delegate == nullptr) {
- return Status::OK();
- }
-
- scanner_delegate->_scanner->start_wait_worker_timer();
- auto s = ctx->thread_token->submit_func([scanner_ref = scan_task,
ctx]() {
+ scanner_delegate->_scanner->start_wait_worker_timer();
+ TabletStorageType type = scanner_delegate->_scanner->get_storage_type();
+ auto sumbit_task = [&]() {
+ SimplifiedScanScheduler* scan_sched = ctx->get_scan_scheduler();
+ auto work_func = [scanner_ref = scan_task, ctx]() {
auto status = [&] {
RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
return Status::OK();
@@ -106,55 +99,24 @@ Status
ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
scanner_ref->set_status(status);
ctx->push_back_scan_task(scanner_ref);
}
- });
- if (!s.ok()) {
- scan_task->set_status(s);
- return s;
- }
- } else {
- std::shared_ptr<ScannerDelegate> scanner_delegate =
scan_task->scanner.lock();
- if (scanner_delegate == nullptr) {
- return Status::OK();
- }
-
- scanner_delegate->_scanner->start_wait_worker_timer();
- TabletStorageType type =
scanner_delegate->_scanner->get_storage_type();
- auto sumbit_task = [&]() {
- SimplifiedScanScheduler* scan_sched = ctx->get_scan_scheduler();
- auto work_func = [scanner_ref = scan_task, ctx]() {
- auto status = [&] {
- RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
- return Status::OK();
- }();
-
- if (!status.ok()) {
- scanner_ref->set_status(status);
- ctx->push_back_scan_task(scanner_ref);
- }
- };
- SimplifiedScanTask simple_scan_task = {work_func, ctx};
- return scan_sched->submit_scan_task(simple_scan_task);
};
-
- Status submit_status = sumbit_task();
- if (!submit_status.ok()) {
- // User will see TooManyTasks error. It looks like a more
reasonable error.
- Status scan_task_status = Status::TooManyTasks(
- "Failed to submit scanner to scanner pool reason:" +
- std::string(submit_status.msg()) + "|type:" +
std::to_string(type));
- scan_task->set_status(scan_task_status);
- return scan_task_status;
- }
+ SimplifiedScanTask simple_scan_task = {work_func, ctx};
+ return scan_sched->submit_scan_task(simple_scan_task);
+ };
+
+ Status submit_status = sumbit_task();
+ if (!submit_status.ok()) {
+ // User will see TooManyTasks error. It looks like a more reasonable
error.
+ Status scan_task_status = Status::TooManyTasks(
+ "Failed to submit scanner to scanner pool reason:" +
+ std::string(submit_status.msg()) + "|type:" +
std::to_string(type));
+ scan_task->set_status(scan_task_status);
+ return scan_task_status;
}
return Status::OK();
}
-std::unique_ptr<ThreadPoolToken> ScannerScheduler::new_limited_scan_pool_token(
- ThreadPool::ExecutionMode mode, int max_concurrency) {
- return _limited_scan_thread_pool->new_token(mode, max_concurrency);
-}
-
void handle_reserve_memory_failure(RuntimeState* state,
std::shared_ptr<ScannerContext> ctx,
const Status& st, size_t reserve_size) {
ctx->clear_free_blocks();
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h
b/be/src/vec/exec/scan/scanner_scheduler.h
index 70fa475ac81..750265c63ef 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -63,9 +63,6 @@ public:
void stop();
- std::unique_ptr<ThreadPoolToken>
new_limited_scan_pool_token(ThreadPool::ExecutionMode mode,
- int
max_concurrency);
-
int remote_thread_pool_max_thread_num() const { return
_remote_thread_pool_max_thread_num; }
static int get_remote_scan_thread_num();
@@ -76,9 +73,6 @@ private:
static void _scanner_scan(std::shared_ptr<ScannerContext> ctx,
std::shared_ptr<ScanTask> scan_task);
- // _limited_scan_thread_pool is a special pool for queries with resource
limit
- std::unique_ptr<ThreadPool> _limited_scan_thread_pool;
-
// true is the scheduler is closed.
std::atomic_bool _is_closed = {false};
bool _is_init = false;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]