This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ff4d1e7b24e [chore](profile) Minor refactor on runtime filter producer 
profile (#50055)
ff4d1e7b24e is described below

commit ff4d1e7b24e7f701c7ee6e946978ef9f782350c4
Author: zhiqiang <hezhiqi...@selectdb.com>
AuthorDate: Wed Apr 16 10:14:40 2025 +0800

    [chore](profile) Minor refactor on runtime filter producer profile (#50055)
    
    ### What problem does this PR solve?
    
    Do same thing from https://github.com/apache/doris/pull/49777 to
    RuntimeFilterProducer
    
    In execution profile:
    ```text
    - RuntimeFilterInfo:
         - BuildTime: 392.464us
         - PublishTime: 69.942us
         - RF4 Info: Producer: ([id: 4, state: [READY], type: MINMAX_FILTER, 
column_type: INT], mode: LOCAL, state: PUBLISHED)
         - RF5 Info: Producer: ([id: 5, state: [READY], type: 
IN_OR_BLOOM_FILTER(BLOOM_FILTER), column_type: INT, bf_size: 1048576, 
build_bf_by_runtime_size: true], mode: LOCAL, state: PUBLISHED)
         - SkipProcess: False
    ```
    In merged profile
    ```
    - RuntimeFilterInfo: sum , avg , max , min
          - BuildTime: avg 0ns, max 0ns, min 0ns
          - PublishTime: avg 103.959us, max 103.959us, min 103.959us
    ```
---
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |  5 ++-
 .../exec/nested_loop_join_build_operator.cpp       |  3 +-
 be/src/pipeline/exec/set_sink_operator.cpp         |  7 +++-
 be/src/runtime/runtime_state.cpp                   |  9 ++---
 be/src/runtime/runtime_state.h                     |  6 +--
 be/src/runtime_filter/runtime_filter_mgr.cpp       | 14 +++----
 be/src/runtime_filter/runtime_filter_mgr.h         |  6 +--
 be/src/runtime_filter/runtime_filter_producer.h    | 36 ++++++++++--------
 .../runtime_filter_producer_helper.cpp             | 34 ++++++++++++++---
 .../runtime_filter_producer_helper.h               | 26 +++++++------
 .../runtime_filter_producer_helper_cross.h         |  3 +-
 .../runtime_filter_producer_helper_set.h           |  3 +-
 .../runtime_filter_consumer_helper_test.cpp        |  2 +-
 .../runtime_filter_consumer_test.cpp               |  8 ++--
 .../runtime_filter/runtime_filter_merger_test.cpp  | 14 +++----
 be/test/runtime_filter/runtime_filter_mgr_test.cpp | 43 ++++++++++------------
 .../runtime_filter_producer_helper_cross_test.cpp  |  2 +-
 .../runtime_filter_producer_helper_test.cpp        | 10 ++---
 .../runtime_filter_producer_test.cpp               | 18 ++++-----
 19 files changed, 141 insertions(+), 108 deletions(-)

diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index b9ca1c6f6bd..bfc283470bf 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -88,7 +88,7 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo
     // Hash Table Init
     RETURN_IF_ERROR(_hash_table_init(state));
     _runtime_filter_producer_helper = 
std::make_shared<RuntimeFilterProducerHelper>(
-            profile(), _should_build_hash_table, p._is_broadcast_join);
+            _should_build_hash_table, p._is_broadcast_join);
     RETURN_IF_ERROR(_runtime_filter_producer_helper->init(state, 
_build_expr_ctxs,
                                                           
p._runtime_filter_descs));
     return Status::OK();
@@ -250,6 +250,9 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* 
state, Status exec_statu
                 e.to_string(), _terminated, _should_build_hash_table,
                 _finish_dependency->debug_string(), 
blocked_by_shared_hash_table_signal);
     }
+    if (_runtime_filter_producer_helper) {
+        _runtime_filter_producer_helper->collect_realtime_profile(profile());
+    }
     return Base::close(state, exec_status);
 }
 
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp 
b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
index 7b8647f2232..fd44242cb68 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
@@ -41,7 +41,7 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* 
state, LocalSinkSta
         RETURN_IF_ERROR(p._filter_src_expr_ctxs[i]->clone(state, 
_filter_src_expr_ctxs[i]));
     }
 
-    _runtime_filter_producer_helper = 
std::make_shared<RuntimeFilterProducerHelperCross>(profile());
+    _runtime_filter_producer_helper = 
std::make_shared<RuntimeFilterProducerHelperCross>();
     RETURN_IF_ERROR(_runtime_filter_producer_helper->init(state, 
_filter_src_expr_ctxs,
                                                           
p._runtime_filter_descs));
     return Status::OK();
@@ -56,6 +56,7 @@ Status NestedLoopJoinBuildSinkLocalState::open(RuntimeState* 
state) {
 
 Status NestedLoopJoinBuildSinkLocalState::close(RuntimeState* state, Status 
exec_status) {
     RETURN_IF_ERROR(_runtime_filter_producer_helper->process(state, 
_shared_state->build_blocks));
+    _runtime_filter_producer_helper->collect_realtime_profile(profile());
     RETURN_IF_ERROR(JoinBuildSinkLocalState::close(state, exec_status));
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp 
b/be/src/pipeline/exec/set_sink_operator.cpp
index 0d8f4a45eb7..6c5b4483915 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -66,6 +66,11 @@ Status SetSinkLocalState<is_intersect>::close(RuntimeState* 
state, Status exec_s
                     e.to_string(), _terminated, 
_finish_dependency->debug_string());
         }
     }
+
+    if (_runtime_filter_producer_helper) {
+        _runtime_filter_producer_helper->collect_realtime_profile(profile());
+    }
+
     return Base::close(state, exec_status);
 }
 
@@ -209,7 +214,7 @@ Status SetSinkLocalState<is_intersect>::init(RuntimeState* 
state, LocalSinkState
 
     RETURN_IF_ERROR(_shared_state->update_build_not_ignore_null(_child_exprs));
 
-    _runtime_filter_producer_helper = 
std::make_shared<RuntimeFilterProducerHelperSet>(profile());
+    _runtime_filter_producer_helper = 
std::make_shared<RuntimeFilterProducerHelperSet>();
     RETURN_IF_ERROR(_runtime_filter_producer_helper->init(state, _child_exprs,
                                                           
parent._runtime_filter_descs));
     return Status::OK();
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index b8dd7f560e9..2389883afb2 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -495,14 +495,13 @@ RuntimeFilterMgr* 
RuntimeState::global_runtime_filter_mgr() {
 }
 
 Status RuntimeState::register_producer_runtime_filter(
-        const TRuntimeFilterDesc& desc, 
std::shared_ptr<RuntimeFilterProducer>* producer_filter,
-        RuntimeProfile* parent_profile) {
+        const TRuntimeFilterDesc& desc, 
std::shared_ptr<RuntimeFilterProducer>* producer_filter) {
     // Producers are created by local runtime filter mgr and shared by global 
runtime filter manager.
     // When RF is published, consumers in both global and local RF mgr will be 
found.
-    RETURN_IF_ERROR(local_runtime_filter_mgr()->register_producer_filter(
-            _query_ctx, desc, producer_filter, parent_profile));
+    
RETURN_IF_ERROR(local_runtime_filter_mgr()->register_producer_filter(_query_ctx,
 desc,
+                                                                         
producer_filter));
     
RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merger_producer_filter(
-            _query_ctx, desc, *producer_filter, &_profile));
+            _query_ctx, desc, *producer_filter));
     return Status::OK();
 }
 
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 4037c22d31a..e4ecf59563c 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -561,9 +561,9 @@ public:
         return _task_execution_context;
     }
 
-    Status register_producer_runtime_filter(const doris::TRuntimeFilterDesc& 
desc,
-                                            
std::shared_ptr<RuntimeFilterProducer>* producer_filter,
-                                            RuntimeProfile* parent_profile);
+    Status register_producer_runtime_filter(
+            const doris::TRuntimeFilterDesc& desc,
+            std::shared_ptr<RuntimeFilterProducer>* producer_filter);
 
     Status register_consumer_runtime_filter(
             const doris::TRuntimeFilterDesc& desc, bool need_local_merge, int 
node_id,
diff --git a/be/src/runtime_filter/runtime_filter_mgr.cpp 
b/be/src/runtime_filter/runtime_filter_mgr.cpp
index c18a448f1b1..a2072b6771b 100644
--- a/be/src/runtime_filter/runtime_filter_mgr.cpp
+++ b/be/src/runtime_filter/runtime_filter_mgr.cpp
@@ -74,7 +74,7 @@ Status RuntimeFilterMgr::register_consumer_filter(
 
 Status RuntimeFilterMgr::register_local_merger_producer_filter(
         const QueryContext* query_ctx, const TRuntimeFilterDesc& desc,
-        std::shared_ptr<RuntimeFilterProducer> producer, RuntimeProfile* 
parent_profile) {
+        std::shared_ptr<RuntimeFilterProducer> producer) {
     if (!_is_global) [[unlikely]] {
         return Status::InternalError(
                 "A local merge filter can not be registered in Local 
RuntimeFilterMgr");
@@ -129,10 +129,9 @@ Status 
RuntimeFilterMgr::get_local_merge_producer_filters(int filter_id,
     return Status::OK();
 }
 
-Status RuntimeFilterMgr::register_producer_filter(const QueryContext* 
query_ctx,
-                                                  const TRuntimeFilterDesc& 
desc,
-                                                  
std::shared_ptr<RuntimeFilterProducer>* producer,
-                                                  RuntimeProfile* 
parent_profile) {
+Status RuntimeFilterMgr::register_producer_filter(
+        const QueryContext* query_ctx, const TRuntimeFilterDesc& desc,
+        std::shared_ptr<RuntimeFilterProducer>* producer) {
     if (_is_global) [[unlikely]] {
         return Status::InternalError(
                 "A local producer filter should not be registered in Global 
RuntimeFilterMgr");
@@ -144,7 +143,7 @@ Status RuntimeFilterMgr::register_producer_filter(const 
QueryContext* query_ctx,
     if (_producer_id_set.contains(key)) {
         return Status::InvalidArgument("filter {} has been registered", key);
     }
-    RETURN_IF_ERROR(RuntimeFilterProducer::create(query_ctx, &desc, producer, 
parent_profile));
+    RETURN_IF_ERROR(RuntimeFilterProducer::create(query_ctx, &desc, producer));
     _producer_id_set.insert(key);
     return Status::OK();
 }
@@ -312,7 +311,7 @@ Status 
RuntimeFilterMergeControllerEntity::merge(std::shared_ptr<QueryContext> q
         }
         std::shared_ptr<RuntimeFilterProducer> tmp_filter;
         RETURN_IF_ERROR(RuntimeFilterProducer::create(query_ctx.get(), 
&cnt_val.runtime_filter_desc,
-                                                      &tmp_filter, nullptr));
+                                                      &tmp_filter));
 
         RETURN_IF_ERROR(tmp_filter->assign(*request, attach_data));
 
@@ -347,6 +346,7 @@ Status 
RuntimeFilterMergeControllerEntity::merge(std::shared_ptr<QueryContext> q
         auto ctx = query_ctx->ignore_runtime_filter_error() ? 
std::weak_ptr<QueryContext> {}
                                                             : query_ctx;
         std::vector<TRuntimeFilterTargetParamsV2>& targets = 
cnt_val.targetv2_info;
+
         for (auto& target : targets) {
             auto closure = AutoReleaseClosure<PPublishFilterRequestV2,
                                               
DummyBrpcCallback<PPublishFilterResponse>>::
diff --git a/be/src/runtime_filter/runtime_filter_mgr.h 
b/be/src/runtime_filter/runtime_filter_mgr.h
index 87e471a745b..00a91d61a2b 100644
--- a/be/src/runtime_filter/runtime_filter_mgr.h
+++ b/be/src/runtime_filter/runtime_filter_mgr.h
@@ -87,15 +87,13 @@ public:
 
     Status register_local_merger_producer_filter(const QueryContext* query_ctx,
                                                  const TRuntimeFilterDesc& 
desc,
-                                                 
std::shared_ptr<RuntimeFilterProducer> producer,
-                                                 RuntimeProfile* 
parent_profile);
+                                                 
std::shared_ptr<RuntimeFilterProducer> producer);
 
     Status get_local_merge_producer_filters(int filter_id, LocalMergeContext** 
local_merge_filters);
 
     // Create local producer. This producer is hold by 
RuntimeFilterProducerHelper.
     Status register_producer_filter(const QueryContext* query_ctx, const 
TRuntimeFilterDesc& desc,
-                                    std::shared_ptr<RuntimeFilterProducer>* 
producer,
-                                    RuntimeProfile* parent_profile);
+                                    std::shared_ptr<RuntimeFilterProducer>* 
producer);
 
     // update filter by remote
     bool set_runtime_filter_params(const TRuntimeFilterParams& 
runtime_filter_params);
diff --git a/be/src/runtime_filter/runtime_filter_producer.h 
b/be/src/runtime_filter/runtime_filter_producer.h
index d688e29fe00..620262f6051 100644
--- a/be/src/runtime_filter/runtime_filter_producer.h
+++ b/be/src/runtime_filter/runtime_filter_producer.h
@@ -22,6 +22,7 @@
 #include "pipeline/dependency.h"
 #include "runtime/query_context.h"
 #include "runtime_filter/runtime_filter.h"
+#include "util/runtime_profile.h"
 
 namespace doris {
 #include "common/compile_check_begin.h"
@@ -46,10 +47,8 @@ public:
     };
 
     static Status create(const QueryContext* query_ctx, const 
TRuntimeFilterDesc* desc,
-                         std::shared_ptr<RuntimeFilterProducer>* res,
-                         RuntimeProfile* parent_profile) {
-        *res = std::shared_ptr<RuntimeFilterProducer>(
-                new RuntimeFilterProducer(query_ctx, desc, parent_profile));
+                         std::shared_ptr<RuntimeFilterProducer>* res) {
+        *res = std::shared_ptr<RuntimeFilterProducer>(new 
RuntimeFilterProducer(query_ctx, desc));
         RETURN_IF_ERROR((*res)->_init_with_desc(desc, 
&query_ctx->query_options()));
         return Status::OK();
     }
@@ -117,24 +116,33 @@ public:
             return false;
         }
         _rf_state = state;
-        _profile->add_info_string("Info", debug_string());
         return true;
     }
 
     std::shared_ptr<RuntimeFilterWrapper> wrapper() const { return _wrapper; }
     void set_wrapper(std::shared_ptr<RuntimeFilterWrapper> wrapper) { _wrapper 
= wrapper; }
 
-private:
-    RuntimeFilterProducer(const QueryContext* query_ctx, const 
TRuntimeFilterDesc* desc,
-                          RuntimeProfile* parent_profile)
-            : RuntimeFilter(desc),
-              _is_broadcast_join(desc->is_broadcast_join),
-              _profile(new RuntimeProfile(fmt::format("RF{}", 
desc->filter_id))) {
-        if (parent_profile) { //tmp filter for mgr has no profile
-            parent_profile->add_child(_profile.get(), true, nullptr);
+    void collect_realtime_profile(RuntimeProfile* parent_operator_profile) {
+        DCHECK(parent_operator_profile != nullptr);
+        if (parent_operator_profile == nullptr) {
+            return;
+        }
+        /*
+        RuntimeFilterInfo:
+            - RF0 Info: xxxx
+        */
+        {
+            std::unique_lock<std::mutex> l(_mtx);
+            parent_operator_profile->add_description(
+                    fmt::format("RF{} Info", _wrapper->filter_id()), 
debug_string(),
+                    "RuntimeFilterInfo");
         }
     }
 
+private:
+    RuntimeFilterProducer(const QueryContext* query_ctx, const 
TRuntimeFilterDesc* desc)
+            : RuntimeFilter(desc), _is_broadcast_join(desc->is_broadcast_join) 
{}
+
     Status _send_to_remote_targets(RuntimeState* state, RuntimeFilter* 
merger_filter);
     Status _send_to_local_targets(RuntimeState* state, RuntimeFilter* 
merger_filter, bool global);
 
@@ -150,7 +158,6 @@ private:
         RETURN_IF_ERROR(RuntimeFilter::_init_with_desc(desc, options));
         _need_sync_filter_size = _wrapper->build_bf_by_runtime_size() && 
!_is_broadcast_join;
         _rf_state = _need_sync_filter_size ? State::WAITING_FOR_SEND_SIZE : 
State::WAITING_FOR_DATA;
-        _profile->add_info_string("Info", debug_string());
         return Status::OK();
     }
 
@@ -161,7 +168,6 @@ private:
     std::shared_ptr<pipeline::CountedFinishDependency> _dependency;
 
     std::atomic<State> _rf_state;
-    std::unique_ptr<RuntimeProfile> _profile;
 
     // only used to lock set_state() to make _rf_state is protected
     // set_synced_size and RuntimeFilterProducerHelper::terminate may called 
in different threads at the same time
diff --git a/be/src/runtime_filter/runtime_filter_producer_helper.cpp 
b/be/src/runtime_filter/runtime_filter_producer_helper.cpp
index 9b3d5b1acdb..435585da3d3 100644
--- a/be/src/runtime_filter/runtime_filter_producer_helper.cpp
+++ b/be/src/runtime_filter/runtime_filter_producer_helper.cpp
@@ -17,6 +17,8 @@
 
 #include "runtime_filter/runtime_filter_producer_helper.h"
 
+#include <gen_cpp/Metrics_types.h>
+
 #include "pipeline/pipeline_task.h"
 #include "runtime_filter/runtime_filter_wrapper.h"
 
@@ -36,8 +38,8 @@ Status RuntimeFilterProducerHelper::init(
         const std::vector<TRuntimeFilterDesc>& runtime_filter_descs) {
     _producers.resize(runtime_filter_descs.size());
     for (size_t i = 0; i < runtime_filter_descs.size(); i++) {
-        
RETURN_IF_ERROR(state->register_producer_runtime_filter(runtime_filter_descs[i],
-                                                                
&_producers[i], _profile.get()));
+        RETURN_IF_ERROR(
+                
state->register_producer_runtime_filter(runtime_filter_descs[i], 
&_producers[i]));
     }
     _init_expr(build_expr_ctxs, runtime_filter_descs);
     return Status::OK();
@@ -68,7 +70,7 @@ Status 
RuntimeFilterProducerHelper::_init_filters(RuntimeState* state,
 }
 
 Status RuntimeFilterProducerHelper::_insert(const vectorized::Block* block, 
size_t start) {
-    SCOPED_TIMER(_runtime_filter_compute_timer);
+    SCOPED_TIMER(_runtime_filter_compute_timer.get());
     for (int i = 0; i < _producers.size(); i++) {
         auto filter = _producers[i];
         int result_column_id = 
_filter_expr_contexts[i]->get_last_result_column_id();
@@ -80,7 +82,7 @@ Status RuntimeFilterProducerHelper::_insert(const 
vectorized::Block* block, size
 }
 
 Status RuntimeFilterProducerHelper::_publish(RuntimeState* state) {
-    SCOPED_TIMER(_publish_runtime_filter_timer);
+    SCOPED_TIMER(_publish_runtime_filter_timer.get());
     for (const auto& filter : _producers) {
         RETURN_IF_ERROR(filter->publish(state, _should_build_hash_table));
     }
@@ -153,8 +155,30 @@ Status 
RuntimeFilterProducerHelper::skip_process(RuntimeState* state) {
 
     RETURN_IF_ERROR(publish(state));
     _skip_runtime_filters_process = true;
-    _profile->add_info_string("SkipProcess", "True");
     return Status::OK();
 }
 
+void RuntimeFilterProducerHelper::collect_realtime_profile(
+        RuntimeProfile* parent_operator_profile) {
+    DCHECK(parent_operator_profile != nullptr);
+    if (parent_operator_profile == nullptr) {
+        return;
+    }
+
+    parent_operator_profile->add_counter_with_level("RuntimeFilterInfo", 
TUnit::NONE, 1);
+    RuntimeProfile::Counter* publish_timer = 
parent_operator_profile->add_counter(
+            "PublishTime", TUnit::TIME_NS, "RuntimeFilterInfo", 1);
+    RuntimeProfile::Counter* build_timer = 
parent_operator_profile->add_counter(
+            "BuildTime", TUnit::TIME_NS, "RuntimeFilterInfo", 1);
+
+    parent_operator_profile->add_description(
+            "SkipProcess", _skip_runtime_filters_process ? "True" : "False", 
"RuntimeFilterInfo");
+    publish_timer->set(_publish_runtime_filter_timer->value());
+    build_timer->set(_runtime_filter_compute_timer->value());
+
+    for (auto& producer : _producers) {
+        producer->collect_realtime_profile(parent_operator_profile);
+    }
+}
+
 } // namespace doris
diff --git a/be/src/runtime_filter/runtime_filter_producer_helper.h 
b/be/src/runtime_filter/runtime_filter_producer_helper.h
index d3a6f23bd10..a802e9b3ba7 100644
--- a/be/src/runtime_filter/runtime_filter_producer_helper.h
+++ b/be/src/runtime_filter/runtime_filter_producer_helper.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include <gen_cpp/Metrics_types.h>
+
 #include "common/be_mock_util.h"
 #include "common/status.h"
 #include "runtime/runtime_state.h"
@@ -36,15 +38,9 @@ class RuntimeFilterProducerHelper {
 public:
     virtual ~RuntimeFilterProducerHelper() = default;
 
-    RuntimeFilterProducerHelper(RuntimeProfile* profile, bool 
should_build_hash_table,
-                                bool is_broadcast_join)
+    RuntimeFilterProducerHelper(bool should_build_hash_table, bool 
is_broadcast_join)
             : _should_build_hash_table(should_build_hash_table),
-              _profile(new RuntimeProfile("RuntimeFilterProducerHelper")),
-              _is_broadcast_join(is_broadcast_join) {
-        profile->add_child(_profile.get(), true, nullptr);
-        _publish_runtime_filter_timer = ADD_TIMER_WITH_LEVEL(_profile, 
"PublishTime", 1);
-        _runtime_filter_compute_timer = ADD_TIMER_WITH_LEVEL(_profile, 
"BuildTime", 1);
-    }
+              _is_broadcast_join(is_broadcast_join) {}
 
 #ifdef BE_TEST
     RuntimeFilterProducerHelper() : _should_build_hash_table(true), 
_is_broadcast_join(false) {}
@@ -70,6 +66,8 @@ public:
     // publish rf
     Status publish(RuntimeState* state);
 
+    void collect_realtime_profile(RuntimeProfile* parent_operator_profile);
+
 protected:
     virtual void _init_expr(const vectorized::VExprContextSPtrs& 
build_expr_ctxs,
                             const std::vector<TRuntimeFilterDesc>& 
runtime_filter_descs);
@@ -79,10 +77,14 @@ protected:
 
     std::vector<std::shared_ptr<RuntimeFilterProducer>> _producers;
     const bool _should_build_hash_table;
-    RuntimeProfile::Counter* _publish_runtime_filter_timer = nullptr;
-    RuntimeProfile::Counter* _runtime_filter_compute_timer = nullptr;
-    std::unique_ptr<RuntimeProfile> _profile;
-    bool _skip_runtime_filters_process = false;
+    std::unique_ptr<RuntimeProfile::Counter> _publish_runtime_filter_timer =
+            std::make_unique<RuntimeProfile::Counter>(TUnit::TIME_NS, 0);
+    std::unique_ptr<RuntimeProfile::Counter> _runtime_filter_compute_timer =
+            std::make_unique<RuntimeProfile::Counter>(TUnit::TIME_NS, 0);
+
+    // This flag is setted by skip_process
+    // and read by many methods, not sure wheather there exists data race, so 
i use atomic
+    std::atomic_bool _skip_runtime_filters_process = false;
     const bool _is_broadcast_join;
 
     std::vector<std::shared_ptr<vectorized::VExprContext>> 
_filter_expr_contexts;
diff --git a/be/src/runtime_filter/runtime_filter_producer_helper_cross.h 
b/be/src/runtime_filter/runtime_filter_producer_helper_cross.h
index dd629cef1f4..af80750524c 100644
--- a/be/src/runtime_filter/runtime_filter_producer_helper_cross.h
+++ b/be/src/runtime_filter/runtime_filter_producer_helper_cross.h
@@ -33,8 +33,7 @@ class RuntimeFilterProducerHelperCross : public 
RuntimeFilterProducerHelper {
 public:
     ~RuntimeFilterProducerHelperCross() override = default;
 
-    RuntimeFilterProducerHelperCross(RuntimeProfile* profile)
-            : RuntimeFilterProducerHelper(profile, true, false) {}
+    RuntimeFilterProducerHelperCross() : RuntimeFilterProducerHelper(true, 
false) {}
 
     Status process(RuntimeState* state, vectorized::Blocks& blocks) {
         for (auto& block : blocks) {
diff --git a/be/src/runtime_filter/runtime_filter_producer_helper_set.h 
b/be/src/runtime_filter/runtime_filter_producer_helper_set.h
index 2e4e5bfe86a..0478d7b4c5c 100644
--- a/be/src/runtime_filter/runtime_filter_producer_helper_set.h
+++ b/be/src/runtime_filter/runtime_filter_producer_helper_set.h
@@ -34,8 +34,7 @@ class RuntimeFilterProducerHelperSet : public 
RuntimeFilterProducerHelper {
 public:
     ~RuntimeFilterProducerHelperSet() override = default;
 
-    RuntimeFilterProducerHelperSet(RuntimeProfile* profile)
-            : RuntimeFilterProducerHelper(profile, true, false) {}
+    RuntimeFilterProducerHelperSet() : RuntimeFilterProducerHelper(true, 
false) {}
 
     Status process(RuntimeState* state, const vectorized::Block* block, 
uint64_t cardinality) {
         if (_skip_runtime_filters_process) {
diff --git a/be/test/runtime_filter/runtime_filter_consumer_helper_test.cpp 
b/be/test/runtime_filter/runtime_filter_consumer_helper_test.cpp
index 05806c6a9b6..6875b7d4116 100644
--- a/be/test/runtime_filter/runtime_filter_consumer_helper_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_consumer_helper_test.cpp
@@ -88,7 +88,7 @@ TEST_F(RuntimeFilterConsumerHelperTest, basic) {
 
     std::shared_ptr<RuntimeFilterProducer> producer;
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create(
-            _query_ctx.get(), runtime_filter_descs.data(), &producer, 
&_profile));
+            _query_ctx.get(), runtime_filter_descs.data(), &producer));
     
producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY);
     helper._consumers[0]->signal(producer.get());
 
diff --git a/be/test/runtime_filter/runtime_filter_consumer_test.cpp 
b/be/test/runtime_filter/runtime_filter_consumer_test.cpp
index 3dcc2064c1c..4d1338c2689 100644
--- a/be/test/runtime_filter/runtime_filter_consumer_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_consumer_test.cpp
@@ -35,7 +35,7 @@ public:
 
         std::shared_ptr<RuntimeFilterProducer> producer;
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-                RuntimeFilterProducer::create(_query_ctx.get(), &desc, 
&producer, &_profile));
+                RuntimeFilterProducer::create(_query_ctx.get(), &desc, 
&producer));
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(producer->init(123));
         
producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY);
 
@@ -120,7 +120,7 @@ TEST_F(RuntimeFilterConsumerTest, timeout_aquire) {
 
     std::shared_ptr<RuntimeFilterProducer> producer;
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-            RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer, 
&_profile));
+            RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer));
     
producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY);
 
     std::vector<vectorized::VRuntimeFilterPtr> push_exprs;
@@ -156,7 +156,7 @@ TEST_F(RuntimeFilterConsumerTest, aquire_disabled) {
 
     std::shared_ptr<RuntimeFilterProducer> producer;
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-            RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer, 
&_profile));
+            RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer));
     
producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::DISABLED);
 
     std::vector<vectorized::VRuntimeFilterPtr> push_exprs;
@@ -221,7 +221,7 @@ TEST_F(RuntimeFilterConsumerTest, 
aquire_signal_at_same_time) {
 
         std::shared_ptr<RuntimeFilterProducer> producer;
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-                RuntimeFilterProducer::create(_query_ctx.get(), &desc, 
&producer, &_profile));
+                RuntimeFilterProducer::create(_query_ctx.get(), &desc, 
&producer));
         
producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY);
 
         std::vector<vectorized::VRuntimeFilterPtr> push_exprs;
diff --git a/be/test/runtime_filter/runtime_filter_merger_test.cpp 
b/be/test/runtime_filter/runtime_filter_merger_test.cpp
index 18b4766afc5..2c62c0de8b0 100644
--- a/be/test/runtime_filter/runtime_filter_merger_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_merger_test.cpp
@@ -41,7 +41,7 @@ public:
 
         std::shared_ptr<RuntimeFilterProducer> producer;
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-                _runtime_states[0]->register_producer_runtime_filter(desc, 
&producer, &_profile));
+                _runtime_states[0]->register_producer_runtime_filter(desc, 
&producer));
         producer->set_wrapper_state_and_ready_to_publish(first_product_state);
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get()));
         ASSERT_FALSE(merger->ready());
@@ -49,7 +49,7 @@ public:
 
         std::shared_ptr<RuntimeFilterProducer> producer2;
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-                _runtime_states[1]->register_producer_runtime_filter(desc, 
&producer2, &_profile));
+                _runtime_states[1]->register_producer_runtime_filter(desc, 
&producer2));
         
producer2->set_wrapper_state_and_ready_to_publish(second_product_state);
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer2.get()));
         ASSERT_TRUE(merger->ready());
@@ -68,7 +68,7 @@ public:
 
         std::shared_ptr<RuntimeFilterProducer> producer;
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-                _runtime_states[0]->register_producer_runtime_filter(desc, 
&producer, &_profile));
+                _runtime_states[0]->register_producer_runtime_filter(desc, 
&producer));
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(producer->init(123));
         producer->set_wrapper_state_and_ready_to_publish(state);
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get()));
@@ -80,8 +80,8 @@ public:
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->serialize(&request, &data, 
&len));
 
         std::shared_ptr<RuntimeFilterProducer> deserialized_producer;
-        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create(
-                _query_ctx.get(), &desc, &deserialized_producer, &_profile));
+        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+                RuntimeFilterProducer::create(_query_ctx.get(), &desc, 
&deserialized_producer));
         butil::IOBuf buf;
         buf.append(data, len);
         butil::IOBufAsZeroCopyInputStream stream(buf);
@@ -124,14 +124,14 @@ TEST_F(RuntimeFilterMergerTest, invalid_merge) {
 
     std::shared_ptr<RuntimeFilterProducer> producer;
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-            _runtime_states[0]->register_producer_runtime_filter(desc, 
&producer, &_profile));
+            _runtime_states[0]->register_producer_runtime_filter(desc, 
&producer));
     
producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY);
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get())); // 
ready wrapper
     ASSERT_EQ(merger->_wrapper->_state, RuntimeFilterWrapper::State::READY);
 
     std::shared_ptr<RuntimeFilterProducer> producer2;
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-            _runtime_states[1]->register_producer_runtime_filter(desc, 
&producer2, &_profile));
+            _runtime_states[1]->register_producer_runtime_filter(desc, 
&producer2));
     
producer2->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY);
     auto st = merger->merge_from(producer2.get());
     ASSERT_EQ(st.code(), ErrorCode::INTERNAL_ERROR);
diff --git a/be/test/runtime_filter/runtime_filter_mgr_test.cpp 
b/be/test/runtime_filter/runtime_filter_mgr_test.cpp
index 54b2b673402..d8222e201d9 100644
--- a/be/test/runtime_filter/runtime_filter_mgr_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_mgr_test.cpp
@@ -75,35 +75,32 @@ TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMgr) {
 
         std::shared_ptr<RuntimeFilterProducer> producer_filter;
         // producer_filter should not be nullptr
-        EXPECT_FALSE(global_runtime_filter_mgr
-                             
->register_local_merger_producer_filter(ctx.get(), desc,
-                                                                     
producer_filter, profile.get())
-                             .ok());
-        // local merge filter should not be registered in local mgr
-        EXPECT_FALSE(local_runtime_filter_mgr
-                             
->register_local_merger_producer_filter(ctx.get(), desc,
-                                                                     
producer_filter, profile.get())
-                             .ok());
-        // producer should not registered in global mgr
         EXPECT_FALSE(
                 global_runtime_filter_mgr
-                        ->register_producer_filter(ctx.get(), desc, 
&producer_filter, profile.get())
+                        ->register_local_merger_producer_filter(ctx.get(), 
desc, producer_filter)
                         .ok());
-        EXPECT_EQ(producer_filter, nullptr);
-        // Register in local mgr
-        EXPECT_TRUE(
+        // local merge filter should not be registered in local mgr
+        EXPECT_FALSE(
                 local_runtime_filter_mgr
-                        ->register_producer_filter(ctx.get(), desc, 
&producer_filter, profile.get())
+                        ->register_local_merger_producer_filter(ctx.get(), 
desc, producer_filter)
                         .ok());
+        // producer should not registered in global mgr
+        EXPECT_FALSE(global_runtime_filter_mgr
+                             ->register_producer_filter(ctx.get(), desc, 
&producer_filter)
+                             .ok());
+        EXPECT_EQ(producer_filter, nullptr);
+        // Register in local mgr
+        EXPECT_TRUE(local_runtime_filter_mgr
+                            ->register_producer_filter(ctx.get(), desc, 
&producer_filter)
+                            .ok());
         auto mocked_dependency = 
std::make_shared<pipeline::CountedFinishDependency>(
                 0, 0, "MOCKED_FINISH_DEPENDENCY");
         producer_filter->latch_dependency(mocked_dependency);
         EXPECT_NE(producer_filter, nullptr);
         // Register in local mgr twice
-        EXPECT_FALSE(
-                local_runtime_filter_mgr
-                        ->register_producer_filter(ctx.get(), desc, 
&producer_filter, profile.get())
-                        .ok());
+        EXPECT_FALSE(local_runtime_filter_mgr
+                             ->register_producer_filter(ctx.get(), desc, 
&producer_filter)
+                             .ok());
         EXPECT_NE(producer_filter, nullptr);
 
         LocalMergeContext* local_merge_filters = nullptr;
@@ -114,10 +111,10 @@ TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMgr) {
                              ->get_local_merge_producer_filters(filter_id, 
&local_merge_filters)
                              .ok());
         // Register local merge filter
-        EXPECT_TRUE(global_runtime_filter_mgr
-                            ->register_local_merger_producer_filter(ctx.get(), 
desc,
-                                                                    
producer_filter, profile.get())
-                            .ok());
+        EXPECT_TRUE(
+                global_runtime_filter_mgr
+                        ->register_local_merger_producer_filter(ctx.get(), 
desc, producer_filter)
+                        .ok());
         EXPECT_TRUE(global_runtime_filter_mgr
                             ->get_local_merge_producer_filters(filter_id, 
&local_merge_filters)
                             .ok());
diff --git 
a/be/test/runtime_filter/runtime_filter_producer_helper_cross_test.cpp 
b/be/test/runtime_filter/runtime_filter_producer_helper_cross_test.cpp
index 974642d9d74..53f8e05e81d 100644
--- a/be/test/runtime_filter/runtime_filter_producer_helper_cross_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_producer_helper_cross_test.cpp
@@ -56,7 +56,7 @@ class RuntimeFilterProducerHelperCrossTest : public 
RuntimeFilterTest {
 };
 
 TEST_F(RuntimeFilterProducerHelperCrossTest, basic) {
-    auto helper = RuntimeFilterProducerHelperCross(&_profile);
+    auto helper = RuntimeFilterProducerHelperCross();
 
     vectorized::VExprContextSPtr ctx;
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(vectorized::VExpr::create_expr_tree(
diff --git a/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp 
b/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp
index fffe752d9db..bf5988928dd 100644
--- a/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp
@@ -57,7 +57,7 @@ class RuntimeFilterProducerHelperTest : public 
RuntimeFilterTest {
 };
 
 TEST_F(RuntimeFilterProducerHelperTest, basic) {
-    auto helper = RuntimeFilterProducerHelper(&_profile, true, false);
+    auto helper = RuntimeFilterProducerHelper(true, false);
 
     vectorized::VExprContextSPtr ctx;
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(vectorized::VExpr::create_expr_tree(
@@ -82,7 +82,7 @@ TEST_F(RuntimeFilterProducerHelperTest, basic) {
 }
 
 TEST_F(RuntimeFilterProducerHelperTest, wake_up_eraly) {
-    auto helper = RuntimeFilterProducerHelper(&_profile, true, false);
+    auto helper = RuntimeFilterProducerHelper(true, false);
 
     vectorized::VExprContextSPtr ctx;
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(vectorized::VExpr::create_expr_tree(
@@ -106,7 +106,7 @@ TEST_F(RuntimeFilterProducerHelperTest, wake_up_eraly) {
 }
 
 TEST_F(RuntimeFilterProducerHelperTest, skip_process) {
-    auto helper = RuntimeFilterProducerHelper(&_profile, true, false);
+    auto helper = RuntimeFilterProducerHelper(true, false);
 
     vectorized::VExprContextSPtr ctx;
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(vectorized::VExpr::create_expr_tree(
@@ -137,7 +137,7 @@ TEST_F(RuntimeFilterProducerHelperTest, skip_process) {
 }
 
 TEST_F(RuntimeFilterProducerHelperTest, broadcast) {
-    auto helper = RuntimeFilterProducerHelper(&_profile, true, true);
+    auto helper = RuntimeFilterProducerHelper(true, true);
 
     vectorized::VExprContextSPtr ctx;
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(vectorized::VExpr::create_expr_tree(
@@ -160,7 +160,7 @@ TEST_F(RuntimeFilterProducerHelperTest, broadcast) {
             helper.build(_runtime_states[0].get(), &block, true, 
runtime_filters));
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(helper.publish(_runtime_states[0].get()));
 
-    auto helper2 = RuntimeFilterProducerHelper(&_profile, false, true);
+    auto helper2 = RuntimeFilterProducerHelper(false, true);
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
             helper2.init(_runtime_states[1].get(), build_expr_ctxs, 
runtime_filter_descs));
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
diff --git a/be/test/runtime_filter/runtime_filter_producer_test.cpp 
b/be/test/runtime_filter/runtime_filter_producer_test.cpp
index 549a2d8361c..b075247759a 100644
--- a/be/test/runtime_filter/runtime_filter_producer_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_producer_test.cpp
@@ -31,7 +31,7 @@ TEST_F(RuntimeFilterProducerTest, basic) {
     std::shared_ptr<RuntimeFilterProducer> producer;
     auto desc = TRuntimeFilterDescBuilder().build();
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-            RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer, 
&_profile));
+            RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer));
 }
 
 TEST_F(RuntimeFilterProducerTest, no_sync_filter_size) {
@@ -42,7 +42,7 @@ TEST_F(RuntimeFilterProducerTest, no_sync_filter_size) {
                             .set_is_broadcast_join(true)
                             .build();
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-                RuntimeFilterProducer::create(_query_ctx.get(), &desc, 
&producer, &_profile));
+                RuntimeFilterProducer::create(_query_ctx.get(), &desc, 
&producer));
         ASSERT_EQ(producer->_need_sync_filter_size, false);
         ASSERT_EQ(producer->_rf_state, 
RuntimeFilterProducer::State::WAITING_FOR_DATA);
     }
@@ -53,7 +53,7 @@ TEST_F(RuntimeFilterProducerTest, no_sync_filter_size) {
                             .set_is_broadcast_join(false)
                             .build();
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-                RuntimeFilterProducer::create(_query_ctx.get(), &desc, 
&producer, &_profile));
+                RuntimeFilterProducer::create(_query_ctx.get(), &desc, 
&producer));
         ASSERT_EQ(producer->_need_sync_filter_size, false);
         ASSERT_EQ(producer->_rf_state, 
RuntimeFilterProducer::State::WAITING_FOR_DATA);
     }
@@ -66,7 +66,7 @@ TEST_F(RuntimeFilterProducerTest, sync_filter_size) {
                         .set_is_broadcast_join(false)
                         .build();
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-            RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer, 
&_profile));
+            RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer));
     ASSERT_EQ(producer->_need_sync_filter_size, true);
     ASSERT_EQ(producer->_rf_state, 
RuntimeFilterProducer::State::WAITING_FOR_SEND_SIZE);
 
@@ -85,7 +85,7 @@ TEST_F(RuntimeFilterProducerTest, 
sync_filter_size_local_no_merge) {
                         .set_is_broadcast_join(false)
                         .build();
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-            RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer, 
&_profile));
+            RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer));
     ASSERT_EQ(producer->_need_sync_filter_size, true);
     ASSERT_EQ(producer->_rf_state, 
RuntimeFilterProducer::State::WAITING_FOR_SEND_SIZE);
 
@@ -106,10 +106,10 @@ TEST_F(RuntimeFilterProducerTest, 
sync_filter_size_local_merge) {
 
     std::shared_ptr<RuntimeFilterProducer> producer;
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-            _runtime_states[0]->register_producer_runtime_filter(desc, 
&producer, &_profile));
+            _runtime_states[0]->register_producer_runtime_filter(desc, 
&producer));
     std::shared_ptr<RuntimeFilterProducer> producer2;
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-            _runtime_states[1]->register_producer_runtime_filter(desc, 
&producer2, &_profile));
+            _runtime_states[1]->register_producer_runtime_filter(desc, 
&producer2));
 
     std::shared_ptr<RuntimeFilterConsumer> consumer;
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
@@ -142,10 +142,10 @@ TEST_F(RuntimeFilterProducerTest, set_disable) {
 
     std::shared_ptr<RuntimeFilterProducer> producer;
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-            _runtime_states[0]->register_producer_runtime_filter(desc, 
&producer, &_profile));
+            _runtime_states[0]->register_producer_runtime_filter(desc, 
&producer));
     std::shared_ptr<RuntimeFilterProducer> producer2;
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-            _runtime_states[1]->register_producer_runtime_filter(desc, 
&producer2, &_profile));
+            _runtime_states[1]->register_producer_runtime_filter(desc, 
&producer2));
 
     std::shared_ptr<RuntimeFilterConsumer> consumer;
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to