This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e167394dc1 [Fix](pipeline) close sink when fragment context destructs
(#21668)
e167394dc1 is described below
commit e167394dc16e52e4044a32d52226559ff923464c
Author: airborne12 <[email protected]>
AuthorDate: Thu Jul 13 11:52:24 2023 +0800
[Fix](pipeline) close sink when fragment context destructs (#21668)
Co-authored-by: airborne12 <[email protected]>
---
be/src/pipeline/pipeline_fragment_context.cpp | 10 ++++++++++
be/src/pipeline/pipeline_fragment_context.h | 1 +
be/src/runtime/fragment_mgr.cpp | 4 ++++
3 files changed, 15 insertions(+)
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 8eef37d931..7e85b19205 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -705,6 +705,16 @@ Status PipelineFragmentContext::submit() {
}
}
+void PipelineFragmentContext::close_sink() {
+ if (_sink) {
+ if (_prepared) {
+ _sink->close(_runtime_state.get(), Status::RuntimeError("prepare
failed"));
+ } else {
+ _sink->close(_runtime_state.get(), Status::OK());
+ }
+ }
+}
+
void PipelineFragmentContext::close_if_prepare_failed() {
if (_tasks.empty()) {
if (_root_plan) {
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index 262794154b..cda6206d9b 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -83,6 +83,7 @@ public:
Status submit();
void close_if_prepare_failed();
+ void close_sink();
void set_is_report_success(bool is_report_success) { _is_report_success =
is_report_success; }
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 6d0cf288ae..3f19d489d9 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -348,6 +348,10 @@ FragmentMgr::~FragmentMgr() {
std::lock_guard<std::mutex> lock(_lock);
_fragment_map.clear();
_query_ctx_map.clear();
+ for (auto& pipeline : _pipeline_map) {
+ pipeline.second->close_sink();
+ }
+ _pipeline_map.clear();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]