This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 004acb7352d51e742a325c1cdf57b6d993d81e50
Author: airborne12 <[email protected]>
AuthorDate: Thu Jul 13 11:52:24 2023 +0800

    [Fix](pipeline) close sink when fragment context destructs (#21668)
    
    Co-authored-by: airborne12 <[email protected]>
---
 be/src/pipeline/pipeline_fragment_context.cpp | 10 ++++++++++
 be/src/pipeline/pipeline_fragment_context.h   |  1 +
 be/src/runtime/fragment_mgr.cpp               |  4 ++++
 3 files changed, 15 insertions(+)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 9969770ecd..9fc8c4ce24 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -702,6 +702,16 @@ Status PipelineFragmentContext::submit() {
     }
 }
 
+void PipelineFragmentContext::close_sink() {
+    if (_sink) {
+        if (_prepared) {
+            _sink->close(_runtime_state.get(), Status::RuntimeError("prepare failed"));
+        } else {
+            _sink->close(_runtime_state.get(), Status::OK());
+        }
+    }
+}
+
 void PipelineFragmentContext::close_if_prepare_failed() {
     if (_tasks.empty()) {
         if (_root_plan) {
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 25c8b8ac51..287fac9e40 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -87,6 +87,7 @@ public:
     Status submit();
 
     void close_if_prepare_failed();
+    void close_sink();
 
     void set_is_report_success(bool is_report_success) { _is_report_success = is_report_success; }
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 2e1f9bb607..b5fcb933cb 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -348,6 +348,10 @@ FragmentMgr::~FragmentMgr() {
         std::lock_guard<std::mutex> lock(_lock);
         _fragment_map.clear();
         _query_ctx_map.clear();
+        for (auto& pipeline : _pipeline_map) {
+            pipeline.second->close_sink();
+        }
+        _pipeline_map.clear();
     }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to