This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new fe5c4e3b465 [pipelineX](api) Fix core dump for pipelineX API (#27437)
fe5c4e3b465 is described below
commit fe5c4e3b465ff4055d979ba2624c86526428fa45
Author: Gabriel <[email protected]>
AuthorDate: Thu Nov 23 00:24:32 2023 +0800
[pipelineX](api) Fix core dump for pipelineX API (#27437)
---
be/src/pipeline/exec/operator.h | 10 --------
.../pipeline/exec/partition_sort_sink_operator.cpp | 2 --
.../pipeline/exec/partition_sort_sink_operator.h | 2 --
.../exec/partition_sort_source_operator.cpp | 2 --
.../pipeline/exec/partition_sort_source_operator.h | 4 +---
be/src/pipeline/pipeline_task.cpp | 13 ----------
be/src/pipeline/pipeline_task.h | 5 +---
be/src/pipeline/pipeline_x/operator.h | 4 ----
.../pipeline_x/pipeline_x_fragment_context.cpp | 2 +-
be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 25 +++++++++----------
be/src/pipeline/pipeline_x/pipeline_x_task.h | 13 ++++------
be/src/pipeline/task_scheduler.cpp | 28 ++++++++--------------
12 files changed, 31 insertions(+), 79 deletions(-)
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 5362be88e89..d7330648757 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -241,12 +241,6 @@ public:
virtual Status sink(RuntimeState* state, vectorized::Block* block,
SourceState source_state) = 0;
- virtual Status finalize(RuntimeState* state) {
- std::stringstream error_msg;
- error_msg << " not a sink, can not finalize";
- return Status::NotSupported(error_msg.str());
- }
-
/**
* pending_finish means we have called `close` and there are still some
work to do before finishing.
* Now it is a pull-based pipeline and operators pull data from its child
by this method.
@@ -323,8 +317,6 @@ public:
return Status::OK();
}
- Status finalize(RuntimeState* state) override { return Status::OK(); }
-
[[nodiscard]] RuntimeProfile* get_runtime_profile() const override {
return _sink->profile(); }
void set_query_statistics(std::shared_ptr<QueryStatistics> statistics)
override {
_sink->set_query_statistics(statistics);
@@ -391,8 +383,6 @@ public:
return Status::OK();
}
- Status finalize(RuntimeState* state) override { return Status::OK(); }
-
bool can_read() override { return _node->can_read(); }
[[nodiscard]] RuntimeProfile* get_runtime_profile() const override {
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index d7a2bc4c920..327d186d77b 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -42,8 +42,6 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo
_agg_arena_pool = std::make_unique<vectorized::Arena>();
_hash_table_size_counter = ADD_COUNTER(_profile, "HashTableSize",
TUnit::UNIT);
_build_timer = ADD_TIMER(_profile, "HashTableBuildTime");
- _partition_sort_timer = ADD_TIMER(_profile, "PartitionSortTime");
- _get_sorted_timer = ADD_TIMER(_profile, "GetSortedTime");
_selector_block_timer = ADD_TIMER(_profile, "SelectorBlockTime");
_emplace_key_timer = ADD_TIMER(_profile, "EmplaceKeyTime");
_init_hash_method();
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h
b/be/src/pipeline/exec/partition_sort_sink_operator.h
index dbfb7ad5d0e..62bf7c16b43 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.h
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.h
@@ -84,8 +84,6 @@ private:
RuntimeProfile::Counter* _build_timer;
RuntimeProfile::Counter* _emplace_key_timer;
- RuntimeProfile::Counter* _partition_sort_timer;
- RuntimeProfile::Counter* _get_sorted_timer;
RuntimeProfile::Counter* _selector_block_timer;
RuntimeProfile::Counter* _hash_table_size_counter;
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp
b/be/src/pipeline/exec/partition_sort_source_operator.cpp
index 0a2f28b19f3..56db3852bc2 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp
@@ -33,7 +33,6 @@ Status PartitionSortSourceLocalState::init(RuntimeState*
state, LocalStateInfo&
RETURN_IF_ERROR(PipelineXLocalState<PartitionSortSourceDependency>::init(state,
info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
- _get_next_timer = ADD_TIMER(profile(), "GetResultTime");
_get_sorted_timer = ADD_TIMER(profile(), "GetSortedTime");
_shared_state->previous_row =
std::make_unique<vectorized::SortCursorCmp>();
return Status::OK();
@@ -44,7 +43,6 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState*
state, vectorized::
RETURN_IF_CANCELLED(state);
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
- SCOPED_TIMER(local_state._get_next_timer);
output_block->clear_column_data();
{
std::lock_guard<std::mutex>
lock(local_state._shared_state->buffer_mutex);
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.h
b/be/src/pipeline/exec/partition_sort_source_operator.h
index df22bde6365..719f6c13889 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.h
+++ b/be/src/pipeline/exec/partition_sort_source_operator.h
@@ -83,15 +83,13 @@ public:
using Base = PipelineXLocalState<PartitionSortSourceDependency>;
PartitionSortSourceLocalState(RuntimeState* state, OperatorXBase* parent)
: PipelineXLocalState<PartitionSortSourceDependency>(state,
parent),
- _get_sorted_timer(nullptr),
- _get_next_timer(nullptr) {}
+ _get_sorted_timer(nullptr) {}
Status init(RuntimeState* state, LocalStateInfo& info) override;
private:
friend class PartitionSortSourceOperatorX;
RuntimeProfile::Counter* _get_sorted_timer;
- RuntimeProfile::Counter* _get_next_timer;
std::atomic<int> _sort_idx = 0;
};
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index 72552b48468..7d9900f637c 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -113,7 +113,6 @@ void PipelineTask::_init_profile() {
_get_block_timer = ADD_CHILD_TIMER(_task_profile, "GetBlockTime",
exec_time);
_get_block_counter = ADD_COUNTER(_task_profile, "GetBlockCounter",
TUnit::UNIT);
_sink_timer = ADD_CHILD_TIMER(_task_profile, "SinkTime", exec_time);
- _finalize_timer = ADD_CHILD_TIMER(_task_profile, "FinalizeTime", exec_time);
_close_timer = ADD_CHILD_TIMER(_task_profile, "CloseTime", exec_time);
_wait_source_timer = ADD_TIMER(_task_profile, "WaitSourceTime");
@@ -320,18 +319,6 @@ Status PipelineTask::execute(bool* eos) {
return Status::OK();
}
-Status PipelineTask::finalize() {
- SCOPED_TIMER(_task_profile->total_time_counter());
- SCOPED_CPU_TIMER(_task_cpu_timer);
- Defer defer {[&]() {
- if (_task_queue) {
- _task_queue->update_statistics(this, _finalize_timer->value());
- }
- }};
- SCOPED_TIMER(_finalize_timer);
- return _sink->finalize(_state);
-}
-
Status PipelineTask::_collect_query_statistics() {
// The execnode tree of a fragment will be split into multiple pipelines,
we only need to collect the root pipeline.
if (_pipeline->is_root_pipeline()) {
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 0b26c9d215d..4e9c2f8cc9e 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -172,7 +172,7 @@ public:
virtual bool sink_can_write() { return _sink->can_write() ||
_pipeline->_always_can_write; }
- virtual Status finalize();
+ virtual void finalize() {}
PipelineFragmentContext* fragment_context() { return _fragment_context; }
@@ -193,8 +193,6 @@ public:
_previous_schedule_id = id;
}
- virtual void release_dependency() {}
-
bool has_dependency();
OperatorPtr get_root() { return _root; }
@@ -315,7 +313,6 @@ protected:
RuntimeProfile::Counter* _get_block_timer;
RuntimeProfile::Counter* _get_block_counter;
RuntimeProfile::Counter* _sink_timer;
- RuntimeProfile::Counter* _finalize_timer;
RuntimeProfile::Counter* _close_timer;
RuntimeProfile::Counter* _block_counts;
RuntimeProfile::Counter* _block_by_source_counts;
diff --git a/be/src/pipeline/pipeline_x/operator.h
b/be/src/pipeline/pipeline_x/operator.h
index 943d8c4670b..3f9548099a2 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -182,8 +182,6 @@ public:
Status open(RuntimeState* state) override;
- Status finalize(RuntimeState* state) override { return Status::OK(); }
-
[[nodiscard]] bool can_terminate_early() override { return false; }
[[nodiscard]] virtual bool can_terminate_early(RuntimeState* state) {
return false; }
@@ -518,8 +516,6 @@ public:
[[nodiscard]] std::string get_name() const override { return _name; }
- Status finalize(RuntimeState* state) override { return Status::OK(); }
-
virtual bool should_dry_run(RuntimeState* state) { return false; }
protected:
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index e47b55e4914..e832124bf88 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -1044,7 +1044,7 @@ std::string PipelineXFragmentContext::debug_string() {
for (size_t j = 0; j < _tasks.size(); j++) {
fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j);
for (size_t i = 0; i < _tasks[j].size(); i++) {
- if (_tasks[j][i]->get_state() == PipelineTaskState::FINISHED) {
+ if (_tasks[j][i]->is_finished()) {
continue;
}
fmt::format_to(debug_string_buffer, "Task {}: {}\n", i,
_tasks[j][i]->debug_string());
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 703e4862f7c..110faba5ca1 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -156,7 +156,6 @@ void PipelineXTask::_init_profile() {
_get_block_timer = ADD_CHILD_TIMER(_task_profile, "GetBlockTime",
exec_time);
_get_block_counter = ADD_COUNTER(_task_profile, "GetBlockCounter",
TUnit::UNIT);
_sink_timer = ADD_CHILD_TIMER(_task_profile, "SinkTime", exec_time);
- _finalize_timer = ADD_CHILD_TIMER(_task_profile, "FinalizeTime", exec_time);
_close_timer = ADD_CHILD_TIMER(_task_profile, "CloseTime", exec_time);
_wait_bf_timer = ADD_TIMER(_task_profile, "WaitBfTime");
@@ -289,16 +288,14 @@ Status PipelineXTask::execute(bool* eos) {
return Status::OK();
}
-Status PipelineXTask::finalize() {
- SCOPED_TIMER(_task_profile->total_time_counter());
- SCOPED_CPU_TIMER(_task_cpu_timer);
- Defer defer {[&]() {
- if (_task_queue) {
- _task_queue->update_statistics(this, _finalize_timer->value());
- }
- }};
- SCOPED_TIMER(_finalize_timer);
- return _sink->finalize(_state);
+void PipelineXTask::finalize() {
+ PipelineTask::finalize();
+ std::unique_lock<std::mutex> lc(_release_lock);
+ _finished = true;
+ std::vector<DependencySPtr> {}.swap(_downstream_dependency);
+ DependencyMap {}.swap(_upstream_dependency);
+
+ _local_exchange_state = nullptr;
}
Status PipelineXTask::try_close(Status exec_status) {
@@ -338,6 +335,10 @@ Status PipelineXTask::close(Status exec_status) {
}
std::string PipelineXTask::debug_string() {
+ std::unique_lock<std::mutex> lc(_release_lock);
+ if (_finished) {
+ return "ALREADY FINISHED";
+ }
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "QueryId: {}\n",
print_id(query_context()->query_id()));
@@ -374,7 +375,7 @@ std::string PipelineXTask::debug_string() {
fmt::format_to(debug_string_buffer, "Finish Dependency Information: \n");
for (size_t j = 0; j < _finish_dependencies.size(); j++, i++) {
fmt::format_to(debug_string_buffer, "{}. {}\n", i,
- _finish_dependencies[i]->debug_string(j + 1));
+ _finish_dependencies[j]->debug_string(j + 1));
}
return fmt::to_string(debug_string_buffer);
}
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index 46d006d0dac..621773b2ac1 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -84,7 +84,9 @@ public:
bool sink_can_write() override { return _write_blocked_dependency() ==
nullptr; }
- Status finalize() override;
+ void finalize() override;
+
+ bool is_finished() const { return _finished.load(); }
std::string debug_string() override;
@@ -103,13 +105,6 @@ public:
}
}
- void release_dependency() override {
- std::vector<DependencySPtr> {}.swap(_downstream_dependency);
- DependencyMap {}.swap(_upstream_dependency);
-
- _local_exchange_state = nullptr;
- }
-
std::vector<DependencySPtr>& get_upstream_dependency(int id) {
if (_upstream_dependency.find(id) == _upstream_dependency.end()) {
_upstream_dependency.insert({id, {DependencySPtr {}}});
@@ -212,6 +207,8 @@ private:
Dependency* _blocked_dep {nullptr};
std::atomic<bool> _use_blocking_queue {true};
+ std::atomic<bool> _finished {false};
+ std::mutex _release_lock;
};
} // namespace doris::pipeline
diff --git a/be/src/pipeline/task_scheduler.cpp
b/be/src/pipeline/task_scheduler.cpp
index 5470ba22591..cc6e502fb5e 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -313,23 +313,15 @@ void TaskScheduler::_do_work(size_t index) {
task->set_eos_time();
// TODO: pipeline parallel need to wait the last task finish to
call finalize
// and find_p_dependency
- status = task->finalize();
- if (!status.ok()) {
- // execute failed,cancel all fragment
- fragment_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
- "finalize fail:" + status.msg());
- } else {
- VLOG_DEBUG << fmt::format(
- "Try close task: {}, fragment_ctx->is_canceled(): {}",
- PrintInstanceStandardInfo(
- task->query_context()->query_id(),
- task->fragment_context()->get_fragment_instance_id()),
- fragment_ctx->is_canceled());
- _try_close_task(task,
- fragment_ctx->is_canceled() ? PipelineTaskState::CANCELED
- : PipelineTaskState::FINISHED,
- status);
- }
+ VLOG_DEBUG << fmt::format(
+ "Try close task: {}, fragment_ctx->is_canceled(): {}",
+ PrintInstanceStandardInfo(task->query_context()->query_id(),
+ task->fragment_context()->get_fragment_instance_id()),
+ fragment_ctx->is_canceled());
+ _try_close_task(task,
+ fragment_ctx->is_canceled() ? PipelineTaskState::CANCELED
+ : PipelineTaskState::FINISHED,
+ status);
VLOG_DEBUG << fmt::format(
"Task {} is eos, status {}.",
PrintInstanceStandardInfo(task->query_context()->query_id(),
@@ -388,7 +380,7 @@ void TaskScheduler::_try_close_task(PipelineTask* task,
PipelineTaskState state,
}
task->set_state(state);
task->set_close_pipeline_time();
- task->release_dependency();
+ task->finalize();
task->set_running(false);
// close_a_pipeline may delete fragment context and will core in some defer
// code, because the defer code will access fragment context it self.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]