This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 39669c6df2 [feature](pipelineX) add runtimefliter in pipelineX multicast sink (#25120)
39669c6df2 is described below
commit 39669c6df298df57c3924dace5263c39defc29ff
Author: Mryange <[email protected]>
AuthorDate: Tue Oct 10 10:41:08 2023 +0800
[feature](pipelineX) add runtimefliter in pipelineX multicast sink (#25120)
---
.../exec/multi_cast_data_stream_source.cpp | 10 +++++++++
.../pipeline/exec/multi_cast_data_stream_source.h | 26 ++++++++++++++++++----
2 files changed, 32 insertions(+), 4 deletions(-)
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index c70d87f59e..97f7d9e573 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -125,8 +125,16 @@ RuntimeProfile* MultiCastDataStreamerSourceOperator::get_runtime_profile() const
return _multi_cast_data_streamer->profile();
}
+MultiCastDataStreamSourceLocalState::MultiCastDataStreamSourceLocalState(RuntimeState*
state,
+
OperatorXBase* parent)
+ : Base(state, parent),
+ vectorized::RuntimeFilterConsumer(
+ static_cast<Parent*>(parent)->dest_id_from_sink(),
parent->runtime_filter_descs(),
+ static_cast<Parent*>(parent)->_row_desc(), _conjuncts) {};
+
Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
+ RETURN_IF_ERROR(RuntimeFilterConsumer::init(state));
SCOPED_TIMER(profile()->total_time_counter());
SCOPED_TIMER(_open_timer);
auto& p = _parent->cast<Parent>();
@@ -134,6 +142,8 @@ Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState
for (size_t i = 0; i < p._output_expr_contexts.size(); i++) {
RETURN_IF_ERROR(p._output_expr_contexts[i]->clone(state,
_output_expr_contexts[i]));
}
+ // init profile for runtime filter
+ RuntimeFilterConsumer::_init_profile(profile());
return Status::OK();
}
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index 6377b5ef16..943c62d077 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -94,15 +94,21 @@ private:
class MultiCastDataStreamerSourceOperatorX;
-class MultiCastDataStreamSourceLocalState final : public
PipelineXLocalState<MultiCastDependency> {
+class MultiCastDataStreamSourceLocalState final : public
PipelineXLocalState<MultiCastDependency>,
+ public
vectorized::RuntimeFilterConsumer {
public:
ENABLE_FACTORY_CREATOR(MultiCastDataStreamSourceLocalState);
using Base = PipelineXLocalState<MultiCastDependency>;
using Parent = MultiCastDataStreamerSourceOperatorX;
- MultiCastDataStreamSourceLocalState(RuntimeState* state, OperatorXBase*
parent)
- : Base(state, parent) {};
-
+ MultiCastDataStreamSourceLocalState(RuntimeState* state, OperatorXBase*
parent);
Status init(RuntimeState* state, LocalStateInfo& info) override;
+
+ Status open(RuntimeState* state) override {
+ RETURN_IF_ERROR(Base::open(state));
+ RETURN_IF_ERROR(_acquire_runtime_filter());
+ return Status::OK();
+ }
+
friend class MultiCastDataStreamerSourceOperatorX;
private:
@@ -163,6 +169,18 @@ public:
bool is_source() const override { return true; }
+ const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override {
+ return _t_data_stream_sink.runtime_filters;
+ }
+
+ int dest_id_from_sink() const { return _t_data_stream_sink.dest_node_id; }
+
+ bool runtime_filters_are_ready_or_timeout(RuntimeState* state) const
override {
+ return state->get_local_state(id())
+ ->template cast<MultiCastDataStreamSourceLocalState>()
+ .runtime_filters_are_ready_or_timeout();
+ }
+
private:
friend class MultiCastDataStreamSourceLocalState;
const int _consumer_id;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]