This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 77f187d9b47 [Refactor](exec) refactor the pipline code (#34402)
77f187d9b47 is described below
commit 77f187d9b474042d8286aa951dc68117de8b10dd
Author: HappenLee <[email protected]>
AuthorDate: Wed May 8 14:01:49 2024 +0800
[Refactor](exec) refactor the pipline code (#34402)
---
be/src/common/status.h | 5 +----
be/src/pipeline/exec/operator.cpp | 6 +++--
be/src/pipeline/exec/operator.h | 10 ++++-----
be/src/pipeline/pipeline_task.cpp | 19 +++++++---------
be/src/pipeline/pipeline_task.h | 12 +++++-----
be/src/pipeline/pipeline_tracing.h | 2 +-
be/src/pipeline/task_queue.cpp | 9 ++++----
be/src/pipeline/task_queue.h | 1 -
be/src/pipeline/task_scheduler.cpp | 45 +++++++++++++-------------------------
be/src/pipeline/task_scheduler.h | 6 ++---
10 files changed, 47 insertions(+), 68 deletions(-)
diff --git a/be/src/common/status.h b/be/src/common/status.h
index 835a371c844..c50412f31d6 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -560,10 +560,7 @@ inline std::string Status::to_string() const {
}
inline std::string Status::to_string_no_stack() const {
- std::stringstream ss;
- ss << '[' << code_as_string() << ']';
- ss << msg();
- return ss.str();
+ return fmt::format("[{}] {}", code_as_string(), msg());
}
// some generally useful macros
diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp
index fadbf8c4677..45bad17ed01 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -418,7 +418,8 @@ Status PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
constexpr auto is_fake_shared = std::is_same_v<SharedStateArg,
FakeSharedState>;
if constexpr (!is_fake_shared) {
if constexpr (std::is_same_v<LocalExchangeSharedState,
SharedStateArg>) {
- _shared_state =
info.le_state_map[_parent->operator_id()].first.get();
+ DCHECK(info.le_state_map.find(_parent->operator_id()) !=
info.le_state_map.end());
+ _shared_state =
info.le_state_map.at(_parent->operator_id()).first.get();
_dependency = _shared_state->get_dep_by_channel_id(info.task_idx);
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
@@ -500,7 +501,8 @@ Status PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSink
constexpr auto is_fake_shared = std::is_same_v<SharedState,
FakeSharedState>;
if constexpr (!is_fake_shared) {
if constexpr (std::is_same_v<LocalExchangeSharedState, SharedState>) {
- _dependency =
info.le_state_map[_parent->dests_id().front()].second.get();
+ DCHECK(info.le_state_map.find(_parent->dests_id().front()) !=
info.le_state_map.end());
+ _dependency =
info.le_state_map.at(_parent->dests_id().front()).second.get();
_shared_state = (SharedState*)_dependency->shared_state();
} else {
_shared_state = info.shared_state->template cast<SharedState>();
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index e4560c571c7..1b99b849d26 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -65,10 +65,10 @@ using DataSinkOperatorXPtr = std::shared_ptr<DataSinkOperatorXBase>;
// This struct is used only for initializing local state.
struct LocalStateInfo {
RuntimeProfile* parent_profile = nullptr;
- const std::vector<TScanRangeParams> scan_ranges;
+ const std::vector<TScanRangeParams>& scan_ranges;
BasicSharedState* shared_state;
- std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
std::shared_ptr<Dependency>>>
- le_state_map;
+ const std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
+ std::shared_ptr<Dependency>>>& le_state_map;
const int task_idx;
};
@@ -78,8 +78,8 @@ struct LocalSinkStateInfo {
RuntimeProfile* parent_profile = nullptr;
const int sender_id;
BasicSharedState* shared_state;
- std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
std::shared_ptr<Dependency>>>
- le_state_map;
+ const std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
+ std::shared_ptr<Dependency>>>& le_state_map;
const TDataSink& tsink;
};
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 2c211835c55..b8aa61b74c4 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -96,16 +96,15 @@ Status PipelineTask::prepare(const TPipelineInstanceParams& local_params, const
RETURN_IF_ERROR(_sink->setup_local_state(_state, info));
}
- std::vector<TScanRangeParams> no_scan_ranges;
- auto scan_ranges = find_with_default(local_params.per_node_scan_ranges,
- _operators.front()->node_id(),
no_scan_ranges);
+ _scan_ranges = find_with_default(local_params.per_node_scan_ranges,
+ _operators.front()->node_id(),
_scan_ranges);
auto* parent_profile = _state->get_sink_local_state()->profile();
query_ctx->register_query_statistics(
_state->get_sink_local_state()->get_query_statistics_ptr());
for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) {
auto& op = _operators[op_idx];
- LocalStateInfo info {parent_profile, scan_ranges,
get_op_shared_state(op->operator_id()),
+ LocalStateInfo info {parent_profile, _scan_ranges,
get_op_shared_state(op->operator_id()),
_le_state_map, _task_idx};
RETURN_IF_ERROR(op->setup_local_state(_state, info));
parent_profile = _state->get_local_state(op->operator_id())->profile();
@@ -124,7 +123,7 @@ Status PipelineTask::prepare(const TPipelineInstanceParams& local_params, const
}
Status PipelineTask::_extract_dependencies() {
- for (auto op : _operators) {
+ for (auto& op : _operators) {
auto result = _state->get_local_state_result(op->operator_id());
if (!result) {
return result.error();
@@ -152,12 +151,9 @@ Status PipelineTask::_extract_dependencies() {
}
void PipelineTask::_init_profile() {
- std::stringstream ss;
- ss << "PipelineTask"
- << " (index=" << _index << ")";
- auto* task_profile = new RuntimeProfile(ss.str());
- _parent_profile->add_child(task_profile, true, nullptr);
- _task_profile.reset(task_profile);
+ _task_profile =
+ std::make_unique<RuntimeProfile>(fmt::format("PipelineTask
(index={})", _index));
+ _parent_profile->add_child(_task_profile.get(), true, nullptr);
_task_cpu_timer = ADD_TIMER(_task_profile, "TaskCpuTime");
static const char* exec_time = "ExecuteTime";
@@ -344,6 +340,7 @@ bool PipelineTask::should_revoke_memory(RuntimeState* state, int64_t revocable_m
return false;
}
}
+
void PipelineTask::finalize() {
std::unique_lock<std::mutex> lc(_release_lock);
_finished = true;
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 06c2cea3e2c..7be9593eeb2 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -138,13 +138,12 @@ public:
}
void set_previous_core_id(int id) {
- if (id == _previous_schedule_id) {
- return;
- }
- if (_previous_schedule_id != -1) {
- COUNTER_UPDATE(_core_change_times, 1);
+ if (id != _previous_schedule_id) {
+ if (_previous_schedule_id != -1) {
+ COUNTER_UPDATE(_core_change_times, 1);
+ }
+ _previous_schedule_id = id;
}
- _previous_schedule_id = id;
}
void finalize();
@@ -381,6 +380,7 @@ private:
// All shared states of this pipeline task.
std::map<int, std::shared_ptr<BasicSharedState>> _op_shared_states;
std::shared_ptr<BasicSharedState> _sink_shared_state;
+ std::vector<TScanRangeParams> _scan_ranges;
std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
std::shared_ptr<Dependency>>>
_le_state_map;
int _task_idx;
diff --git a/be/src/pipeline/pipeline_tracing.h b/be/src/pipeline/pipeline_tracing.h
index 27dda6b85f1..aad0a7f9ee8 100644
--- a/be/src/pipeline/pipeline_tracing.h
+++ b/be/src/pipeline/pipeline_tracing.h
@@ -39,7 +39,7 @@ struct ScheduleRecord {
uint64_t thread_id;
uint64_t start_time;
uint64_t end_time;
- std::string state_name;
+ std::string_view state_name;
bool operator<(const ScheduleRecord& rhs) const { return start_time <
rhs.start_time; }
std::string to_string(uint64_t append_value) const {
diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp
index c54df42e1b1..24d71144240 100644
--- a/be/src/pipeline/task_queue.cpp
+++ b/be/src/pipeline/task_queue.cpp
@@ -19,14 +19,14 @@
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
+#include <memory>
#include <string>
#include "common/logging.h"
#include "pipeline/pipeline_task.h"
#include "runtime/workload_group/workload_group.h"
-namespace doris {
-namespace pipeline {
+namespace doris::pipeline {
TaskQueue::~TaskQueue() = default;
@@ -139,7 +139,7 @@ int PriorityTaskQueue::task_size() {
MultiCoreTaskQueue::~MultiCoreTaskQueue() = default;
MultiCoreTaskQueue::MultiCoreTaskQueue(size_t core_size) :
TaskQueue(core_size), _closed(false) {
- _prio_task_queue_list.reset(new PriorityTaskQueue[core_size]);
+ _prio_task_queue_list = std::make_unique<PriorityTaskQueue[]>(core_size);
}
void MultiCoreTaskQueue::close() {
@@ -211,5 +211,4 @@ void MultiCoreTaskQueue::update_statistics(PipelineTask* task, int64_t time_spen
time_spent);
}
-} // namespace pipeline
-} // namespace doris
\ No newline at end of file
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h
index 481427eaf4b..74ed9187567 100644
--- a/be/src/pipeline/task_queue.h
+++ b/be/src/pipeline/task_queue.h
@@ -135,7 +135,6 @@ public:
void close() override;
// Get the task by core id.
- // TODO: To think the logic is useful?
PipelineTask* take(size_t core_id) override;
// TODO combine these methods to `push_back(task, core_id = -1)`
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index ed88f19bae4..fbb67afdf46 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -37,8 +37,6 @@
#include "pipeline_fragment_context.h"
#include "runtime/exec_env.h"
#include "runtime/query_context.h"
-#include "util/debug_util.h"
-#include "util/sse_util.hpp"
#include "util/thread.h"
#include "util/threadpool.h"
#include "util/time.h"
@@ -61,9 +59,8 @@ Status TaskScheduler::start() {
.set_max_queue_size(0)
.set_cgroup_cpu_ctl(_cgroup_cpu_ctl)
.build(&_fix_thread_pool));
- _markers.reserve(cores);
+ _markers.resize(cores, true);
for (size_t i = 0; i < cores; ++i) {
- _markers.push_back(std::make_unique<std::atomic<bool>>(true));
RETURN_IF_ERROR(_fix_thread_pool->submit_func([this, i] { _do_work(i);
}));
}
return Status::OK();
@@ -101,8 +98,7 @@ void _close_task(PipelineTask* task, PipelineTaskState state, Status exec_status
}
void TaskScheduler::_do_work(size_t index) {
- const auto& marker = _markers[index];
- while (*marker) {
+ while (_markers[index]) {
auto* task = _task_queue->take(index);
if (!task) {
continue;
@@ -120,27 +116,20 @@ void TaskScheduler::_do_work(size_t index) {
auto state = task->get_state();
// If the state is PENDING_FINISH, then the task is come from blocked
queue, its is_pending_finish
// has to return false. The task is finished and need to close now.
- if (state == PipelineTaskState::PENDING_FINISH) {
- Status exec_status = fragment_ctx->get_query_ctx()->exec_status();
- _close_task(task, canceled ? PipelineTaskState::CANCELED :
PipelineTaskState::FINISHED,
- exec_status);
- continue;
- }
-
- DCHECK(state != PipelineTaskState::FINISHED && state !=
PipelineTaskState::CANCELED)
- << "task already finish: " << task->debug_string();
-
- if (canceled) {
+ if (state == PipelineTaskState::PENDING_FINISH || canceled) {
// may change from pending FINISH,should called cancel
// also may change form BLOCK, other task called cancel
// If pipeline is canceled, it will report after pipeline closed,
and will propagate
// errors to downstream through exchange. So, here we needn't
send_report.
// fragment_ctx->send_report(true);
- Status cancel_status =
fragment_ctx->get_query_ctx()->exec_status();
- _close_task(task, PipelineTaskState::CANCELED, cancel_status);
+ Status exec_status = fragment_ctx->get_query_ctx()->exec_status();
+ _close_task(task, canceled ? PipelineTaskState::CANCELED :
PipelineTaskState::FINISHED,
+ exec_status);
continue;
}
+ DCHECK(state != PipelineTaskState::FINISHED && state !=
PipelineTaskState::CANCELED)
+ << "task already finish: " << task->debug_string();
task->set_state(PipelineTaskState::RUNNABLE);
@@ -150,7 +139,7 @@ void TaskScheduler::_do_work(size_t index) {
try {
// This will enable exception handling logic in allocator.h when
memory allocate
- // failed or sysem memory is not sufficient.
+ // failed or system memory is not sufficient.
doris::enable_thread_catch_bad_alloc++;
Defer defer {[&]() { doris::enable_thread_catch_bad_alloc--; }};
//TODO: use a better enclose to abstracting these
@@ -169,8 +158,7 @@ void TaskScheduler::_do_work(size_t index) {
status = task->execute(&eos);
uint64_t end_time = MonotonicMicros();
- auto state = task->get_state();
- std::string state_name = get_state_name(state);
+ std::string_view state_name =
get_state_name(task->get_state());
ExecEnv::GetInstance()->pipeline_tracer_context()->record(
{query_id, task_name, core_id, thread_id, start_time,
end_time,
state_name});
@@ -216,11 +204,8 @@ void TaskScheduler::_do_work(size_t index) {
task->set_state(PipelineTaskState::PENDING_FINISH);
task->set_running(false);
} else {
- // Close the task directly?
Status exec_status =
fragment_ctx->get_query_ctx()->exec_status();
- _close_task(task,
- canceled ? PipelineTaskState::CANCELED :
PipelineTaskState::FINISHED,
- exec_status);
+ _close_task(task, PipelineTaskState::FINISHED, exec_status);
}
continue;
}
@@ -243,13 +228,13 @@ void TaskScheduler::_do_work(size_t index) {
}
void TaskScheduler::stop() {
- if (!this->_shutdown.load()) {
+ if (!_shutdown) {
if (_task_queue) {
_task_queue->close();
}
if (_fix_thread_pool) {
- for (const auto& marker : _markers) {
- marker->store(false);
+ for (size_t i = 0; i < _markers.size(); ++i) {
+ _markers[i] = false;
}
_fix_thread_pool->shutdown();
_fix_thread_pool->wait();
@@ -258,7 +243,7 @@ void TaskScheduler::stop() {
// pool is stopped. For example, if there are 2 threads call stop
// then if one thread set shutdown = false, then another thread will
// not check it and will free task scheduler.
- this->_shutdown.store(true);
+ _shutdown = true;
}
}
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index 8e513748203..1a9b9ad17db 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -50,7 +50,7 @@ public:
CgroupCpuCtl* cgroup_cpu_ctl)
: _task_queue(std::move(task_queue)),
_shutdown(false),
- _name(name),
+ _name(std::move(name)),
_cgroup_cpu_ctl(cgroup_cpu_ctl) {}
~TaskScheduler();
@@ -66,8 +66,8 @@ public:
private:
std::unique_ptr<ThreadPool> _fix_thread_pool;
std::shared_ptr<TaskQueue> _task_queue;
- std::vector<std::unique_ptr<std::atomic<bool>>> _markers;
- std::atomic<bool> _shutdown;
+ std::vector<bool> _markers;
+ bool _shutdown;
std::string _name;
CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]