This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9d839489872 [feature](pipelineX) Use dependency instead of block queue
in the runtime filter (#26078)
9d839489872 is described below
commit 9d8394898722953284ffe69e6b575bea09940815
Author: Mryange <[email protected]>
AuthorDate: Tue Oct 31 22:44:18 2023 +0800
[feature](pipelineX) Use dependency instead of block queue in the runtime
filter (#26078)
---
be/src/exprs/runtime_filter.cpp | 11 +-
be/src/exprs/runtime_filter.h | 17 ++++
.../exec/multi_cast_data_stream_source.cpp | 3 +-
be/src/pipeline/exec/scan_operator.cpp | 3 +-
be/src/pipeline/pipeline_x/dependency.cpp | 113 +++++++++++++++++++++
be/src/pipeline/pipeline_x/dependency.h | 61 +++++++++--
be/src/pipeline/pipeline_x/operator.cpp | 2 +-
be/src/pipeline/pipeline_x/operator.h | 4 +-
be/src/pipeline/pipeline_x/pipeline_x_task.h | 2 +-
be/src/vec/exec/runtime_filter_consumer.cpp | 21 +++-
be/src/vec/exec/runtime_filter_consumer.h | 5 +-
11 files changed, 216 insertions(+), 26 deletions(-)
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index c6e64fd0e55..10c2856112b 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -42,6 +42,7 @@
#include "exprs/hybrid_set.h"
#include "exprs/minmax_predicate.h"
#include "gutil/strings/substitute.h"
+#include "pipeline/pipeline_x/dependency.h"
#include "runtime/define_primitive_type.h"
#include "runtime/large_int_value.h"
#include "runtime/primitive_type.h"
@@ -62,7 +63,6 @@
#include "vec/exprs/vliteral.h"
#include "vec/exprs/vruntimefilter_wrapper.h"
#include "vec/runtime/shared_hash_table_controller.h"
-
namespace doris {
// PrimitiveType-> PColumnType
@@ -1235,6 +1235,11 @@ void IRuntimeFilter::signal() {
DCHECK(is_consumer());
if (_enable_pipeline_exec) {
_rf_state_atomic.store(RuntimeFilterState::READY);
+ if (!_filter_timer.empty()) {
+ for (auto& timer : _filter_timer) {
+ timer->call_ready();
+ }
+ }
} else {
std::unique_lock lock(_inner_mutex);
_rf_state = RuntimeFilterState::READY;
@@ -1255,6 +1260,10 @@ void IRuntimeFilter::signal() {
}
}
+void IRuntimeFilter::set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer> timer) {
+ _filter_timer.push_back(timer);
+}
+
BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const {
return _wrapper->get_bloomfilter();
}
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index fdd3b02ad63..f13877b869c 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -69,6 +69,10 @@ class VExprContext;
struct SharedRuntimeFilterContext;
} // namespace vectorized
+namespace pipeline {
+class RuntimeFilterTimer;
+} // namespace pipeline
+
enum class RuntimeFilterType {
UNKNOWN_FILTER = -1,
IN_FILTER = 0,
@@ -384,6 +388,17 @@ public:
}
}
+ int32_t wait_time_ms() {
+ auto runtime_filter_wait_time_ms = _state == nullptr
+ ? _query_ctx->runtime_filter_wait_time_ms()
+ : _state->runtime_filter_wait_time_ms();
+ return runtime_filter_wait_time_ms;
+ }
+
+ int64_t registration_time() const { return registration_time_; }
+
+ void set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer>);
+
protected:
// serialize _wrapper to protobuf
void to_protobuf(PInFilter* filter);
@@ -475,6 +490,8 @@ protected:
// only effect on consumer
std::unique_ptr<RuntimeProfile> _profile;
bool _opt_remote_rf;
+
+ std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> _filter_timer;
};
// avoid expose RuntimePredicateWrapper
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index 61038baa328..4c6e21c5c6d 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -145,8 +145,7 @@ Status
MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState
}
// init profile for runtime filter
RuntimeFilterConsumer::_init_profile(profile());
- _filter_dependency->set_filter_blocked_by_fn(
- [this]() { return this->runtime_filters_are_ready_or_timeout(); });
+ init_runtime_filter_dependency(_filter_dependency.get());
return Status::OK();
}
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index 075090916a8..601bf5b8b9f 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -128,8 +128,6 @@ Status ScanLocalState<Derived>::init(RuntimeState* state,
LocalStateInfo& info)
_source_dependency->add_child(_open_dependency);
_eos_dependency =
EosDependency::create_shared(PipelineXLocalState<>::_parent->operator_id());
_source_dependency->add_child(_eos_dependency);
- _filter_dependency->set_filter_blocked_by_fn(
- [this]() { return this->runtime_filters_are_ready_or_timeout(); });
auto& p = _parent->cast<typename Derived::Parent>();
set_scan_ranges(state, info.scan_ranges);
_common_expr_ctxs_push_down.resize(p._common_expr_ctxs_push_down.size());
@@ -143,6 +141,7 @@ Status ScanLocalState<Derived>::init(RuntimeState* state,
LocalStateInfo& info)
}
// init profile for runtime filter
RuntimeFilterConsumer::_init_profile(profile());
+ init_runtime_filter_dependency(_filter_dependency.get());
// 1: running at not pipeline mode will init profile.
// 2: the scan node should create scanner at pipeline mode will init
profile.
diff --git a/be/src/pipeline/pipeline_x/dependency.cpp
b/be/src/pipeline/pipeline_x/dependency.cpp
index d56679f32a2..32bd06f5983 100644
--- a/be/src/pipeline/pipeline_x/dependency.cpp
+++ b/be/src/pipeline/pipeline_x/dependency.cpp
@@ -17,6 +17,10 @@
#include "dependency.h"
+#include <memory>
+#include <mutex>
+
+#include "common/logging.h"
#include "runtime/memory/mem_tracker.h"
namespace doris::pipeline {
@@ -326,4 +330,113 @@ Status
HashJoinDependency::extract_join_column(vectorized::Block& block,
return Status::OK();
}
+bool RuntimeFilterTimer::has_ready() {
+ std::unique_lock<std::mutex> lc(_lock);
+ return _runtime_filter->is_ready();
+}
+
+void RuntimeFilterTimer::call_timeout() {
+ std::unique_lock<std::mutex> lc(_lock);
+ if (_call_ready) {
+ return;
+ }
+ _call_timeout = true;
+ if (_parent) {
+ _parent->sub_filters();
+ }
+}
+
+void RuntimeFilterTimer::call_ready() {
+ std::unique_lock<std::mutex> lc(_lock);
+ if (_call_timeout) {
+ return;
+ }
+ _call_ready = true;
+ if (_parent) {
+ _parent->sub_filters();
+ }
+}
+
+void RuntimeFilterTimer::call_has_ready() {
+ std::unique_lock<std::mutex> lc(_lock);
+ DCHECK(!_call_timeout);
+ if (!_call_ready) {
+ _parent->sub_filters();
+ }
+}
+
+void RuntimeFilterTimer::call_has_release() {
+ // When the use count is equal to 1, only the timer queue still holds ownership,
+ // so there is no need to take any action.
+}
+
+struct RuntimeFilterTimerQueue {
+ constexpr static int64_t interval = 50;
+ void start() {
+ while (true) {
+ std::unique_lock<std::mutex> lk(cv_m);
+
+ cv.wait(lk, [this] { return !_que.empty(); });
+ {
+ std::unique_lock<std::mutex> lc(_que_lock);
+ std::list<std::shared_ptr<RuntimeFilterTimer>> new_que;
+ for (auto& it : _que) {
+ if (it.use_count() == 1) {
+ it->call_has_release();
+ } else if (it->has_ready()) {
+ it->call_has_ready();
+ } else {
+ int64_t ms_since_registration = MonotonicMillis() - it->registration_time();
+ if (ms_since_registration > it->wait_time_ms()) {
+ it->call_timeout();
+ } else {
+ new_que.push_back(std::move(it));
+ }
+ }
+ }
+ new_que.swap(_que);
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(interval));
+ }
+ }
+ ~RuntimeFilterTimerQueue() { _thread.detach(); }
+ RuntimeFilterTimerQueue() { _thread = std::thread(&RuntimeFilterTimerQueue::start, this); }
+ static void push_filter_timer(std::shared_ptr<RuntimeFilterTimer> filter) {
+ static RuntimeFilterTimerQueue timer_que;
+
+ timer_que.push(filter);
+ }
+
+ void push(std::shared_ptr<RuntimeFilterTimer> filter) {
+ std::unique_lock<std::mutex> lc(_que_lock);
+ _que.push_back(filter);
+ cv.notify_all();
+ }
+
+ std::thread _thread;
+ std::condition_variable cv;
+ std::mutex cv_m;
+ std::mutex _que_lock;
+
+ std::list<std::shared_ptr<RuntimeFilterTimer>> _que;
+};
+
+void RuntimeFilterDependency::add_filters(IRuntimeFilter* runtime_filter) {
+ _filters++;
+ int64_t registration_time = runtime_filter->registration_time();
+ int32 wait_time_ms = runtime_filter->wait_time_ms();
+ auto filter_timer = std::make_shared<RuntimeFilterTimer>(
+ registration_time, wait_time_ms,
+ std::dynamic_pointer_cast<RuntimeFilterDependency>(shared_from_this()), runtime_filter);
+ runtime_filter->set_filter_timer(filter_timer);
+ RuntimeFilterTimerQueue::push_filter_timer(filter_timer);
+}
+
+void RuntimeFilterDependency::sub_filters() {
+ _filters--;
+ if (_filters == 0) {
+ *_blocked_by_rf = false;
+ }
+}
+
} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/dependency.h
b/be/src/pipeline/pipeline_x/dependency.h
index 131198c4496..d5349307e5b 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -19,11 +19,16 @@
#include <sqltypes.h>
+#include <atomic>
#include <functional>
#include <memory>
#include <mutex>
+#include <thread>
+#include <utility>
+#include "common/logging.h"
#include "concurrentqueue.h"
+#include "gutil/integral_types.h"
#include "pipeline/exec/data_queue.h"
#include "pipeline/exec/multi_cast_data_streamer.h"
#include "vec/common/hash_table/hash_map_context_creator.h"
@@ -194,30 +199,64 @@ protected:
const int _node_id;
};
-class FilterDependency final : public Dependency {
+class RuntimeFilterDependency;
+class RuntimeFilterTimer {
public:
- FilterDependency(int id, int node_id, std::string name)
- : Dependency(id, name),
- _runtime_filters_are_ready_or_timeout(nullptr),
- _node_id(node_id) {}
+ RuntimeFilterTimer(int64_t registration_time, int32_t wait_time_ms,
+ std::shared_ptr<RuntimeFilterDependency> parent,
+ IRuntimeFilter* runtime_filter)
+ : _parent(std::move(parent)),
+ _registration_time(registration_time),
+ _wait_time_ms(wait_time_ms),
+ _runtime_filter(runtime_filter) {}
- FilterDependency* filter_blocked_by() {
- if (!_runtime_filters_are_ready_or_timeout) {
+ void call_ready();
+
+ void call_timeout();
+
+ void call_has_ready();
+
+ void call_has_release();
+
+ bool has_ready();
+
+ int64_t registration_time() const { return _registration_time; }
+ int32_t wait_time_ms() const { return _wait_time_ms; }
+
+private:
+ bool _call_ready {};
+ bool _call_timeout {};
+ std::shared_ptr<RuntimeFilterDependency> _parent;
+ std::mutex _lock;
+ const int64_t _registration_time;
+ const int32_t _wait_time_ms;
+ IRuntimeFilter* _runtime_filter;
+};
+class RuntimeFilterDependency final : public Dependency {
+public:
+ RuntimeFilterDependency(int id, int node_id, std::string name)
+ : Dependency(id, name), _node_id(node_id) {}
+
+ RuntimeFilterDependency* filter_blocked_by() {
+ if (!_blocked_by_rf) {
return nullptr;
}
- if (!_runtime_filters_are_ready_or_timeout()) {
+ if (*_blocked_by_rf) {
return this;
}
return nullptr;
}
void* shared_state() override { return nullptr; }
- void set_filter_blocked_by_fn(std::function<bool()> call_fn) {
- _runtime_filters_are_ready_or_timeout = call_fn;
+ void add_filters(IRuntimeFilter* runtime_filter);
+ void sub_filters();
+ void set_blocked_by_rf(std::shared_ptr<std::atomic_bool> blocked_by_rf) {
+ _blocked_by_rf = blocked_by_rf;
}
protected:
- std::function<bool()> _runtime_filters_are_ready_or_timeout;
const int _node_id;
+ std::atomic_int _filters;
+ std::shared_ptr<std::atomic_bool> _blocked_by_rf;
};
class AndDependency final : public WriteDependency {
diff --git a/be/src/pipeline/pipeline_x/operator.cpp
b/be/src/pipeline/pipeline_x/operator.cpp
index 88b21b754fe..c1dab07390d 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -315,7 +315,7 @@
PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXB
_state(state),
_finish_dependency(new FinishDependency(parent->operator_id(),
parent->node_id(),
parent->get_name() +
"_FINISH_DEPENDENCY")) {
- _filter_dependency = std::make_unique<FilterDependency>(
+ _filter_dependency = std::make_shared<RuntimeFilterDependency>(
parent->operator_id(), parent->node_id(), parent->get_name() +
"_FILTER_DEPENDENCY");
}
diff --git a/be/src/pipeline/pipeline_x/operator.h
b/be/src/pipeline/pipeline_x/operator.h
index d2f170f9f28..5691d989e47 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -100,7 +100,7 @@ public:
virtual Dependency* dependency() { return nullptr; }
FinishDependency* finishdependency() { return _finish_dependency.get(); }
- FilterDependency* filterdependency() { return _filter_dependency.get(); }
+ RuntimeFilterDependency* filterdependency() { return _filter_dependency.get(); }
protected:
friend class OperatorXBase;
@@ -134,7 +134,7 @@ protected:
bool _closed = false;
vectorized::Block _origin_block;
std::shared_ptr<FinishDependency> _finish_dependency;
- std::unique_ptr<FilterDependency> _filter_dependency;
+ std::shared_ptr<RuntimeFilterDependency> _filter_dependency;
};
class OperatorXBase : public OperatorBase {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index 3a33431192e..90fdda921f0 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -172,7 +172,7 @@ private:
std::vector<Dependency*> _read_dependencies;
WriteDependency* _write_dependencies;
std::vector<FinishDependency*> _finish_dependencies;
- FilterDependency* _filter_dependency;
+ RuntimeFilterDependency* _filter_dependency;
DependencyMap _upstream_dependency;
diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp
b/be/src/vec/exec/runtime_filter_consumer.cpp
index c3bc8c0e22c..9eda2788f06 100644
--- a/be/src/vec/exec/runtime_filter_consumer.cpp
+++ b/be/src/vec/exec/runtime_filter_consumer.cpp
@@ -26,7 +26,9 @@ RuntimeFilterConsumer::RuntimeFilterConsumer(const int32_t
filter_id,
: _filter_id(filter_id),
_runtime_filter_descs(runtime_filters),
_row_descriptor_ref(row_descriptor),
- _conjuncts_ref(conjuncts) {}
+ _conjuncts_ref(conjuncts) {
+ _blocked_by_rf = std::make_shared<std::atomic_bool>(false);
+}
Status RuntimeFilterConsumer::init(RuntimeState* state) {
_state = state;
@@ -72,7 +74,7 @@ Status RuntimeFilterConsumer::_register_runtime_filter() {
}
bool RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout() {
- if (!_blocked_by_rf) {
+ if (!*_blocked_by_rf) {
return true;
}
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
@@ -81,10 +83,19 @@ bool
RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout() {
return false;
}
}
- _blocked_by_rf = false;
+ *_blocked_by_rf = false;
return true;
}
+void RuntimeFilterConsumer::init_runtime_filter_dependency(
+ doris::pipeline::RuntimeFilterDependency* _runtime_filter_dependency) {
+ _runtime_filter_dependency->set_blocked_by_rf(_blocked_by_rf);
+ for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
+ IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
+ _runtime_filter_dependency->add_filters(runtime_filter);
+ }
+}
+
Status RuntimeFilterConsumer::_acquire_runtime_filter() {
SCOPED_TIMER(_acquire_runtime_filter_timer);
VExprSPtrs vexprs;
@@ -99,14 +110,14 @@ Status RuntimeFilterConsumer::_acquire_runtime_filter() {
_runtime_filter_ctxs[i].apply_mark = true;
} else if (runtime_filter->current_state() ==
RuntimeFilterState::NOT_READY &&
!_runtime_filter_ctxs[i].apply_mark) {
- _blocked_by_rf = true;
+ *_blocked_by_rf = true;
} else if (!_runtime_filter_ctxs[i].apply_mark) {
DCHECK(runtime_filter->current_state() !=
RuntimeFilterState::NOT_READY);
_is_all_rf_applied = false;
}
}
RETURN_IF_ERROR(_append_rf_into_conjuncts(vexprs));
- if (_blocked_by_rf) {
+ if (*_blocked_by_rf) {
return Status::WaitForRf("Runtime filters are neither not ready nor
timeout");
}
diff --git a/be/src/vec/exec/runtime_filter_consumer.h
b/be/src/vec/exec/runtime_filter_consumer.h
index ed7a097901e..fcb6ed3c839 100644
--- a/be/src/vec/exec/runtime_filter_consumer.h
+++ b/be/src/vec/exec/runtime_filter_consumer.h
@@ -19,6 +19,7 @@
#include "exec/exec_node.h"
#include "exprs/runtime_filter.h"
+#include "pipeline/pipeline_x/dependency.h"
namespace doris::vectorized {
@@ -37,6 +38,8 @@ public:
bool runtime_filters_are_ready_or_timeout();
+ void init_runtime_filter_dependency(doris::pipeline::RuntimeFilterDependency*);
+
protected:
// Register and get all runtime filters at Init phase.
Status _register_runtime_filter();
@@ -77,7 +80,7 @@ private:
// True means all runtime filters are applied to scanners
bool _is_all_rf_applied = true;
- bool _blocked_by_rf = false;
+ std::shared_ptr<std::atomic_bool> _blocked_by_rf;
RuntimeProfile::Counter* _acquire_runtime_filter_timer = nullptr;
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]