This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0349841069d [refactor](scheduler) Simplify TaskScheduler (#48118)
0349841069d is described below
commit 0349841069dae08cdd2d9bc6c109f49dcaabe8b7
Author: Gabriel <[email protected]>
AuthorDate: Thu Feb 20 19:32:29 2025 +0800
[refactor](scheduler) Simplify TaskScheduler (#48118)
---
be/src/pipeline/pipeline_fragment_context.cpp | 4 +-
be/src/pipeline/pipeline_fragment_context.h | 2 +-
be/src/pipeline/task_scheduler.cpp | 75 ++++++++++-----------------
3 files changed, 32 insertions(+), 49 deletions(-)
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 8797ecd49b2..00af07f7bdd 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1753,7 +1753,7 @@ void PipelineFragmentContext::_close_fragment_instance() {
std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
}
-void PipelineFragmentContext::close_a_pipeline(PipelineId pipeline_id) {
+bool PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
// If all tasks of this pipeline have been closed, upstream tasks are never
needed, and we just make those runnable here
DCHECK(_pip_id_to_pipeline.contains(pipeline_id));
if (_pip_id_to_pipeline[pipeline_id]->close_task()) {
@@ -1767,7 +1767,9 @@ void PipelineFragmentContext::close_a_pipeline(PipelineId
pipeline_id) {
++_closed_tasks;
if (_closed_tasks == _total_tasks) {
_close_fragment_instance();
+ return true;
}
+ return false;
}
Status PipelineFragmentContext::send_report(bool done) {
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index bd3a350d0a2..6fa4925e302 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -100,7 +100,7 @@ public:
[[nodiscard]] int get_fragment_id() const { return _fragment_id; }
- void close_a_pipeline(PipelineId pipeline_id);
+ bool decrement_running_task(PipelineId pipeline_id);
Status send_report(bool);
diff --git a/be/src/pipeline/task_scheduler.cpp
b/be/src/pipeline/task_scheduler.cpp
index 60d9efa66ad..7948a853799 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -69,31 +69,34 @@ Status TaskScheduler::schedule_task(PipelineTask* task) {
return _task_queue.push_back(task);
}
-// after _close_task, task maybe destructed.
-void _close_task(PipelineTask* task, Status exec_status) {
+// after close_task, task maybe destructed.
+bool close_task(PipelineTask* task, Status exec_status) {
+ if (exec_status.ok() && task->is_pending_finish()) {
+ // Close phase is blocked by dependency.
+ return false;
+ }
// Has to attach the memory tracker here, because the close task will also
release some memory.
// Should count the memory against the query, or the query's memory will
not decrease when part of
// the tasks finished.
SCOPED_ATTACH_TASK(task->runtime_state());
if (task->is_finalized()) {
- task->set_running(false);
- return;
+ return false;
+ }
+ if (!exec_status.ok()) {
+ task->fragment_context()->cancel(exec_status);
+ LOG(WARNING) << fmt::format("Pipeline task failed. query_id: {}
reason: {}",
+
print_id(task->query_context()->query_id()),
+ exec_status.to_string());
}
- // close_a_pipeline may delete fragment context and will core in some defer
- // code, because the defer code will access fragment context it self.
+ // decrement_running_task may delete fragment context and will core in
some defer
+ // code, because the defer code will access fragment context itself.
auto lock_for_context = task->fragment_context()->shared_from_this();
- // is_pending_finish does not check status, so has to check status in
close API.
- // For example, in async writer, the writer may failed during dealing with
eos_block
- // but it does not return error status. Has to check the error status in
close API.
- // We have already refactor all source and sink api, the close API does
not need waiting
- // for pending finish now. So that could call close directly.
Status status = task->close(exec_status);
if (!status.ok()) {
task->fragment_context()->cancel(status);
}
task->finalize();
- task->set_running(false);
- task->fragment_context()->close_a_pipeline(task->pipeline_id());
+ return
task->fragment_context()->decrement_running_task(task->pipeline_id());
}
void TaskScheduler::_do_work(int index) {
@@ -111,28 +114,28 @@ void TaskScheduler::_do_work(int index) {
}
task->log_detail_if_need();
task->set_running(true);
+ bool fragment_is_finished = false;
+ Defer task_running_defer {[&]() {
+            // If the fragment is finished, the fragment context will be
destructed together with all tasks in it.
+ if (!fragment_is_finished) {
+ task->set_running(false);
+ }
+ }};
task->set_task_queue(&_task_queue);
auto* fragment_ctx = task->fragment_context();
bool canceled = fragment_ctx->is_canceled();
- // If the state is PENDING_FINISH, then the task is come from blocked
queue, its is_pending_finish
- // has to return false. The task is finished and need to close now.
+ // Close task if canceled
if (canceled) {
- // may change from pending FINISH,should called cancel
- // also may change form BLOCK, other task called cancel
-
- // If pipeline is canceled, it will report after pipeline closed,
and will propagate
- // errors to downstream through exchange. So, here we needn't
send_report.
- // fragment_ctx->send_report(true);
- _close_task(task, fragment_ctx->get_query_ctx()->exec_status());
+ fragment_is_finished = close_task(task,
fragment_ctx->get_query_ctx()->exec_status());
continue;
}
- // task exec
bool eos = false;
auto status = Status::OK();
task->set_core_id(index);
+ // Main logics of execution
ASSIGN_STATUS_IF_CATCH_EXCEPTION(
//TODO: use a better enclose to abstracting these
if
(ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) {
@@ -151,33 +154,11 @@ void TaskScheduler::_do_work(int index) {
start_time, end_time});
} else { status = task->execute(&eos); },
status);
-
- if (!status.ok()) {
- // Print detail informations below when you debugging here.
- //
- // LOG(WARNING)<< "task:\n"<<task->debug_string();
-
- // exec failed,cancel all fragment instance
- fragment_ctx->cancel(status);
- LOG(WARNING) << fmt::format("Pipeline task failed. query_id: {}
reason: {}",
-
print_id(task->query_context()->query_id()),
- status.to_string());
- _close_task(task, status);
- continue;
- }
fragment_ctx->trigger_report_if_necessary();
- if (eos) {
- // is pending finish will add the task to dependency's blocking
queue, and then the task will be
- // added to running queue when dependency is ready.
- if (!task->is_pending_finish()) {
- Status exec_status =
fragment_ctx->get_query_ctx()->exec_status();
- _close_task(task, exec_status);
- continue;
- }
+ if (eos || !status.ok()) {
+ fragment_is_finished = close_task(task, status);
}
-
- task->set_running(false);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]