This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
new 95cdeaeb24 [fix-branch-1.2-lts](TaskWorkerPool) Fix task worker pool and remove unnecessary copy (#23538)
95cdeaeb24 is described below
commit 95cdeaeb240dfb4766016d9b9e8c7ee954af24cd
Author: bobhan1 <[email protected]>
AuthorDate: Mon Aug 28 11:26:11 2023 +0800
[fix-branch-1.2-lts](TaskWorkerPool) Fix task worker pool and remove unnecessary copy (#23538)
cherry-pick #19822
---
be/src/agent/agent_server.cpp | 17 +++-
be/src/agent/agent_server.h | 4 +-
be/src/agent/task_worker_pool.cpp | 202 ++++++++++++++++----------------------
be/src/agent/task_worker_pool.h | 22 +++--
4 files changed, 115 insertions(+), 130 deletions(-)
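For readers skimming the diffstat: most of the task_worker_pool.cpp change is the same mechanical pattern repeated in every worker loop, namely copying the task out of the queue once and then binding the embedded request by const reference instead of copying it into a local. A minimal, self-contained sketch of that pattern is below; SampleReq, SampleTask and the global queue are illustrative stand-ins, not the real Thrift-generated TAgentTaskRequest types.

    #include <deque>
    #include <iostream>
    #include <mutex>
    #include <string>

    struct SampleReq {
        std::string payload; // stands in for a large Thrift request struct
    };

    struct SampleTask {
        long signature;
        SampleReq create_req; // embedded request that used to be copied into a local
    };

    std::mutex g_lock;
    std::deque<SampleTask> g_tasks;

    void handle_one_task() {
        SampleTask task;
        {
            std::lock_guard<std::mutex> guard(g_lock);
            if (g_tasks.empty()) return;
            task = g_tasks.front(); // the task itself is still copied out of the queue
            g_tasks.pop_front();
        }
        // After the patch, the embedded request is bound by const reference instead of copied:
        const SampleReq& req = task.create_req;
        std::cout << "signature=" << task.signature << " payload=" << req.payload << "\n";
    }

    int main() {
        SampleTask t;
        t.signature = 1;
        t.create_req.payload = "hello";
        g_tasks.push_back(t);
        handle_one_task();
        return 0;
    }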
diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index d57777b6e6..f925baa416 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -67,13 +67,20 @@ AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info)
#define CREATE_AND_START_THREAD(type, pool_name)
#endif // BE_TEST
+#ifndef BE_TEST
+ // Both PUSH and REALTIME_PUSH type use _push_load_workers
+ _push_load_workers.reset(new PushTaskPool(exec_env, TaskWorkerPool::ThreadModel::MULTI_THREADS,
+ PushTaskPool::PushWokerType::LOAD_V2));
+ _push_load_workers->start();
+ _push_delete_workers.reset(new PushTaskPool(exec_env,
+ TaskWorkerPool::ThreadModel::MULTI_THREADS,
+ PushTaskPool::PushWokerType::DELETE));
+ _push_delete_workers->start();
+#endif
CREATE_AND_START_POOL(CREATE_TABLE, _create_tablet_workers);
CREATE_AND_START_POOL(DROP_TABLE, _drop_tablet_workers);
- // Both PUSH and REALTIME_PUSH type use _push_workers
- CREATE_AND_START_POOL(PUSH, _push_workers);
CREATE_AND_START_POOL(PUBLISH_VERSION, _publish_version_workers);
CREATE_AND_START_POOL(CLEAR_TRANSACTION_TASK, _clear_transaction_task_workers);
- CREATE_AND_START_POOL(DELETE, _delete_workers);
CREATE_AND_START_POOL(ALTER_TABLE, _alter_tablet_workers);
CREATE_AND_START_POOL(CLONE, _clone_workers);
CREATE_AND_START_POOL(STORAGE_MEDIUM_MIGRATE, _storage_medium_migrate_workers);
@@ -165,9 +172,9 @@ void AgentServer::submit_tasks(TAgentResult& agent_result,
}
if (task.push_req.push_type == TPushType::LOAD ||
task.push_req.push_type == TPushType::LOAD_V2) {
- _push_workers->submit_task(task);
+ _push_load_workers->submit_task(task);
} else if (task.push_req.push_type == TPushType::DELETE) {
- _delete_workers->submit_task(task);
+ _push_delete_workers->submit_task(task);
} else {
ret_st = Status::InvalidArgument(
"task(signature={}, type={}, push_type={}) has wrong push_type", signature,
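The agent_server.cpp change above boils down to: stop creating the PUSH and DELETE pools through the generic macro, build two dedicated pools up front, and route by push_type at submit time. A rough sketch of that shape, using simplified stand-in classes (Pool, PushPool and Mode are invented names, not the real TaskWorkerPool/PushTaskPool API):

    #include <iostream>
    #include <memory>

    class Pool {
    public:
        virtual ~Pool() = default;
        virtual void submit(int signature) = 0;
    };

    class PushPool : public Pool {
    public:
        enum class Mode { LOAD, DELETE };
        explicit PushPool(Mode mode) : _mode(mode) {}
        void submit(int signature) override {
            std::cout << (_mode == Mode::LOAD ? "load" : "delete")
                      << " pool got task " << signature << "\n";
        }

    private:
        Mode _mode;
    };

    int main() {
        // agent_server.cpp now builds the two pools explicitly instead of via the macro
        std::unique_ptr<Pool> push_load_workers = std::make_unique<PushPool>(PushPool::Mode::LOAD);
        std::unique_ptr<Pool> push_delete_workers = std::make_unique<PushPool>(PushPool::Mode::DELETE);

        bool is_delete = false; // would come from task.push_req.push_type
        (is_delete ? push_delete_workers : push_load_workers)->submit(42);
        return 0;
    }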
diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h
index bfb819dd97..32ae5ad8ee 100644
--- a/be/src/agent/agent_server.h
+++ b/be/src/agent/agent_server.h
@@ -55,10 +55,10 @@ private:
std::unique_ptr<TaskWorkerPool> _create_tablet_workers;
std::unique_ptr<TaskWorkerPool> _drop_tablet_workers;
- std::unique_ptr<TaskWorkerPool> _push_workers;
+ std::unique_ptr<TaskWorkerPool> _push_load_workers;
std::unique_ptr<TaskWorkerPool> _publish_version_workers;
std::unique_ptr<TaskWorkerPool> _clear_transaction_task_workers;
- std::unique_ptr<TaskWorkerPool> _delete_workers;
+ std::unique_ptr<TaskWorkerPool> _push_delete_workers;
std::unique_ptr<TaskWorkerPool> _alter_tablet_workers;
std::unique_ptr<TaskWorkerPool> _clone_workers;
std::unique_ptr<TaskWorkerPool> _storage_medium_migrate_workers;
diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index bdccdaf7e5..c95b782d3b 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -113,96 +113,92 @@ void TaskWorkerPool::start() {
if (_thread_model == ThreadModel::SINGLE_THREAD) {
_worker_count = 1;
}
- std::function<void()> cb;
switch (_task_worker_type) {
case TaskWorkerType::CREATE_TABLE:
_worker_count = config::create_tablet_worker_count;
- cb = std::bind<void>(&TaskWorkerPool::_create_tablet_worker_thread_callback, this);
+ _cb = std::bind<void>(&TaskWorkerPool::_create_tablet_worker_thread_callback, this);
break;
case TaskWorkerType::DROP_TABLE:
_worker_count = config::drop_tablet_worker_count;
- cb = std::bind<void>(&TaskWorkerPool::_drop_tablet_worker_thread_callback, this);
+ _cb = std::bind<void>(&TaskWorkerPool::_drop_tablet_worker_thread_callback, this);
break;
case TaskWorkerType::PUSH:
case TaskWorkerType::REALTIME_PUSH:
- _worker_count =
- config::push_worker_count_normal_priority + config::push_worker_count_high_priority;
- cb = std::bind<void>(&TaskWorkerPool::_push_worker_thread_callback, this);
+ break;
+ case TaskWorkerType::DELETE:
break;
case TaskWorkerType::PUBLISH_VERSION:
_worker_count = config::publish_version_worker_count;
- cb = std::bind<void>(&TaskWorkerPool::_publish_version_worker_thread_callback, this);
+ _cb = std::bind<void>(&TaskWorkerPool::_publish_version_worker_thread_callback, this);
break;
case TaskWorkerType::CLEAR_TRANSACTION_TASK:
_worker_count = config::clear_transaction_task_worker_count;
- cb = std::bind<void>(&TaskWorkerPool::_clear_transaction_task_worker_thread_callback, this);
- break;
- case TaskWorkerType::DELETE:
- _worker_count = config::delete_worker_count;
- cb = std::bind<void>(&TaskWorkerPool::_push_worker_thread_callback, this);
+ _cb = std::bind<void>(&TaskWorkerPool::_clear_transaction_task_worker_thread_callback,
+ this);
break;
case TaskWorkerType::ALTER_TABLE:
_worker_count = config::alter_tablet_worker_count;
- cb = std::bind<void>(&TaskWorkerPool::_alter_tablet_worker_thread_callback, this);
+ _cb = std::bind<void>(&TaskWorkerPool::_alter_tablet_worker_thread_callback, this);
break;
case TaskWorkerType::CLONE:
_worker_count = config::clone_worker_count;
- cb = std::bind<void>(&TaskWorkerPool::_clone_worker_thread_callback, this);
+ _cb = std::bind<void>(&TaskWorkerPool::_clone_worker_thread_callback, this);
break;
case TaskWorkerType::STORAGE_MEDIUM_MIGRATE:
_worker_count = config::storage_medium_migrate_count;
- cb = std::bind<void>(&TaskWorkerPool::_storage_medium_migrate_worker_thread_callback, this);
+ _cb = std::bind<void>(&TaskWorkerPool::_storage_medium_migrate_worker_thread_callback,
+ this);
break;
case TaskWorkerType::CHECK_CONSISTENCY:
_worker_count = config::check_consistency_worker_count;
- cb = std::bind<void>(&TaskWorkerPool::_check_consistency_worker_thread_callback, this);
+ _cb = std::bind<void>(&TaskWorkerPool::_check_consistency_worker_thread_callback, this);
break;
case TaskWorkerType::REPORT_TASK:
- cb = std::bind<void>(&TaskWorkerPool::_report_task_worker_thread_callback, this);
+ _cb = std::bind<void>(&TaskWorkerPool::_report_task_worker_thread_callback, this);
break;
case TaskWorkerType::REPORT_DISK_STATE:
- cb = std::bind<void>(&TaskWorkerPool::_report_disk_state_worker_thread_callback, this);
+ _cb = std::bind<void>(&TaskWorkerPool::_report_disk_state_worker_thread_callback, this);
break;
case TaskWorkerType::REPORT_OLAP_TABLE:
- cb = std::bind<void>(&TaskWorkerPool::_report_tablet_worker_thread_callback, this);
+ _cb = std::bind<void>(&TaskWorkerPool::_report_tablet_worker_thread_callback, this);
break;
case TaskWorkerType::UPLOAD:
_worker_count = config::upload_worker_count;
- cb = std::bind<void>(&TaskWorkerPool::_upload_worker_thread_callback, this);
+ _cb = std::bind<void>(&TaskWorkerPool::_upload_worker_thread_callback, this);
break;
case TaskWorkerType::DOWNLOAD:
_worker_count = config::download_worker_count;
- cb = std::bind<void>(&TaskWorkerPool::_download_worker_thread_callback, this);
+ _cb = std::bind<void>(&TaskWorkerPool::_download_worker_thread_callback, this);
break;
case TaskWorkerType::MAKE_SNAPSHOT:
_worker_count = config::make_snapshot_worker_count;
- cb = std::bind<void>(&TaskWorkerPool::_make_snapshot_thread_callback, this);
+ _cb = std::bind<void>(&TaskWorkerPool::_make_snapshot_thread_callback, this);
break;
case TaskWorkerType::RELEASE_SNAPSHOT:
_worker_count = config::release_snapshot_worker_count;
- cb = std::bind<void>(&TaskWorkerPool::_release_snapshot_thread_callback, this);
+ _cb = std::bind<void>(&TaskWorkerPool::_release_snapshot_thread_callback, this);
break;
case TaskWorkerType::MOVE:
_worker_count = 1;
- cb = std::bind<void>(&TaskWorkerPool::_move_dir_thread_callback, this);
+ _cb = std::bind<void>(&TaskWorkerPool::_move_dir_thread_callback, this);
break;
case TaskWorkerType::UPDATE_TABLET_META_INFO:
_worker_count = 1;
- cb = std::bind<void>(&TaskWorkerPool::_update_tablet_meta_worker_thread_callback, this);
+ _cb = std::bind<void>(&TaskWorkerPool::_update_tablet_meta_worker_thread_callback, this);
break;
case TaskWorkerType::SUBMIT_TABLE_COMPACTION:
_worker_count = 1;
- cb = std::bind<void>(&TaskWorkerPool::_submit_table_compaction_worker_thread_callback,
- this);
+ _cb = std::bind<void>(&TaskWorkerPool::_submit_table_compaction_worker_thread_callback,
+ this);
break;
case TaskWorkerType::REFRESH_STORAGE_POLICY:
- cb = std::bind<void>(
+ _cb = std::bind<void>(
&TaskWorkerPool::_storage_refresh_storage_policy_worker_thread_callback, this);
break;
case TaskWorkerType::UPDATE_STORAGE_POLICY:
_worker_count = 1;
- cb = std::bind<void>(&TaskWorkerPool::_storage_update_storage_policy_worker_thread_callback,
- this);
+ _cb = std::bind<void>(
+ &TaskWorkerPool::_storage_update_storage_policy_worker_thread_callback, this);
break;
default:
// pass
@@ -217,7 +213,7 @@ void TaskWorkerPool::start() {
.build(&_thread_pool);
for (int i = 0; i < _worker_count; i++) {
- auto st = _thread_pool->submit_func(cb);
+ auto st = _thread_pool->submit_func(_cb);
CHECK(st.ok()) << st;
}
#endif
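The start() hunk above replaces the local variable cb with the _cb member so that a subclass can install its own callback before the workers are spawned. A hedged sketch of that idea using plain std::thread instead of the Doris ThreadPool (Workers and PushWorkers are invented names, not the real classes):

    #include <functional>
    #include <iostream>
    #include <thread>
    #include <vector>

    class Workers {
    public:
        Workers() : _cb([this] { _default_callback(); }) {}
        virtual ~Workers() = default;

        void start() {
            for (int i = 0; i < _worker_count; ++i) {
                _threads.emplace_back(_cb); // the same callback is submitted _worker_count times
            }
            for (auto& t : _threads) {
                t.join();
            }
        }

    protected:
        int _worker_count = 2;
        std::function<void()> _cb; // was a local `cb` before the patch; now a member

    private:
        void _default_callback() { std::cout << "default worker\n"; }
        std::vector<std::thread> _threads;
    };

    class PushWorkers : public Workers {
    public:
        PushWorkers() {
            _worker_count = 3;
            _cb = [this] { _push_callback(); }; // subclass installs its own callback
        }

    private:
        void _push_callback() { std::cout << "push worker\n"; }
    };

    int main() {
        PushWorkers pool;
        pool.start();
        return 0;
    }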
@@ -310,36 +306,9 @@ void TaskWorkerPool::_finish_task(const TFinishTaskRequest& finish_task_request)
TRACE("finish task");
}
-uint32_t TaskWorkerPool::_get_next_task_index(int32_t thread_count,
- std::deque<TAgentTaskRequest>& tasks,
- TPriority::type priority) {
- int32_t index = -1;
- std::deque<TAgentTaskRequest>::size_type task_count = tasks.size();
- for (uint32_t i = 0; i < task_count; ++i) {
- TAgentTaskRequest task = tasks[i];
- if (priority == TPriority::HIGH) {
- if (task.__isset.priority && task.priority == TPriority::HIGH) {
- index = i;
- break;
- }
- }
- }
-
- if (index == -1) {
- if (priority == TPriority::HIGH) {
- return index;
- }
-
- index = 0;
- }
-
- return index;
-}
-
void TaskWorkerPool::_create_tablet_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
- TCreateTabletReq create_tablet_req;
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
@@ -349,10 +318,9 @@ void TaskWorkerPool::_create_tablet_worker_thread_callback() {
}
agent_task_req = _tasks.front();
- create_tablet_req = agent_task_req.create_tablet_req;
_tasks.pop_front();
}
-
+ const TCreateTabletReq& create_tablet_req = agent_task_req.create_tablet_req;
scoped_refptr<Trace> trace(new Trace);
MonotonicStopWatch watch;
watch.start();
@@ -413,7 +381,6 @@ void TaskWorkerPool::_create_tablet_worker_thread_callback() {
void TaskWorkerPool::_drop_tablet_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
- TDropTabletReq drop_tablet_req;
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
@@ -423,10 +390,9 @@ void TaskWorkerPool::_drop_tablet_worker_thread_callback() {
}
agent_task_req = _tasks.front();
- drop_tablet_req = agent_task_req.drop_tablet_req;
_tasks.pop_front();
}
-
+ const TDropTabletReq& drop_tablet_req = agent_task_req.drop_tablet_req;
Status status;
TabletSharedPtr dropped_tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(
drop_tablet_req.tablet_id, false);
@@ -571,12 +537,27 @@ void TaskWorkerPool::_alter_tablet(const TAgentTaskRequest& agent_task_req, int6
finish_task_request->__set_task_status(status.to_thrift());
}
-void TaskWorkerPool::_push_worker_thread_callback() {
+PushTaskPool::PushTaskPool(ExecEnv* env, ThreadModel thread_model, PushWokerType type)
+ : TaskWorkerPool(
+ type == PushWokerType::LOAD_V2 ? TaskWorkerType::PUSH : TaskWorkerType::DELETE,
+ env, *env->master_info(), thread_model),
+ _push_worker_type(type) {
+ if (_push_worker_type == PushWokerType::LOAD_V2) {
+ _worker_count =
+ config::push_worker_count_normal_priority + config::push_worker_count_high_priority;
+
+ } else {
+ _worker_count = config::delete_worker_count;
+ }
+ _cb = [this]() { _push_worker_thread_callback(); };
+}
+
+void PushTaskPool::_push_worker_thread_callback() {
// gen high priority worker thread
TPriority::type priority = TPriority::NORMAL;
int32_t push_worker_count_high_priority = config::push_worker_count_high_priority;
- static uint32_t s_worker_count = 0;
- {
+ if (_push_worker_type == PushWokerType::LOAD_V2) {
+ static uint32_t s_worker_count = 0;
std::lock_guard<std::mutex> worker_thread_lock(_worker_thread_lock);
if (s_worker_count < push_worker_count_high_priority) {
++s_worker_count;
@@ -586,9 +567,7 @@ void TaskWorkerPool::_push_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
- TPushReq push_req;
- int32_t index = 0;
- do {
+ {
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
@@ -596,26 +575,26 @@ void TaskWorkerPool::_push_worker_thread_callback() {
return;
}
- index = _get_next_task_index(config::push_worker_count_normal_priority +
- config::push_worker_count_high_priority,
- _tasks, priority);
-
- if (index < 0) {
- // there is no high priority task. notify other thread to handle normal task
- _worker_thread_condition_variable.notify_all();
- break;
+ if (priority == TPriority::HIGH) {
+ const auto it = std::find_if(
+ _tasks.cbegin(), _tasks.cend(), [](const TAgentTaskRequest& req) {
+ return req.__isset.priority && req.priority == TPriority::HIGH;
+ });
+
+ if (it == _tasks.cend()) {
+ // there is no high priority task. notify other thread to handle normal task
+ _worker_thread_condition_variable.notify_all();
+ sleep(1);
+ continue;
+ }
+ agent_task_req = std::move(*it);
+ _tasks.erase(it);
+ } else {
+ agent_task_req = std::move(_tasks.front());
+ _tasks.pop_front();
}
-
- agent_task_req = _tasks[index];
- push_req = agent_task_req.push_req;
- _tasks.erase(_tasks.begin() + index);
- } while (false);
-
- if (index < 0) {
- // there is no high priority task in queue
- sleep(1);
- continue;
}
+ TPushReq& push_req = agent_task_req.push_req;
LOG(INFO) << "get push task. signature=" << agent_task_req.signature
<< ", priority=" << priority << " push_type=" << push_req.push_type;
@@ -658,7 +637,6 @@ void TaskWorkerPool::_push_worker_thread_callback() {
void TaskWorkerPool::_publish_version_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
- TPublishVersionRequest publish_version_req;
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
@@ -668,10 +646,9 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() {
}
agent_task_req = _tasks.front();
- publish_version_req = agent_task_req.publish_version_req;
_tasks.pop_front();
}
-
+ TPublishVersionRequest& publish_version_req = agent_task_req.publish_version_req;
DorisMetrics::instance()->publish_task_request_total->increment(1);
VLOG_NOTICE << "get publish version task. signature=" << agent_task_req.signature;
@@ -769,7 +746,6 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() {
void TaskWorkerPool::_clear_transaction_task_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
- TClearTransactionTaskRequest clear_transaction_task_req;
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
@@ -779,9 +755,10 @@ void TaskWorkerPool::_clear_transaction_task_worker_thread_callback() {
}
agent_task_req = _tasks.front();
- clear_transaction_task_req = agent_task_req.clear_transaction_task_req;
_tasks.pop_front();
}
+ const TClearTransactionTaskRequest& clear_transaction_task_req =
+ agent_task_req.clear_transaction_task_req;
LOG(INFO) << "get clear transaction task. signature=" << agent_task_req.signature
<< ", transaction_id=" << clear_transaction_task_req.transaction_id
<< ", partition_id_size=" << clear_transaction_task_req.partition_id.size();
@@ -821,7 +798,6 @@ void TaskWorkerPool::_clear_transaction_task_worker_thread_callback() {
void TaskWorkerPool::_update_tablet_meta_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
- TUpdateTabletMetaInfoReq update_tablet_meta_req;
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
@@ -831,9 +807,10 @@ void TaskWorkerPool::_update_tablet_meta_worker_thread_callback() {
}
agent_task_req = _tasks.front();
- update_tablet_meta_req = agent_task_req.update_tablet_meta_info_req;
_tasks.pop_front();
}
+ const TUpdateTabletMetaInfoReq& update_tablet_meta_req =
+ agent_task_req.update_tablet_meta_info_req;
LOG(INFO) << "get update tablet meta task. signature=" << agent_task_req.signature;
Status status;
@@ -895,8 +872,6 @@ void TaskWorkerPool::_update_tablet_meta_worker_thread_callback() {
void TaskWorkerPool::_clone_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
- TCloneReq clone_req;
-
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
@@ -906,9 +881,9 @@ void TaskWorkerPool::_clone_worker_thread_callback() {
}
agent_task_req = _tasks.front();
- clone_req = agent_task_req.clone_req;
_tasks.pop_front();
}
+ const TCloneReq& clone_req = agent_task_req.clone_req;
DorisMetrics::instance()->clone_requests_total->increment(1);
LOG(INFO) << "get clone task. signature=" << agent_task_req.signature;
@@ -945,7 +920,6 @@ void TaskWorkerPool::_clone_worker_thread_callback() {
void TaskWorkerPool::_storage_medium_migrate_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
- TStorageMediumMigrateReq storage_medium_migrate_req;
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
@@ -955,9 +929,10 @@ void TaskWorkerPool::_storage_medium_migrate_worker_thread_callback() {
}
agent_task_req = _tasks.front();
- storage_medium_migrate_req = agent_task_req.storage_medium_migrate_req;
_tasks.pop_front();
}
+ const TStorageMediumMigrateReq& storage_medium_migrate_req =
+ agent_task_req.storage_medium_migrate_req;
// check request and get info
TabletSharedPtr tablet;
@@ -1046,7 +1021,6 @@ Status TaskWorkerPool::_check_migrate_request(const TStorageMediumMigrateReq& re
void TaskWorkerPool::_check_consistency_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
- TCheckConsistencyReq check_consistency_req;
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
@@ -1056,9 +1030,9 @@ void TaskWorkerPool::_check_consistency_worker_thread_callback() {
}
agent_task_req = _tasks.front();
- check_consistency_req = agent_task_req.check_consistency_req;
_tasks.pop_front();
}
+ const TCheckConsistencyReq& check_consistency_req = agent_task_req.check_consistency_req;
uint32_t checksum = 0;
EngineChecksumTask engine_task(check_consistency_req.tablet_id,
@@ -1239,7 +1213,6 @@ void TaskWorkerPool::_report_tablet_worker_thread_callback() {
void TaskWorkerPool::_upload_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
- TUploadReq upload_request;
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
@@ -1249,9 +1222,9 @@ void TaskWorkerPool::_upload_worker_thread_callback() {
}
agent_task_req = _tasks.front();
- upload_request = agent_task_req.upload_req;
_tasks.pop_front();
}
+ const TUploadReq& upload_request = agent_task_req.upload_req;
LOG(INFO) << "get upload task. signature=" << agent_task_req.signature
<< ", job_id=" << upload_request.job_id;
@@ -1290,7 +1263,6 @@ void TaskWorkerPool::_upload_worker_thread_callback() {
void TaskWorkerPool::_download_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
- TDownloadReq download_request;
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
@@ -1300,9 +1272,9 @@ void TaskWorkerPool::_download_worker_thread_callback() {
}
agent_task_req = _tasks.front();
- download_request = agent_task_req.download_req;
_tasks.pop_front();
}
+ const TDownloadReq& download_request = agent_task_req.download_req;
LOG(INFO) << "get download task. signature=" << agent_task_req.signature
<< ", job_id=" << download_request.job_id;
@@ -1342,7 +1314,6 @@ void TaskWorkerPool::_download_worker_thread_callback() {
void TaskWorkerPool::_make_snapshot_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
- TSnapshotRequest snapshot_request;
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
@@ -1352,9 +1323,9 @@ void TaskWorkerPool::_make_snapshot_thread_callback() {
}
agent_task_req = _tasks.front();
- snapshot_request = agent_task_req.snapshot_req;
_tasks.pop_front();
}
+ const TSnapshotRequest& snapshot_request = agent_task_req.snapshot_req;
LOG(INFO) << "get snapshot task. signature=" << agent_task_req.signature;
string snapshot_path;
@@ -1401,7 +1372,6 @@ void TaskWorkerPool::_make_snapshot_thread_callback() {
void TaskWorkerPool::_release_snapshot_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
- TReleaseSnapshotRequest release_snapshot_request;
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
@@ -1411,12 +1381,13 @@ void TaskWorkerPool::_release_snapshot_thread_callback() {
}
agent_task_req = _tasks.front();
- release_snapshot_request = agent_task_req.release_snapshot_req;
_tasks.pop_front();
}
+ const TReleaseSnapshotRequest& release_snapshot_request =
+ agent_task_req.release_snapshot_req;
LOG(INFO) << "get release snapshot task. signature=" << agent_task_req.signature;
- string& snapshot_path = release_snapshot_request.snapshot_path;
+ const string& snapshot_path = release_snapshot_request.snapshot_path;
Status status = SnapshotManager::instance()->release_snapshot(snapshot_path);
if (!status.ok()) {
LOG_WARNING("failed to release snapshot")
@@ -1450,7 +1421,6 @@ Status TaskWorkerPool::_get_tablet_info(const TTabletId tablet_id, const TSchema
void TaskWorkerPool::_move_dir_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
- TMoveDirReq move_dir_req;
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
@@ -1460,9 +1430,9 @@ void TaskWorkerPool::_move_dir_thread_callback() {
}
agent_task_req = _tasks.front();
- move_dir_req = agent_task_req.move_dir_req;
_tasks.pop_front();
}
+ const TMoveDirReq& move_dir_req = agent_task_req.move_dir_req;
LOG(INFO) << "get move dir task. signature=" << agent_task_req.signature
<< ", job_id=" << move_dir_req.job_id;
Status status = _move_dir(move_dir_req.tablet_id, move_dir_req.src, move_dir_req.job_id,
@@ -1556,8 +1526,6 @@ void TaskWorkerPool::_random_sleep(int second) {
void TaskWorkerPool::_submit_table_compaction_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
- TCompactionReq compaction_req;
-
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
@@ -1567,9 +1535,9 @@ void TaskWorkerPool::_submit_table_compaction_worker_thread_callback() {
}
agent_task_req = _tasks.front();
- compaction_req = agent_task_req.compaction_req;
_tasks.pop_front();
}
+ const TCompactionReq& compaction_req = agent_task_req.compaction_req;
LOG(INFO) << "get compaction task. signature=" << agent_task_req.signature
<< ", compaction_type=" << compaction_req.type;
@@ -1658,7 +1626,6 @@ void TaskWorkerPool::_storage_refresh_storage_policy_worker_thread_callback() {
void TaskWorkerPool::_storage_update_storage_policy_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
- TGetStoragePolicy get_storage_policy_req;
{
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
@@ -1668,9 +1635,10 @@ void TaskWorkerPool::_storage_update_storage_policy_worker_thread_callback() {
}
agent_task_req = _tasks.front();
- get_storage_policy_req = agent_task_req.update_policy;
+
_tasks.pop_front();
}
+ const TGetStoragePolicy& get_storage_policy_req = agent_task_req.update_policy;
StoragePolicyMgr* spm = ExecEnv::GetInstance()->storage_policy_mgr();
auto policy_ptr = std::make_shared<StoragePolicy>();
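The most substantive change in task_worker_pool.cpp above is dropping _get_next_task_index() in favor of a std::find_if scan for the first HIGH-priority task, with a fallback to the front of the queue for normal workers. A simplified, single-threaded sketch of that selection logic (Task and Priority are stand-ins for the Thrift types; the locking and waiting are omitted):

    #include <algorithm>
    #include <deque>
    #include <iostream>
    #include <optional>

    enum class Priority { NORMAL, HIGH };

    struct Task {
        int signature;
        Priority priority;
    };

    std::optional<Task> take_task(std::deque<Task>& tasks, Priority worker_priority) {
        if (tasks.empty()) return std::nullopt;
        if (worker_priority == Priority::HIGH) {
            // high-priority worker: take the first HIGH task, or yield if there is none
            auto it = std::find_if(tasks.begin(), tasks.end(),
                                   [](const Task& t) { return t.priority == Priority::HIGH; });
            if (it == tasks.end()) return std::nullopt; // let a normal worker handle the queue
            Task picked = *it;
            tasks.erase(it);
            return picked;
        }
        // normal worker: just pop the front
        Task picked = tasks.front();
        tasks.pop_front();
        return picked;
    }

    int main() {
        std::deque<Task> tasks = {{1, Priority::NORMAL}, {2, Priority::HIGH}, {3, Priority::NORMAL}};
        auto high = take_task(tasks, Priority::HIGH);     // picks signature 2
        auto normal = take_task(tasks, Priority::NORMAL); // picks signature 1
        std::cout << high->signature << " " << normal->signature << "\n";
        return 0;
    }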
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index ae8b9e8149..f81b5c0b31 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -164,16 +164,12 @@ public:
// notify the worker. currently for task/disk/tablet report thread
void notify_thread();
-private:
+protected:
bool _register_task_info(const TTaskType::type task_type, int64_t signature);
void _remove_task_info(const TTaskType::type task_type, int64_t signature);
void _finish_task(const TFinishTaskRequest& finish_task_request);
- uint32_t _get_next_task_index(int32_t thread_count, std::deque<TAgentTaskRequest>& tasks,
- TPriority::type priority);
-
void _create_tablet_worker_thread_callback();
void _drop_tablet_worker_thread_callback();
- void _push_worker_thread_callback();
void _publish_version_worker_thread_callback();
void _clear_transaction_task_worker_thread_callback();
void _alter_tablet_worker_thread_callback();
@@ -209,7 +205,7 @@ private:
// random sleep 1~second seconds
void _random_sleep(int second);
-private:
+protected:
std::string _name;
// Reference to the ExecEnv::_master_info
@@ -237,6 +233,7 @@ private:
// Always 1 when _thread_model is SINGLE_THREAD
uint32_t _worker_count;
TaskWorkerType _task_worker_type;
+ std::function<void()> _cb;
static FrontendServiceClientCache _master_service_client_cache;
static std::atomic_ulong _s_report_version;
@@ -246,4 +243,17 @@ private:
DISALLOW_COPY_AND_ASSIGN(TaskWorkerPool);
}; // class TaskWorkerPool
+
+class PushTaskPool : public TaskWorkerPool {
+public:
+ enum class PushWokerType { LOAD_V2, DELETE };
+ PushTaskPool(ExecEnv* env, ThreadModel thread_model, PushWokerType type);
+ void _push_worker_thread_callback();
+
+ DISALLOW_COPY_AND_ASSIGN(PushTaskPool);
+
+private:
+ PushWokerType _push_worker_type;
+};
+
} // namespace doris
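To summarize the new header shape: PushTaskPool derives from TaskWorkerPool, maps its PushWokerType to the base task type, and picks its worker count from the push or delete config knobs in its constructor. A rough stand-alone sketch of that constructor logic (BasePool, PushPool and the config values are placeholders, not the real classes or settings):

    #include <iostream>

    namespace config {
    constexpr int push_worker_count_normal_priority = 3;
    constexpr int push_worker_count_high_priority = 1;
    constexpr int delete_worker_count = 2;
    } // namespace config

    class BasePool {
    public:
        enum class TaskType { PUSH, DELETE };
        explicit BasePool(TaskType type) : _task_type(type) {}
        virtual ~BasePool() = default;

    protected:
        TaskType _task_type;
        int _worker_count = 1;
    };

    class PushPool : public BasePool {
    public:
        enum class PushWorkerType { LOAD_V2, DELETE };

        explicit PushPool(PushWorkerType type)
                : BasePool(type == PushWorkerType::LOAD_V2 ? TaskType::PUSH : TaskType::DELETE),
                  _push_worker_type(type) {
            // the constructor decides the worker count the base class will use
            _worker_count = (type == PushWorkerType::LOAD_V2)
                                    ? config::push_worker_count_normal_priority +
                                              config::push_worker_count_high_priority
                                    : config::delete_worker_count;
        }

        int worker_count() const { return _worker_count; }

    private:
        PushWorkerType _push_worker_type; // retained to mirror the real class
    };

    int main() {
        std::cout << PushPool(PushPool::PushWorkerType::LOAD_V2).worker_count() << "\n"; // 4
        std::cout << PushPool(PushPool::PushWorkerType::DELETE).worker_count() << "\n";  // 2
        return 0;
    }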
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]