This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a05003fbe1 [fix](pipeline) fix remove pipeline_x_context from fragment 
manager (#24062)
a05003fbe1 is described below

commit a05003fbe1c8b3443fe4b52389cdc93f997e300e
Author: Lijia Liu <[email protected]>
AuthorDate: Sun Sep 10 20:53:26 2023 +0800

    [fix](pipeline) fix remove pipeline_x_context from fragment manager (#24062)
---
 be/src/pipeline/pipeline_fragment_context.h              |  4 ++++
 be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h |  2 +-
 be/src/runtime/fragment_mgr.cpp                          | 14 +-------------
 be/src/runtime/fragment_mgr.h                            |  3 ---
 be/src/runtime/query_context.h                           |  4 +---
 5 files changed, 7 insertions(+), 20 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index 47dad12d7c..415015a807 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -135,6 +135,10 @@ public:
     }
 
     bool is_group_commit() { return _group_commit; }
+    virtual void instance_ids(std::vector<TUniqueId>& ins_ids) const {
+        ins_ids.resize(1);
+        ins_ids[0] = _fragment_instance_id;
+    }
 
 protected:
     Status _create_sink(int sender_id, const TDataSink& t_data_sink, 
RuntimeState* state);
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index e21a004c7a..a0970b166e 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -69,7 +69,7 @@ public:
 
     ~PipelineXFragmentContext() override;
 
-    void instance_ids(std::vector<TUniqueId>& ins_ids) const {
+    void instance_ids(std::vector<TUniqueId>& ins_ids) const override {
         ins_ids.resize(_runtime_states.size());
         for (size_t i = 0; i < _runtime_states.size(); i++) {
             ins_ids[i] = _runtime_states[i]->fragment_instance_id();
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index fdb807c43d..703019a39b 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -344,7 +344,7 @@ void 
FragmentMgr::_exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_ex
     bool all_done = false;
     if (query_ctx != nullptr) {
         // decrease the number of unfinished fragments
-        all_done = query_ctx->countdown();
+        all_done = query_ctx->countdown(1);
     }
 
     // remove exec state after this fragment finished
@@ -455,18 +455,6 @@ void FragmentMgr::remove_pipeline_context(
     std::lock_guard<std::mutex> lock(_lock);
     auto query_id = f_context->get_query_id();
     auto* q_context = f_context->get_query_context();
-    bool all_done = q_context->countdown();
-    _pipeline_map.erase(f_context->get_fragment_instance_id());
-    if (all_done) {
-        _query_ctx_map.erase(query_id);
-    }
-}
-
-void FragmentMgr::remove_pipeline_context(
-        std::shared_ptr<pipeline::PipelineXFragmentContext> f_context) {
-    std::lock_guard<std::mutex> lock(_lock);
-    auto query_id = f_context->get_query_id();
-    auto* q_context = f_context->get_query_context();
     std::vector<TUniqueId> ins_ids;
     f_context->instance_ids(ins_ids);
     bool all_done = q_context->countdown(ins_ids.size());
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 367661dda4..0cf5cf2d58 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -83,9 +83,6 @@ public:
     void remove_pipeline_context(
             std::shared_ptr<pipeline::PipelineFragmentContext> 
pipeline_context);
 
-    void remove_pipeline_context(
-            std::shared_ptr<pipeline::PipelineXFragmentContext> 
pipeline_context);
-
     // TODO(zc): report this is over
     Status exec_plan_fragment(const TExecPlanFragmentParams& params, const 
FinishCallback& cb);
 
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 7882b21c8d..c27c517ac1 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -95,9 +95,7 @@ public:
 
     // Notice. For load fragments, the fragment_num sent by FE has a small 
probability of 0.
     // this may be a bug, bug <= 1 in theory it shouldn't cause any problems 
at this stage.
-    bool countdown() { return countdown(1); }
-
-    bool countdown(int delta) { return fragment_num.fetch_sub(delta) <= 1; }
+    bool countdown(int instance_num) { return 
fragment_num.fetch_sub(instance_num) <= 1; }
 
     ExecEnv* exec_env() { return _exec_env; }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to