This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new ff4d1e7b24e [chore](profile) Minor refactor on runtime filter producer profile (#50055) ff4d1e7b24e is described below commit ff4d1e7b24e7f701c7ee6e946978ef9f782350c4 Author: zhiqiang <hezhiqi...@selectdb.com> AuthorDate: Wed Apr 16 10:14:40 2025 +0800 [chore](profile) Minor refactor on runtime filter producer profile (#50055) ### What problem does this PR solve? Apply the same change made in https://github.com/apache/doris/pull/49777 to RuntimeFilterProducer. In the execution profile: ```text - RuntimeFilterInfo: - BuildTime: 392.464us - PublishTime: 69.942us - RF4 Info: Producer: ([id: 4, state: [READY], type: MINMAX_FILTER, column_type: INT], mode: LOCAL, state: PUBLISHED) - RF5 Info: Producer: ([id: 5, state: [READY], type: IN_OR_BLOOM_FILTER(BLOOM_FILTER), column_type: INT, bf_size: 1048576, build_bf_by_runtime_size: true], mode: LOCAL, state: PUBLISHED) - SkipProcess: False ``` In the merged profile: ``` - RuntimeFilterInfo: sum , avg , max , min - BuildTime: avg 0ns, max 0ns, min 0ns - PublishTime: avg 103.959us, max 103.959us, min 103.959us ``` --- be/src/pipeline/exec/hashjoin_build_sink.cpp | 5 ++- .../exec/nested_loop_join_build_operator.cpp | 3 +- be/src/pipeline/exec/set_sink_operator.cpp | 7 +++- be/src/runtime/runtime_state.cpp | 9 ++--- be/src/runtime/runtime_state.h | 6 +-- be/src/runtime_filter/runtime_filter_mgr.cpp | 14 +++---- be/src/runtime_filter/runtime_filter_mgr.h | 6 +-- be/src/runtime_filter/runtime_filter_producer.h | 36 ++++++++++-------- .../runtime_filter_producer_helper.cpp | 34 ++++++++++++++--- .../runtime_filter_producer_helper.h | 26 +++++++------ .../runtime_filter_producer_helper_cross.h | 3 +- .../runtime_filter_producer_helper_set.h | 3 +- .../runtime_filter_consumer_helper_test.cpp | 2 +- .../runtime_filter_consumer_test.cpp | 8 ++-- .../runtime_filter/runtime_filter_merger_test.cpp | 14 +++---- be/test/runtime_filter/runtime_filter_mgr_test.cpp | 43 
++++++++++------------ .../runtime_filter_producer_helper_cross_test.cpp | 2 +- .../runtime_filter_producer_helper_test.cpp | 10 ++--- .../runtime_filter_producer_test.cpp | 18 ++++----- 19 files changed, 141 insertions(+), 108 deletions(-) diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index b9ca1c6f6bd..bfc283470bf 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -88,7 +88,7 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo // Hash Table Init RETURN_IF_ERROR(_hash_table_init(state)); _runtime_filter_producer_helper = std::make_shared<RuntimeFilterProducerHelper>( - profile(), _should_build_hash_table, p._is_broadcast_join); + _should_build_hash_table, p._is_broadcast_join); RETURN_IF_ERROR(_runtime_filter_producer_helper->init(state, _build_expr_ctxs, p._runtime_filter_descs)); return Status::OK(); @@ -250,6 +250,9 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu e.to_string(), _terminated, _should_build_hash_table, _finish_dependency->debug_string(), blocked_by_shared_hash_table_signal); } + if (_runtime_filter_producer_helper) { + _runtime_filter_producer_helper->collect_realtime_profile(profile()); + } return Base::close(state, exec_status); } diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp index 7b8647f2232..fd44242cb68 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp @@ -41,7 +41,7 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkSta RETURN_IF_ERROR(p._filter_src_expr_ctxs[i]->clone(state, _filter_src_expr_ctxs[i])); } - _runtime_filter_producer_helper = std::make_shared<RuntimeFilterProducerHelperCross>(profile()); + _runtime_filter_producer_helper = 
std::make_shared<RuntimeFilterProducerHelperCross>(); RETURN_IF_ERROR(_runtime_filter_producer_helper->init(state, _filter_src_expr_ctxs, p._runtime_filter_descs)); return Status::OK(); @@ -56,6 +56,7 @@ Status NestedLoopJoinBuildSinkLocalState::open(RuntimeState* state) { Status NestedLoopJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_status) { RETURN_IF_ERROR(_runtime_filter_producer_helper->process(state, _shared_state->build_blocks)); + _runtime_filter_producer_helper->collect_realtime_profile(profile()); RETURN_IF_ERROR(JoinBuildSinkLocalState::close(state, exec_status)); return Status::OK(); } diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp index 0d8f4a45eb7..6c5b4483915 100644 --- a/be/src/pipeline/exec/set_sink_operator.cpp +++ b/be/src/pipeline/exec/set_sink_operator.cpp @@ -66,6 +66,11 @@ Status SetSinkLocalState<is_intersect>::close(RuntimeState* state, Status exec_s e.to_string(), _terminated, _finish_dependency->debug_string()); } } + + if (_runtime_filter_producer_helper) { + _runtime_filter_producer_helper->collect_realtime_profile(profile()); + } + return Base::close(state, exec_status); } @@ -209,7 +214,7 @@ Status SetSinkLocalState<is_intersect>::init(RuntimeState* state, LocalSinkState RETURN_IF_ERROR(_shared_state->update_build_not_ignore_null(_child_exprs)); - _runtime_filter_producer_helper = std::make_shared<RuntimeFilterProducerHelperSet>(profile()); + _runtime_filter_producer_helper = std::make_shared<RuntimeFilterProducerHelperSet>(); RETURN_IF_ERROR(_runtime_filter_producer_helper->init(state, _child_exprs, parent._runtime_filter_descs)); return Status::OK(); diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index b8dd7f560e9..2389883afb2 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -495,14 +495,13 @@ RuntimeFilterMgr* RuntimeState::global_runtime_filter_mgr() { } Status 
RuntimeState::register_producer_runtime_filter( - const TRuntimeFilterDesc& desc, std::shared_ptr<RuntimeFilterProducer>* producer_filter, - RuntimeProfile* parent_profile) { + const TRuntimeFilterDesc& desc, std::shared_ptr<RuntimeFilterProducer>* producer_filter) { // Producers are created by local runtime filter mgr and shared by global runtime filter manager. // When RF is published, consumers in both global and local RF mgr will be found. - RETURN_IF_ERROR(local_runtime_filter_mgr()->register_producer_filter( - _query_ctx, desc, producer_filter, parent_profile)); + RETURN_IF_ERROR(local_runtime_filter_mgr()->register_producer_filter(_query_ctx, desc, + producer_filter)); RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merger_producer_filter( - _query_ctx, desc, *producer_filter, &_profile)); + _query_ctx, desc, *producer_filter)); return Status::OK(); } diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 4037c22d31a..e4ecf59563c 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -561,9 +561,9 @@ public: return _task_execution_context; } - Status register_producer_runtime_filter(const doris::TRuntimeFilterDesc& desc, - std::shared_ptr<RuntimeFilterProducer>* producer_filter, - RuntimeProfile* parent_profile); + Status register_producer_runtime_filter( + const doris::TRuntimeFilterDesc& desc, + std::shared_ptr<RuntimeFilterProducer>* producer_filter); Status register_consumer_runtime_filter( const doris::TRuntimeFilterDesc& desc, bool need_local_merge, int node_id, diff --git a/be/src/runtime_filter/runtime_filter_mgr.cpp b/be/src/runtime_filter/runtime_filter_mgr.cpp index c18a448f1b1..a2072b6771b 100644 --- a/be/src/runtime_filter/runtime_filter_mgr.cpp +++ b/be/src/runtime_filter/runtime_filter_mgr.cpp @@ -74,7 +74,7 @@ Status RuntimeFilterMgr::register_consumer_filter( Status RuntimeFilterMgr::register_local_merger_producer_filter( const QueryContext* query_ctx, const 
TRuntimeFilterDesc& desc, - std::shared_ptr<RuntimeFilterProducer> producer, RuntimeProfile* parent_profile) { + std::shared_ptr<RuntimeFilterProducer> producer) { if (!_is_global) [[unlikely]] { return Status::InternalError( "A local merge filter can not be registered in Local RuntimeFilterMgr"); @@ -129,10 +129,9 @@ Status RuntimeFilterMgr::get_local_merge_producer_filters(int filter_id, return Status::OK(); } -Status RuntimeFilterMgr::register_producer_filter(const QueryContext* query_ctx, - const TRuntimeFilterDesc& desc, - std::shared_ptr<RuntimeFilterProducer>* producer, - RuntimeProfile* parent_profile) { +Status RuntimeFilterMgr::register_producer_filter( + const QueryContext* query_ctx, const TRuntimeFilterDesc& desc, + std::shared_ptr<RuntimeFilterProducer>* producer) { if (_is_global) [[unlikely]] { return Status::InternalError( "A local producer filter should not be registered in Global RuntimeFilterMgr"); @@ -144,7 +143,7 @@ Status RuntimeFilterMgr::register_producer_filter(const QueryContext* query_ctx, if (_producer_id_set.contains(key)) { return Status::InvalidArgument("filter {} has been registered", key); } - RETURN_IF_ERROR(RuntimeFilterProducer::create(query_ctx, &desc, producer, parent_profile)); + RETURN_IF_ERROR(RuntimeFilterProducer::create(query_ctx, &desc, producer)); _producer_id_set.insert(key); return Status::OK(); } @@ -312,7 +311,7 @@ Status RuntimeFilterMergeControllerEntity::merge(std::shared_ptr<QueryContext> q } std::shared_ptr<RuntimeFilterProducer> tmp_filter; RETURN_IF_ERROR(RuntimeFilterProducer::create(query_ctx.get(), &cnt_val.runtime_filter_desc, - &tmp_filter, nullptr)); + &tmp_filter)); RETURN_IF_ERROR(tmp_filter->assign(*request, attach_data)); @@ -347,6 +346,7 @@ Status RuntimeFilterMergeControllerEntity::merge(std::shared_ptr<QueryContext> q auto ctx = query_ctx->ignore_runtime_filter_error() ? 
std::weak_ptr<QueryContext> {} : query_ctx; std::vector<TRuntimeFilterTargetParamsV2>& targets = cnt_val.targetv2_info; + for (auto& target : targets) { auto closure = AutoReleaseClosure<PPublishFilterRequestV2, DummyBrpcCallback<PPublishFilterResponse>>:: diff --git a/be/src/runtime_filter/runtime_filter_mgr.h b/be/src/runtime_filter/runtime_filter_mgr.h index 87e471a745b..00a91d61a2b 100644 --- a/be/src/runtime_filter/runtime_filter_mgr.h +++ b/be/src/runtime_filter/runtime_filter_mgr.h @@ -87,15 +87,13 @@ public: Status register_local_merger_producer_filter(const QueryContext* query_ctx, const TRuntimeFilterDesc& desc, - std::shared_ptr<RuntimeFilterProducer> producer, - RuntimeProfile* parent_profile); + std::shared_ptr<RuntimeFilterProducer> producer); Status get_local_merge_producer_filters(int filter_id, LocalMergeContext** local_merge_filters); // Create local producer. This producer is hold by RuntimeFilterProducerHelper. Status register_producer_filter(const QueryContext* query_ctx, const TRuntimeFilterDesc& desc, - std::shared_ptr<RuntimeFilterProducer>* producer, - RuntimeProfile* parent_profile); + std::shared_ptr<RuntimeFilterProducer>* producer); // update filter by remote bool set_runtime_filter_params(const TRuntimeFilterParams& runtime_filter_params); diff --git a/be/src/runtime_filter/runtime_filter_producer.h b/be/src/runtime_filter/runtime_filter_producer.h index d688e29fe00..620262f6051 100644 --- a/be/src/runtime_filter/runtime_filter_producer.h +++ b/be/src/runtime_filter/runtime_filter_producer.h @@ -22,6 +22,7 @@ #include "pipeline/dependency.h" #include "runtime/query_context.h" #include "runtime_filter/runtime_filter.h" +#include "util/runtime_profile.h" namespace doris { #include "common/compile_check_begin.h" @@ -46,10 +47,8 @@ public: }; static Status create(const QueryContext* query_ctx, const TRuntimeFilterDesc* desc, - std::shared_ptr<RuntimeFilterProducer>* res, - RuntimeProfile* parent_profile) { - *res = 
std::shared_ptr<RuntimeFilterProducer>( - new RuntimeFilterProducer(query_ctx, desc, parent_profile)); + std::shared_ptr<RuntimeFilterProducer>* res) { + *res = std::shared_ptr<RuntimeFilterProducer>(new RuntimeFilterProducer(query_ctx, desc)); RETURN_IF_ERROR((*res)->_init_with_desc(desc, &query_ctx->query_options())); return Status::OK(); } @@ -117,24 +116,33 @@ public: return false; } _rf_state = state; - _profile->add_info_string("Info", debug_string()); return true; } std::shared_ptr<RuntimeFilterWrapper> wrapper() const { return _wrapper; } void set_wrapper(std::shared_ptr<RuntimeFilterWrapper> wrapper) { _wrapper = wrapper; } -private: - RuntimeFilterProducer(const QueryContext* query_ctx, const TRuntimeFilterDesc* desc, - RuntimeProfile* parent_profile) - : RuntimeFilter(desc), - _is_broadcast_join(desc->is_broadcast_join), - _profile(new RuntimeProfile(fmt::format("RF{}", desc->filter_id))) { - if (parent_profile) { //tmp filter for mgr has no profile - parent_profile->add_child(_profile.get(), true, nullptr); + void collect_realtime_profile(RuntimeProfile* parent_operator_profile) { + DCHECK(parent_operator_profile != nullptr); + if (parent_operator_profile == nullptr) { + return; + } + /* + RuntimeFilterInfo: + - RF0 Info: xxxx + */ + { + std::unique_lock<std::mutex> l(_mtx); + parent_operator_profile->add_description( + fmt::format("RF{} Info", _wrapper->filter_id()), debug_string(), + "RuntimeFilterInfo"); } } +private: + RuntimeFilterProducer(const QueryContext* query_ctx, const TRuntimeFilterDesc* desc) + : RuntimeFilter(desc), _is_broadcast_join(desc->is_broadcast_join) {} + Status _send_to_remote_targets(RuntimeState* state, RuntimeFilter* merger_filter); Status _send_to_local_targets(RuntimeState* state, RuntimeFilter* merger_filter, bool global); @@ -150,7 +158,6 @@ private: RETURN_IF_ERROR(RuntimeFilter::_init_with_desc(desc, options)); _need_sync_filter_size = _wrapper->build_bf_by_runtime_size() && !_is_broadcast_join; _rf_state = 
_need_sync_filter_size ? State::WAITING_FOR_SEND_SIZE : State::WAITING_FOR_DATA; - _profile->add_info_string("Info", debug_string()); return Status::OK(); } @@ -161,7 +168,6 @@ private: std::shared_ptr<pipeline::CountedFinishDependency> _dependency; std::atomic<State> _rf_state; - std::unique_ptr<RuntimeProfile> _profile; // only used to lock set_state() to make _rf_state is protected // set_synced_size and RuntimeFilterProducerHelper::terminate may called in different threads at the same time diff --git a/be/src/runtime_filter/runtime_filter_producer_helper.cpp b/be/src/runtime_filter/runtime_filter_producer_helper.cpp index 9b3d5b1acdb..435585da3d3 100644 --- a/be/src/runtime_filter/runtime_filter_producer_helper.cpp +++ b/be/src/runtime_filter/runtime_filter_producer_helper.cpp @@ -17,6 +17,8 @@ #include "runtime_filter/runtime_filter_producer_helper.h" +#include <gen_cpp/Metrics_types.h> + #include "pipeline/pipeline_task.h" #include "runtime_filter/runtime_filter_wrapper.h" @@ -36,8 +38,8 @@ Status RuntimeFilterProducerHelper::init( const std::vector<TRuntimeFilterDesc>& runtime_filter_descs) { _producers.resize(runtime_filter_descs.size()); for (size_t i = 0; i < runtime_filter_descs.size(); i++) { - RETURN_IF_ERROR(state->register_producer_runtime_filter(runtime_filter_descs[i], - &_producers[i], _profile.get())); + RETURN_IF_ERROR( + state->register_producer_runtime_filter(runtime_filter_descs[i], &_producers[i])); } _init_expr(build_expr_ctxs, runtime_filter_descs); return Status::OK(); @@ -68,7 +70,7 @@ Status RuntimeFilterProducerHelper::_init_filters(RuntimeState* state, } Status RuntimeFilterProducerHelper::_insert(const vectorized::Block* block, size_t start) { - SCOPED_TIMER(_runtime_filter_compute_timer); + SCOPED_TIMER(_runtime_filter_compute_timer.get()); for (int i = 0; i < _producers.size(); i++) { auto filter = _producers[i]; int result_column_id = _filter_expr_contexts[i]->get_last_result_column_id(); @@ -80,7 +82,7 @@ Status 
RuntimeFilterProducerHelper::_insert(const vectorized::Block* block, size } Status RuntimeFilterProducerHelper::_publish(RuntimeState* state) { - SCOPED_TIMER(_publish_runtime_filter_timer); + SCOPED_TIMER(_publish_runtime_filter_timer.get()); for (const auto& filter : _producers) { RETURN_IF_ERROR(filter->publish(state, _should_build_hash_table)); } @@ -153,8 +155,30 @@ Status RuntimeFilterProducerHelper::skip_process(RuntimeState* state) { RETURN_IF_ERROR(publish(state)); _skip_runtime_filters_process = true; - _profile->add_info_string("SkipProcess", "True"); return Status::OK(); } +void RuntimeFilterProducerHelper::collect_realtime_profile( + RuntimeProfile* parent_operator_profile) { + DCHECK(parent_operator_profile != nullptr); + if (parent_operator_profile == nullptr) { + return; + } + + parent_operator_profile->add_counter_with_level("RuntimeFilterInfo", TUnit::NONE, 1); + RuntimeProfile::Counter* publish_timer = parent_operator_profile->add_counter( + "PublishTime", TUnit::TIME_NS, "RuntimeFilterInfo", 1); + RuntimeProfile::Counter* build_timer = parent_operator_profile->add_counter( + "BuildTime", TUnit::TIME_NS, "RuntimeFilterInfo", 1); + + parent_operator_profile->add_description( + "SkipProcess", _skip_runtime_filters_process ? 
"True" : "False", "RuntimeFilterInfo"); + publish_timer->set(_publish_runtime_filter_timer->value()); + build_timer->set(_runtime_filter_compute_timer->value()); + + for (auto& producer : _producers) { + producer->collect_realtime_profile(parent_operator_profile); + } +} + } // namespace doris diff --git a/be/src/runtime_filter/runtime_filter_producer_helper.h b/be/src/runtime_filter/runtime_filter_producer_helper.h index d3a6f23bd10..a802e9b3ba7 100644 --- a/be/src/runtime_filter/runtime_filter_producer_helper.h +++ b/be/src/runtime_filter/runtime_filter_producer_helper.h @@ -17,6 +17,8 @@ #pragma once +#include <gen_cpp/Metrics_types.h> + #include "common/be_mock_util.h" #include "common/status.h" #include "runtime/runtime_state.h" @@ -36,15 +38,9 @@ class RuntimeFilterProducerHelper { public: virtual ~RuntimeFilterProducerHelper() = default; - RuntimeFilterProducerHelper(RuntimeProfile* profile, bool should_build_hash_table, - bool is_broadcast_join) + RuntimeFilterProducerHelper(bool should_build_hash_table, bool is_broadcast_join) : _should_build_hash_table(should_build_hash_table), - _profile(new RuntimeProfile("RuntimeFilterProducerHelper")), - _is_broadcast_join(is_broadcast_join) { - profile->add_child(_profile.get(), true, nullptr); - _publish_runtime_filter_timer = ADD_TIMER_WITH_LEVEL(_profile, "PublishTime", 1); - _runtime_filter_compute_timer = ADD_TIMER_WITH_LEVEL(_profile, "BuildTime", 1); - } + _is_broadcast_join(is_broadcast_join) {} #ifdef BE_TEST RuntimeFilterProducerHelper() : _should_build_hash_table(true), _is_broadcast_join(false) {} @@ -70,6 +66,8 @@ public: // publish rf Status publish(RuntimeState* state); + void collect_realtime_profile(RuntimeProfile* parent_operator_profile); + protected: virtual void _init_expr(const vectorized::VExprContextSPtrs& build_expr_ctxs, const std::vector<TRuntimeFilterDesc>& runtime_filter_descs); @@ -79,10 +77,14 @@ protected: std::vector<std::shared_ptr<RuntimeFilterProducer>> _producers; const bool 
_should_build_hash_table; - RuntimeProfile::Counter* _publish_runtime_filter_timer = nullptr; - RuntimeProfile::Counter* _runtime_filter_compute_timer = nullptr; - std::unique_ptr<RuntimeProfile> _profile; - bool _skip_runtime_filters_process = false; + std::unique_ptr<RuntimeProfile::Counter> _publish_runtime_filter_timer = + std::make_unique<RuntimeProfile::Counter>(TUnit::TIME_NS, 0); + std::unique_ptr<RuntimeProfile::Counter> _runtime_filter_compute_timer = + std::make_unique<RuntimeProfile::Counter>(TUnit::TIME_NS, 0); + + // This flag is setted by skip_process + // and read by many methods, not sure wheather there exists data race, so i use atomic + std::atomic_bool _skip_runtime_filters_process = false; const bool _is_broadcast_join; std::vector<std::shared_ptr<vectorized::VExprContext>> _filter_expr_contexts; diff --git a/be/src/runtime_filter/runtime_filter_producer_helper_cross.h b/be/src/runtime_filter/runtime_filter_producer_helper_cross.h index dd629cef1f4..af80750524c 100644 --- a/be/src/runtime_filter/runtime_filter_producer_helper_cross.h +++ b/be/src/runtime_filter/runtime_filter_producer_helper_cross.h @@ -33,8 +33,7 @@ class RuntimeFilterProducerHelperCross : public RuntimeFilterProducerHelper { public: ~RuntimeFilterProducerHelperCross() override = default; - RuntimeFilterProducerHelperCross(RuntimeProfile* profile) - : RuntimeFilterProducerHelper(profile, true, false) {} + RuntimeFilterProducerHelperCross() : RuntimeFilterProducerHelper(true, false) {} Status process(RuntimeState* state, vectorized::Blocks& blocks) { for (auto& block : blocks) { diff --git a/be/src/runtime_filter/runtime_filter_producer_helper_set.h b/be/src/runtime_filter/runtime_filter_producer_helper_set.h index 2e4e5bfe86a..0478d7b4c5c 100644 --- a/be/src/runtime_filter/runtime_filter_producer_helper_set.h +++ b/be/src/runtime_filter/runtime_filter_producer_helper_set.h @@ -34,8 +34,7 @@ class RuntimeFilterProducerHelperSet : public RuntimeFilterProducerHelper { public: 
~RuntimeFilterProducerHelperSet() override = default; - RuntimeFilterProducerHelperSet(RuntimeProfile* profile) - : RuntimeFilterProducerHelper(profile, true, false) {} + RuntimeFilterProducerHelperSet() : RuntimeFilterProducerHelper(true, false) {} Status process(RuntimeState* state, const vectorized::Block* block, uint64_t cardinality) { if (_skip_runtime_filters_process) { diff --git a/be/test/runtime_filter/runtime_filter_consumer_helper_test.cpp b/be/test/runtime_filter/runtime_filter_consumer_helper_test.cpp index 05806c6a9b6..6875b7d4116 100644 --- a/be/test/runtime_filter/runtime_filter_consumer_helper_test.cpp +++ b/be/test/runtime_filter/runtime_filter_consumer_helper_test.cpp @@ -88,7 +88,7 @@ TEST_F(RuntimeFilterConsumerHelperTest, basic) { std::shared_ptr<RuntimeFilterProducer> producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create( - _query_ctx.get(), runtime_filter_descs.data(), &producer, &_profile)); + _query_ctx.get(), runtime_filter_descs.data(), &producer)); producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY); helper._consumers[0]->signal(producer.get()); diff --git a/be/test/runtime_filter/runtime_filter_consumer_test.cpp b/be/test/runtime_filter/runtime_filter_consumer_test.cpp index 3dcc2064c1c..4d1338c2689 100644 --- a/be/test/runtime_filter/runtime_filter_consumer_test.cpp +++ b/be/test/runtime_filter/runtime_filter_consumer_test.cpp @@ -35,7 +35,7 @@ public: std::shared_ptr<RuntimeFilterProducer> producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer, &_profile)); + RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer)); FAIL_IF_ERROR_OR_CATCH_EXCEPTION(producer->init(123)); producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY); @@ -120,7 +120,7 @@ TEST_F(RuntimeFilterConsumerTest, timeout_aquire) { std::shared_ptr<RuntimeFilterProducer> producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - 
RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer, &_profile)); + RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer)); producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY); std::vector<vectorized::VRuntimeFilterPtr> push_exprs; @@ -156,7 +156,7 @@ TEST_F(RuntimeFilterConsumerTest, aquire_disabled) { std::shared_ptr<RuntimeFilterProducer> producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer, &_profile)); + RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer)); producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::DISABLED); std::vector<vectorized::VRuntimeFilterPtr> push_exprs; @@ -221,7 +221,7 @@ TEST_F(RuntimeFilterConsumerTest, aquire_signal_at_same_time) { std::shared_ptr<RuntimeFilterProducer> producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer, &_profile)); + RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer)); producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY); std::vector<vectorized::VRuntimeFilterPtr> push_exprs; diff --git a/be/test/runtime_filter/runtime_filter_merger_test.cpp b/be/test/runtime_filter/runtime_filter_merger_test.cpp index 18b4766afc5..2c62c0de8b0 100644 --- a/be/test/runtime_filter/runtime_filter_merger_test.cpp +++ b/be/test/runtime_filter/runtime_filter_merger_test.cpp @@ -41,7 +41,7 @@ public: std::shared_ptr<RuntimeFilterProducer> producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - _runtime_states[0]->register_producer_runtime_filter(desc, &producer, &_profile)); + _runtime_states[0]->register_producer_runtime_filter(desc, &producer)); producer->set_wrapper_state_and_ready_to_publish(first_product_state); FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get())); ASSERT_FALSE(merger->ready()); @@ -49,7 +49,7 @@ public: std::shared_ptr<RuntimeFilterProducer> producer2; 
FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - _runtime_states[1]->register_producer_runtime_filter(desc, &producer2, &_profile)); + _runtime_states[1]->register_producer_runtime_filter(desc, &producer2)); producer2->set_wrapper_state_and_ready_to_publish(second_product_state); FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer2.get())); ASSERT_TRUE(merger->ready()); @@ -68,7 +68,7 @@ public: std::shared_ptr<RuntimeFilterProducer> producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - _runtime_states[0]->register_producer_runtime_filter(desc, &producer, &_profile)); + _runtime_states[0]->register_producer_runtime_filter(desc, &producer)); FAIL_IF_ERROR_OR_CATCH_EXCEPTION(producer->init(123)); producer->set_wrapper_state_and_ready_to_publish(state); FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get())); @@ -80,8 +80,8 @@ public: FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->serialize(&request, &data, &len)); std::shared_ptr<RuntimeFilterProducer> deserialized_producer; - FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create( - _query_ctx.get(), &desc, &deserialized_producer, &_profile)); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION( + RuntimeFilterProducer::create(_query_ctx.get(), &desc, &deserialized_producer)); butil::IOBuf buf; buf.append(data, len); butil::IOBufAsZeroCopyInputStream stream(buf); @@ -124,14 +124,14 @@ TEST_F(RuntimeFilterMergerTest, invalid_merge) { std::shared_ptr<RuntimeFilterProducer> producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - _runtime_states[0]->register_producer_runtime_filter(desc, &producer, &_profile)); + _runtime_states[0]->register_producer_runtime_filter(desc, &producer)); producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY); FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get())); // ready wrapper ASSERT_EQ(merger->_wrapper->_state, RuntimeFilterWrapper::State::READY); std::shared_ptr<RuntimeFilterProducer> producer2; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( - 
_runtime_states[1]->register_producer_runtime_filter(desc, &producer2, &_profile)); + _runtime_states[1]->register_producer_runtime_filter(desc, &producer2)); producer2->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY); auto st = merger->merge_from(producer2.get()); ASSERT_EQ(st.code(), ErrorCode::INTERNAL_ERROR); diff --git a/be/test/runtime_filter/runtime_filter_mgr_test.cpp b/be/test/runtime_filter/runtime_filter_mgr_test.cpp index 54b2b673402..d8222e201d9 100644 --- a/be/test/runtime_filter/runtime_filter_mgr_test.cpp +++ b/be/test/runtime_filter/runtime_filter_mgr_test.cpp @@ -75,35 +75,32 @@ TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMgr) { std::shared_ptr<RuntimeFilterProducer> producer_filter; // producer_filter should not be nullptr - EXPECT_FALSE(global_runtime_filter_mgr - ->register_local_merger_producer_filter(ctx.get(), desc, - producer_filter, profile.get()) - .ok()); - // local merge filter should not be registered in local mgr - EXPECT_FALSE(local_runtime_filter_mgr - ->register_local_merger_producer_filter(ctx.get(), desc, - producer_filter, profile.get()) - .ok()); - // producer should not registered in global mgr EXPECT_FALSE( global_runtime_filter_mgr - ->register_producer_filter(ctx.get(), desc, &producer_filter, profile.get()) + ->register_local_merger_producer_filter(ctx.get(), desc, producer_filter) .ok()); - EXPECT_EQ(producer_filter, nullptr); - // Register in local mgr - EXPECT_TRUE( + // local merge filter should not be registered in local mgr + EXPECT_FALSE( local_runtime_filter_mgr - ->register_producer_filter(ctx.get(), desc, &producer_filter, profile.get()) + ->register_local_merger_producer_filter(ctx.get(), desc, producer_filter) .ok()); + // producer should not registered in global mgr + EXPECT_FALSE(global_runtime_filter_mgr + ->register_producer_filter(ctx.get(), desc, &producer_filter) + .ok()); + EXPECT_EQ(producer_filter, nullptr); + // Register in local mgr + EXPECT_TRUE(local_runtime_filter_mgr 
+ ->register_producer_filter(ctx.get(), desc, &producer_filter) + .ok()); auto mocked_dependency = std::make_shared<pipeline::CountedFinishDependency>( 0, 0, "MOCKED_FINISH_DEPENDENCY"); producer_filter->latch_dependency(mocked_dependency); EXPECT_NE(producer_filter, nullptr); // Register in local mgr twice - EXPECT_FALSE( - local_runtime_filter_mgr - ->register_producer_filter(ctx.get(), desc, &producer_filter, profile.get()) - .ok()); + EXPECT_FALSE(local_runtime_filter_mgr + ->register_producer_filter(ctx.get(), desc, &producer_filter) + .ok()); EXPECT_NE(producer_filter, nullptr); LocalMergeContext* local_merge_filters = nullptr; @@ -114,10 +111,10 @@ TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMgr) { ->get_local_merge_producer_filters(filter_id, &local_merge_filters) .ok()); // Register local merge filter - EXPECT_TRUE(global_runtime_filter_mgr - ->register_local_merger_producer_filter(ctx.get(), desc, - producer_filter, profile.get()) - .ok()); + EXPECT_TRUE( + global_runtime_filter_mgr + ->register_local_merger_producer_filter(ctx.get(), desc, producer_filter) + .ok()); EXPECT_TRUE(global_runtime_filter_mgr ->get_local_merge_producer_filters(filter_id, &local_merge_filters) .ok()); diff --git a/be/test/runtime_filter/runtime_filter_producer_helper_cross_test.cpp b/be/test/runtime_filter/runtime_filter_producer_helper_cross_test.cpp index 974642d9d74..53f8e05e81d 100644 --- a/be/test/runtime_filter/runtime_filter_producer_helper_cross_test.cpp +++ b/be/test/runtime_filter/runtime_filter_producer_helper_cross_test.cpp @@ -56,7 +56,7 @@ class RuntimeFilterProducerHelperCrossTest : public RuntimeFilterTest { }; TEST_F(RuntimeFilterProducerHelperCrossTest, basic) { - auto helper = RuntimeFilterProducerHelperCross(&_profile); + auto helper = RuntimeFilterProducerHelperCross(); vectorized::VExprContextSPtr ctx; FAIL_IF_ERROR_OR_CATCH_EXCEPTION(vectorized::VExpr::create_expr_tree( diff --git a/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp 
b/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp
index fffe752d9db..bf5988928dd 100644
--- a/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp
@@ -57,7 +57,7 @@ class RuntimeFilterProducerHelperTest : public RuntimeFilterTest {
  };

  TEST_F(RuntimeFilterProducerHelperTest, basic) {
- auto helper = RuntimeFilterProducerHelper(&_profile, true, false);
+ auto helper = RuntimeFilterProducerHelper(true, false);
  vectorized::VExprContextSPtr ctx;

  FAIL_IF_ERROR_OR_CATCH_EXCEPTION(vectorized::VExpr::create_expr_tree(
@@ -82,7 +82,7 @@ TEST_F(RuntimeFilterProducerHelperTest, basic) {
  }

  TEST_F(RuntimeFilterProducerHelperTest, wake_up_eraly) {
- auto helper = RuntimeFilterProducerHelper(&_profile, true, false);
+ auto helper = RuntimeFilterProducerHelper(true, false);
  vectorized::VExprContextSPtr ctx;

  FAIL_IF_ERROR_OR_CATCH_EXCEPTION(vectorized::VExpr::create_expr_tree(
@@ -106,7 +106,7 @@ TEST_F(RuntimeFilterProducerHelperTest, wake_up_eraly) {
  }

  TEST_F(RuntimeFilterProducerHelperTest, skip_process) {
- auto helper = RuntimeFilterProducerHelper(&_profile, true, false);
+ auto helper = RuntimeFilterProducerHelper(true, false);
  vectorized::VExprContextSPtr ctx;

  FAIL_IF_ERROR_OR_CATCH_EXCEPTION(vectorized::VExpr::create_expr_tree(
@@ -137,7 +137,7 @@ TEST_F(RuntimeFilterProducerHelperTest, skip_process) {
  }

  TEST_F(RuntimeFilterProducerHelperTest, broadcast) {
- auto helper = RuntimeFilterProducerHelper(&_profile, true, true);
+ auto helper = RuntimeFilterProducerHelper(true, true);
  vectorized::VExprContextSPtr ctx;

  FAIL_IF_ERROR_OR_CATCH_EXCEPTION(vectorized::VExpr::create_expr_tree(
@@ -160,7 +160,7 @@ TEST_F(RuntimeFilterProducerHelperTest, broadcast) {
  helper.build(_runtime_states[0].get(), &block, true, runtime_filters));
  FAIL_IF_ERROR_OR_CATCH_EXCEPTION(helper.publish(_runtime_states[0].get()));

- auto helper2 = RuntimeFilterProducerHelper(&_profile, false, true);
+ auto helper2 = RuntimeFilterProducerHelper(false, true);
  FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
  helper2.init(_runtime_states[1].get(), build_expr_ctxs, runtime_filter_descs));
  FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
diff --git a/be/test/runtime_filter/runtime_filter_producer_test.cpp b/be/test/runtime_filter/runtime_filter_producer_test.cpp
index 549a2d8361c..b075247759a 100644
--- a/be/test/runtime_filter/runtime_filter_producer_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_producer_test.cpp
@@ -31,7 +31,7 @@ TEST_F(RuntimeFilterProducerTest, basic) {
  std::shared_ptr<RuntimeFilterProducer> producer;
  auto desc = TRuntimeFilterDescBuilder().build();
  FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
- RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer, &_profile));
+ RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer));
  }

  TEST_F(RuntimeFilterProducerTest, no_sync_filter_size) {
@@ -42,7 +42,7 @@ TEST_F(RuntimeFilterProducerTest, no_sync_filter_size) {
  .set_is_broadcast_join(true)
  .build();
  FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
- RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer, &_profile));
+ RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer));
  ASSERT_EQ(producer->_need_sync_filter_size, false);
  ASSERT_EQ(producer->_rf_state, RuntimeFilterProducer::State::WAITING_FOR_DATA);
  }
@@ -53,7 +53,7 @@ TEST_F(RuntimeFilterProducerTest, no_sync_filter_size) {
  .set_is_broadcast_join(false)
  .build();
  FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
- RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer, &_profile));
+ RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer));
  ASSERT_EQ(producer->_need_sync_filter_size, false);
  ASSERT_EQ(producer->_rf_state, RuntimeFilterProducer::State::WAITING_FOR_DATA);
  }
@@ -66,7 +66,7 @@ TEST_F(RuntimeFilterProducerTest, sync_filter_size) {
  .set_is_broadcast_join(false)
  .build();
  FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
- RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer, &_profile));
+ RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer));
  ASSERT_EQ(producer->_need_sync_filter_size, true);
  ASSERT_EQ(producer->_rf_state, RuntimeFilterProducer::State::WAITING_FOR_SEND_SIZE);

@@ -85,7 +85,7 @@ TEST_F(RuntimeFilterProducerTest, sync_filter_size_local_no_merge) {
  .set_is_broadcast_join(false)
  .build();
  FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
- RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer, &_profile));
+ RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer));
  ASSERT_EQ(producer->_need_sync_filter_size, true);
  ASSERT_EQ(producer->_rf_state, RuntimeFilterProducer::State::WAITING_FOR_SEND_SIZE);

@@ -106,10 +106,10 @@ TEST_F(RuntimeFilterProducerTest, sync_filter_size_local_merge) {

  std::shared_ptr<RuntimeFilterProducer> producer;
  FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
- _runtime_states[0]->register_producer_runtime_filter(desc, &producer, &_profile));
+ _runtime_states[0]->register_producer_runtime_filter(desc, &producer));
  std::shared_ptr<RuntimeFilterProducer> producer2;
  FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
- _runtime_states[1]->register_producer_runtime_filter(desc, &producer2, &_profile));
+ _runtime_states[1]->register_producer_runtime_filter(desc, &producer2));

  std::shared_ptr<RuntimeFilterConsumer> consumer;
  FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
@@ -142,10 +142,10 @@ TEST_F(RuntimeFilterProducerTest, set_disable) {

  std::shared_ptr<RuntimeFilterProducer> producer;
  FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
- _runtime_states[0]->register_producer_runtime_filter(desc, &producer, &_profile));
+ _runtime_states[0]->register_producer_runtime_filter(desc, &producer));
  std::shared_ptr<RuntimeFilterProducer> producer2;
  FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
- _runtime_states[1]->register_producer_runtime_filter(desc, &producer2, &_profile));
+ _runtime_states[1]->register_producer_runtime_filter(desc, &producer2));

  std::shared_ptr<RuntimeFilterConsumer> consumer;
  FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org