github-actions[bot] commented on code in PR #31067:
URL: https://github.com/apache/doris/pull/31067#discussion_r1494114707
##########
be/src/runtime/runtime_filter_mgr.cpp:
##########
@@ -89,28 +89,72 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc
}
}
- // TODO: union the remote opt and global two case as one case to one judge
- bool remote_opt_or_global = (desc.__isset.opt_remote_rf && desc.opt_remote_rf) || is_global;
-
if (!has_exist) {
IRuntimeFilter* filter;
- RETURN_IF_ERROR(IRuntimeFilter::create(
- _state, remote_opt_or_global ? _state->obj_pool() : &_pool, &desc, &options,
- RuntimeFilterRole::CONSUMER, node_id, &filter, build_bf_exactly, is_global));
+ RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options,
+ RuntimeFilterRole::CONSUMER, node_id, &filter,
+ build_bf_exactly, need_local_merge));
_consumer_map[key].emplace_back(node_id, filter);
*consumer_filter = filter;
- } else if (!remote_opt_or_global) {
+ } else if (!need_local_merge) {
return Status::InvalidArgument("filter has registered");
}
return Status::OK();
}
+Status RuntimeFilterMgr::register_local_merge_producer_filter(
+ const doris::TRuntimeFilterDesc& desc, const doris::TQueryOptions& options,
+ doris::IRuntimeFilter** producer_filter, bool build_bf_exactly) {
+ SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
+ int32_t key = desc.filter_id;
+
+ decltype(_local_merge_producer_map.end()) iter;
+ {
+ std::lock_guard<std::mutex> l(_lock);
+ iter = _local_merge_producer_map.find(key);
+ if (iter == _local_merge_producer_map.end()) {
+ auto [new_iter, _] = _local_merge_producer_map.emplace(key, LocalMergeFilters {});
+ iter = new_iter;
+ }
+ }
+
+ DCHECK(_state != nullptr);
+ RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options,
+ RuntimeFilterRole::PRODUCER, -1, producer_filter,
+ build_bf_exactly, true));
+ {
+ std::lock_guard<std::mutex> l(*iter->second.lock);
+ if (iter->second.filters.empty()) {
+ IRuntimeFilter* merge_filter = nullptr;
+ RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options,
+ RuntimeFilterRole::PRODUCER, -1, &merge_filter,
+ build_bf_exactly, true));
+ iter->second.filters.emplace_back(merge_filter);
+ }
+ iter->second.merge_time++;
+ iter->second.filters.emplace_back(*producer_filter);
+ }
+ return Status::OK();
+}
+
+Status RuntimeFilterMgr::get_local_merge_producer_filters(
Review Comment:
warning: method 'get_local_merge_producer_filters' can be made static
[readability-convert-member-functions-to-static]
be/src/runtime/runtime_filter_mgr.h:91:
```diff
- Status get_local_merge_producer_filters(int filter_id, LocalMergeFilters** local_merge_filters);
+ static Status get_local_merge_producer_filters(int filter_id, LocalMergeFilters** local_merge_filters);
```
##########
be/src/exprs/runtime_filter.cpp:
##########
@@ -954,33 +954,34 @@ void IRuntimeFilter::insert_batch(const vectorized::ColumnPtr column, size_t sta
_wrapper->insert_batch(column, start);
}
-Status IRuntimeFilter::merge_local_filter(RuntimePredicateWrapper* wrapper, int* merged_num) {
- SCOPED_TIMER(_merge_local_rf_timer);
- std::unique_lock lock(_local_merge_mutex);
- if (_merged_rf_num == 0) {
- _wrapper = wrapper;
- } else {
- RETURN_IF_ERROR(merge_from(wrapper));
- }
- *merged_num = ++_merged_rf_num;
- return Status::OK();
-}
-
Status IRuntimeFilter::publish(bool publish_local) {
Review Comment:
warning: function 'publish' has cognitive complexity of 62 (threshold 50)
[readability-function-cognitive-complexity]
```cpp
Status IRuntimeFilter::publish(bool publish_local) {
^
```
<details>
<summary>Additional context</summary>
**be/src/exprs/runtime_filter.cpp:958:** nesting level increased to 1
```cpp
auto send_to_remote = [&](IRuntimeFilter* filter) {
^
```
**be/src/exprs/runtime_filter.cpp:961:** +2, including nesting penalty of 1,
nesting level increased to 2
```cpp
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_merge_addr(&addr));
^
```
**be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/exprs/runtime_filter.cpp:961:** +3, including nesting penalty of 2,
nesting level increased to 3
```cpp
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_merge_addr(&addr));
^
```
**be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/exprs/runtime_filter.cpp:964:** nesting level increased to 1
```cpp
auto do_local_merge = [&]() {
^
```
**be/src/exprs/runtime_filter.cpp:966:** +2, including nesting penalty of 1,
nesting level increased to 2
```cpp
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_local_merge_producer_filters(
^
```
**be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/exprs/runtime_filter.cpp:966:** +3, including nesting penalty of 2,
nesting level increased to 3
```cpp
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_local_merge_producer_filters(
^
```
**be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/exprs/runtime_filter.cpp:969:** +2, including nesting penalty of 1,
nesting level increased to 2
```cpp
RETURN_IF_ERROR(local_merge_filters->filters[0]->merge_from(_wrapper));
^
```
**be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/exprs/runtime_filter.cpp:969:** +3, including nesting penalty of 2,
nesting level increased to 3
```cpp
RETURN_IF_ERROR(local_merge_filters->filters[0]->merge_from(_wrapper));
^
```
**be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/exprs/runtime_filter.cpp:971:** +2, including nesting penalty of 1,
nesting level increased to 2
```cpp
if (local_merge_filters->merge_time == 0) {
^
```
**be/src/exprs/runtime_filter.cpp:972:** +3, including nesting penalty of 2,
nesting level increased to 3
```cpp
if (_has_local_target) {
^
```
**be/src/exprs/runtime_filter.cpp:975:** +1, nesting level increased to 3
```cpp
} else {
^
```
**be/src/exprs/runtime_filter.cpp:976:** +4, including nesting penalty of 3,
nesting level increased to 4
```cpp
RETURN_IF_ERROR(send_to_remote(local_merge_filters->filters[0]));
^
```
**be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/exprs/runtime_filter.cpp:976:** +5, including nesting penalty of 4,
nesting level increased to 5
```cpp
RETURN_IF_ERROR(send_to_remote(local_merge_filters->filters[0]));
^
```
**be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/exprs/runtime_filter.cpp:982:** +1, including nesting penalty of 0,
nesting level increased to 1
```cpp
if (_need_local_merge && _has_local_target) {
^
```
**be/src/exprs/runtime_filter.cpp:982:** +1
```cpp
if (_need_local_merge && _has_local_target) {
^
```
**be/src/exprs/runtime_filter.cpp:983:** +2, including nesting penalty of 1,
nesting level increased to 2
```cpp
RETURN_IF_ERROR(do_local_merge());
^
```
**be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/exprs/runtime_filter.cpp:983:** +3, including nesting penalty of 2,
nesting level increased to 3
```cpp
RETURN_IF_ERROR(do_local_merge());
^
```
**be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/exprs/runtime_filter.cpp:984:** +1, nesting level increased to 1
```cpp
} else if (_has_local_target) {
^
```
**be/src/exprs/runtime_filter.cpp:986:** +2, including nesting penalty of 1,
nesting level increased to 2
```cpp
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_consume_filters(_filter_id, filters));
^
```
**be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/exprs/runtime_filter.cpp:986:** +3, including nesting penalty of 2,
nesting level increased to 3
```cpp
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_consume_filters(_filter_id, filters));
^
```
**be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/exprs/runtime_filter.cpp:993:** +1, nesting level increased to 1
```cpp
} else if (!publish_local) {
^
```
**be/src/exprs/runtime_filter.cpp:994:** +2, including nesting penalty of 1,
nesting level increased to 2
```cpp
if (_is_broadcast_join) {
^
```
**be/src/exprs/runtime_filter.cpp:995:** +3, including nesting penalty of 2,
nesting level increased to 3
```cpp
RETURN_IF_ERROR(send_to_remote(this));
^
```
**be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/exprs/runtime_filter.cpp:995:** +4, including nesting penalty of 3,
nesting level increased to 4
```cpp
RETURN_IF_ERROR(send_to_remote(this));
^
```
**be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/exprs/runtime_filter.cpp:996:** +1, nesting level increased to 2
```cpp
} else {
^
```
**be/src/exprs/runtime_filter.cpp:997:** +3, including nesting penalty of 2,
nesting level increased to 3
```cpp
RETURN_IF_ERROR(do_local_merge());
^
```
**be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
```cpp
do { \
^
```
**be/src/exprs/runtime_filter.cpp:997:** +4, including nesting penalty of 3,
nesting level increased to 4
```cpp
RETURN_IF_ERROR(do_local_merge());
^
```
**be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
```cpp
if (UNLIKELY(!_status_.ok())) { \
^
```
**be/src/exprs/runtime_filter.cpp:999:** +1, nesting level increased to 1
```cpp
} else {
^
```
</details>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]