This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 80e863cbcf6 [refactor](pipeline) Delete unnecessary code (#35415)
80e863cbcf6 is described below
commit 80e863cbcf6ed898157bb186c442c5a1e22498ac
Author: Gabriel <[email protected]>
AuthorDate: Mon May 27 15:03:48 2024 +0800
[refactor](pipeline) Delete unnecessary code (#35415)
---
be/src/pipeline/pipeline_fragment_context.cpp | 22 ----------------------
be/src/pipeline/pipeline_fragment_context.h | 10 ----------
be/src/pipeline/pipeline_task.cpp | 4 ----
be/src/pipeline/pipeline_task.h | 2 --
be/src/runtime/fragment_mgr.cpp | 5 +----
5 files changed, 1 insertion(+), 42 deletions(-)
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index fd4c903d5aa..89588e39471 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1500,28 +1500,6 @@ Status PipelineFragmentContext::submit() {
}
}
-void PipelineFragmentContext::close_sink() {
- for (auto& tasks : _tasks) {
- auto& root_task = *tasks.begin();
- auto st = root_task->close_sink(_prepared ?
Status::RuntimeError("prepare failed")
- : Status::OK());
- if (!st.ok()) {
- LOG_WARNING("PipelineFragmentContext::close_sink()
error").tag("msg", st.msg());
- }
- }
-}
-
-void PipelineFragmentContext::close_if_prepare_failed(Status st) {
- for (auto& task : _tasks) {
- for (auto& t : task) {
- DCHECK(!t->is_pending_finish());
- WARN_IF_ERROR(t->close(st), "close_if_prepare_failed failed: ");
- close_a_pipeline();
- }
- }
- _query_ctx->cancel(st, _fragment_id);
-}
-
// If all pipeline tasks binded to the fragment instance are finished, then we
could
// close the fragment instance.
void PipelineFragmentContext::_close_fragment_instance() {
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index 03f405e0401..8bc1eb29139 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -92,9 +92,6 @@ public:
Status submit();
- void close_if_prepare_failed(Status st);
- void close_sink();
-
void set_is_report_success(bool is_report_success) { _is_report_success =
is_report_success; }
void cancel(const Status reason);
@@ -120,8 +117,6 @@ public:
[[nodiscard]] int next_sink_operator_id() { return _sink_operator_id--; }
- [[nodiscard]] int max_sink_operator_id() const { return _sink_operator_id;
}
-
void instance_ids(std::vector<TUniqueId>& ins_ids) const {
ins_ids.resize(_fragment_instance_ids.size());
for (size_t i = 0; i < _fragment_instance_ids.size(); i++) {
@@ -136,11 +131,6 @@ public:
}
}
- void add_merge_controller_handler(
- std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
- _merge_controller_handlers.emplace_back(handler);
- }
-
private:
Status _build_pipelines(ObjectPool* pool, const
doris::TPipelineFragmentParams& request,
const DescriptorTbl& descs, OperatorXPtr* root,
PipelinePtr cur_pipe);
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index 781566157f0..867dc49dc33 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -449,10 +449,6 @@ Status PipelineTask::close(Status exec_status) {
return s;
}
-Status PipelineTask::close_sink(Status exec_status) {
- return _sink->close(_state, exec_status);
-}
-
std::string PipelineTask::debug_string() {
std::unique_lock<std::mutex> lc(_release_lock);
fmt::memory_buffer debug_string_buffer;
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 4bf58a708a5..0965ec1c18f 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -64,8 +64,6 @@ public:
// must be call after all pipeline task is finish to release resource
Status close(Status exec_status);
- Status close_sink(Status exec_status);
-
PipelineFragmentContext* fragment_context() { return _fragment_context; }
QueryContext* query_context();
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 8534638f681..888d4069731 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -179,9 +179,6 @@ void FragmentMgr::stop() {
std::lock_guard<std::mutex> lock(_lock);
_fragment_instance_map.clear();
_query_ctx_map.clear();
- for (auto& pipeline : _pipeline_map) {
- pipeline.second->close_sink();
- }
_pipeline_map.clear();
}
_async_report_thread_pool->shutdown();
@@ -860,7 +857,7 @@ Status FragmentMgr::exec_plan_fragment(const
TPipelineFragmentParams& params,
SCOPED_RAW_TIMER(&duration_ns);
auto prepare_st = context->prepare(params);
if (!prepare_st.ok()) {
- context->close_if_prepare_failed(prepare_st);
+ query_ctx->cancel(prepare_st, params.fragment_id);
query_ctx->set_execution_dependency_ready();
return prepare_st;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]