This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2c16fe0da9 [bugfix](runtimefilter) runtime filter is shared between
multi instances with same node id, should not cache exprs (#22114)
2c16fe0da9 is described below
commit 2c16fe0da9dbcef2fa431af0ae7e60d0a127de9e
Author: yiguolei <[email protected]>
AuthorDate: Sun Jul 23 13:04:33 2023 +0800
[bugfix](runtimefilter) runtime filter is shared between multi instances
with same node id, should not cache exprs (#22114)
runtime filter is shared among multi instances.
in the past, we cached pushdown expr(runtime filter generated)
every scannode[runtime filter consumer] will try to call prepare expr
but the expr may generated with different fn_context_id
---------
Co-authored-by: yiguolei <[email protected]>
---
be/src/exprs/runtime_filter.cpp | 28 +++++-----------------------
be/src/exprs/runtime_filter.h | 7 +------
be/src/vec/exec/runtime_filter_consumer.cpp | 6 +++---
3 files changed, 9 insertions(+), 32 deletions(-)
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 6932ad9c6b..02f5e7b514 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1152,35 +1152,17 @@ Status IRuntimeFilter::publish() {
}
}
-Status IRuntimeFilter::get_push_expr_ctxs(std::vector<vectorized::VExprSPtr>*
push_exprs) {
+Status IRuntimeFilter::get_push_expr_ctxs(std::vector<vectorized::VExprSPtr>*
push_exprs,
+ bool is_late_arrival) {
DCHECK(is_consumer());
- if (!_is_ignored) {
- _set_push_down();
- _profile->add_info_string("Info", _format_status());
- return _wrapper->get_push_exprs(push_exprs, _vprobe_ctx);
- } else {
- _profile->add_info_string("Info", _format_status());
- return Status::OK();
- }
-}
-
-Status IRuntimeFilter::get_prepared_exprs(std::vector<vectorized::VExprSPtr>*
vexprs,
- const RowDescriptor& desc,
RuntimeState* state) {
_profile->add_info_string("Info", _format_status());
if (_is_ignored) {
return Status::OK();
}
- DCHECK((!_enable_pipeline_exec && _rf_state == RuntimeFilterState::READY)
||
- (_enable_pipeline_exec &&
- _rf_state_atomic.load(std::memory_order_acquire) ==
RuntimeFilterState::READY));
- DCHECK(is_consumer());
- std::lock_guard guard(_inner_mutex);
-
- if (_push_down_vexprs.empty()) {
- RETURN_IF_ERROR(_wrapper->get_push_exprs(&_push_down_vexprs,
_vprobe_ctx));
+ if (!is_late_arrival) {
+ _set_push_down();
}
- vexprs->insert(vexprs->end(), _push_down_vexprs.begin(),
_push_down_vexprs.end());
- return Status::OK();
+ return _wrapper->get_push_exprs(push_exprs, _vprobe_ctx);
}
bool IRuntimeFilter::await() {
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index a4fd241a28..fb5e43d177 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -221,10 +221,7 @@ public:
RuntimeFilterType type() const { return _runtime_filter_type; }
- Status get_push_expr_ctxs(std::vector<vectorized::VExprSPtr>* push_exprs);
-
- Status get_prepared_exprs(std::vector<doris::vectorized::VExprSPtr>*
push_exprs,
- const RowDescriptor& desc, RuntimeState* state);
+ Status get_push_expr_ctxs(std::vector<vectorized::VExprSPtr>* push_exprs,
bool is_late_arrival);
bool is_broadcast_join() const { return _is_broadcast_join; }
@@ -385,8 +382,6 @@ protected:
bool _is_ignored;
std::string _ignored_msg;
- std::vector<doris::vectorized::VExprSPtr> _push_down_vexprs;
-
struct RPCContext;
std::shared_ptr<RPCContext> _rpc_context;
diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp
b/be/src/vec/exec/runtime_filter_consumer.cpp
index b05ebf0476..2af841749b 100644
--- a/be/src/vec/exec/runtime_filter_consumer.cpp
+++ b/be/src/vec/exec/runtime_filter_consumer.cpp
@@ -95,7 +95,7 @@ Status RuntimeFilterConsumer::_acquire_runtime_filter() {
ready = runtime_filter->await();
}
if (ready && !_runtime_filter_ctxs[i].apply_mark) {
- RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&vexprs));
+ RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&vexprs,
false));
_runtime_filter_ctxs[i].apply_mark = true;
} else if (runtime_filter->current_state() ==
RuntimeFilterState::NOT_READY &&
!_runtime_filter_ctxs[i].apply_mark) {
@@ -151,8 +151,8 @@ Status
RuntimeFilterConsumer::try_append_late_arrival_runtime_filter(int* arrive
++current_arrived_rf_num;
continue;
} else if (_runtime_filter_ctxs[i].runtime_filter->is_ready()) {
-
RETURN_IF_ERROR(_runtime_filter_ctxs[i].runtime_filter->get_prepared_exprs(
- &exprs, _row_descriptor_ref, _state));
+ RETURN_IF_ERROR(
+
_runtime_filter_ctxs[i].runtime_filter->get_push_expr_ctxs(&exprs, true));
++current_arrived_rf_num;
_runtime_filter_ctxs[i].apply_mark = true;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]