This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 004acb7352d51e742a325c1cdf57b6d993d81e50 Author: airborne12 <[email protected]> AuthorDate: Thu Jul 13 11:52:24 2023 +0800 [Fix](pipeline) close sink when fragment context destructs (#21668) Co-authored-by: airborne12 <[email protected]> --- be/src/pipeline/pipeline_fragment_context.cpp | 10 ++++++++++ be/src/pipeline/pipeline_fragment_context.h | 1 + be/src/runtime/fragment_mgr.cpp | 4 ++++ 3 files changed, 15 insertions(+) diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 9969770ecd..9fc8c4ce24 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -702,6 +702,16 @@ Status PipelineFragmentContext::submit() { } } +void PipelineFragmentContext::close_sink() { + if (_sink) { + if (_prepared) { + _sink->close(_runtime_state.get(), Status::RuntimeError("prepare failed")); + } else { + _sink->close(_runtime_state.get(), Status::OK()); + } + } +} + void PipelineFragmentContext::close_if_prepare_failed() { if (_tasks.empty()) { if (_root_plan) { diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 25c8b8ac51..287fac9e40 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -87,6 +87,7 @@ public: Status submit(); void close_if_prepare_failed(); + void close_sink(); void set_is_report_success(bool is_report_success) { _is_report_success = is_report_success; } diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 2e1f9bb607..b5fcb933cb 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -348,6 +348,10 @@ FragmentMgr::~FragmentMgr() { std::lock_guard<std::mutex> lock(_lock); _fragment_map.clear(); _query_ctx_map.clear(); + for (auto& pipeline : _pipeline_map) { + pipeline.second->close_sink(); + } + _pipeline_map.clear(); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
