This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 7607bfc78df33cd0ff8d02a963b7bedf1c9da53e
Author: yiguolei <[email protected]>
AuthorDate: Mon Feb 19 17:47:35 2024 +0800

    [bugfix](performance) fix performance problem (#31093)
---
 be/src/pipeline/pipeline_fragment_context.h | 9 +++++++++
 be/src/runtime/fragment_mgr.cpp             | 4 +++-
 be/src/runtime/plan_fragment_executor.h     | 8 ++++++++
 3 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 34ca84b1481..9ffcb40038c 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -112,6 +112,11 @@ public:
 
     void close_a_pipeline();
 
+    void set_merge_controller_handler(
+            std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
+        _merge_controller_handler = handler;
+    }
+
     virtual void add_merge_controller_handler(
             std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {}
 
@@ -188,6 +193,10 @@ protected:
 
     std::shared_ptr<QueryContext> _query_ctx;
 
+    // This shared ptr is never used. It is just a reference to hold the object.
+    // There is a weak ptr in runtime filter manager to reference this object.
+    std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;
+
     MonotonicStopWatch _fragment_watcher;
     RuntimeProfile::Counter* _start_timer = nullptr;
     RuntimeProfile::Counter* _prepare_timer = nullptr;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 1d4802af2f6..a8d29b9f9de 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -719,6 +719,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
     static_cast<void>(_runtimefilter_controller.add_entity(
             params.params, params.params.query_id, params.query_options, &handler,
             RuntimeFilterParamsContext::create(fragment_executor->runtime_state())));
+    fragment_executor->set_merge_controller_handler(handler);
     {
         std::lock_guard<std::mutex> lock(_lock);
         _fragment_instance_map.insert(
@@ -806,6 +807,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
             static_cast<void>(_runtimefilter_controller.add_entity(
                     params.local_params[i], params.query_id, params.query_options, &handler,
                     RuntimeFilterParamsContext::create(context->get_runtime_state())));
+            context->set_merge_controller_handler(handler);
             const TUniqueId& fragment_instance_id = params.local_params[i].fragment_instance_id;
             {
                 std::lock_guard<std::mutex> lock(_lock);
@@ -885,7 +887,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
             static_cast<void>(_runtimefilter_controller.add_entity(
                     local_params, params.query_id, params.query_options, &handler,
                     RuntimeFilterParamsContext::create(context->get_runtime_state())));
-
+            context->set_merge_controller_handler(handler);
             {
                 std::lock_guard<std::mutex> lock(_lock);
                 _pipeline_map.insert(std::make_pair(fragment_instance_id, context));
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h
index 5529d1ba3b5..41fa6c2f819 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -134,6 +134,11 @@ public:
 
     void set_need_wait_execution_trigger() { _need_wait_execution_trigger = true; }
 
+    void set_merge_controller_handler(
+            std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
+        _merge_controller_handler = handler;
+    }
+
     std::shared_ptr<QueryContext> get_query_ctx() { return _query_ctx; }
 
     TUniqueId fragment_instance_id() const { return _fragment_instance_id; }
@@ -214,6 +219,9 @@ private:
     RuntimeProfile::Counter* _blocks_produced_counter = nullptr;
 
     RuntimeProfile::Counter* _fragment_cpu_timer = nullptr;
+    // This shared ptr is never used. It is just a reference to hold the object.
+    // There is a weak ptr in runtime filter manager to reference this object.
+    std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;
 
     // If set the true, this plan fragment will be executed only after FE send execution start rpc.
     bool _need_wait_execution_trigger = false;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to