github-actions[bot] commented on code in PR #33332:
URL: https://github.com/apache/doris/pull/33332#discussion_r1555179463
##########
be/src/pipeline/pipeline_x/dependency.cpp:
##########
@@ -114,82 +100,60 @@ std::string Dependency::debug_string(int
indentation_level) {
std::string RuntimeFilterDependency::debug_string(int indentation_level) {
fmt::memory_buffer debug_string_buffer;
- fmt::format_to(debug_string_buffer,
- "{}{}: id={}, block task = {}, ready={}, _filters = {},
_blocked_by_rf = {}",
- std::string(indentation_level * 2, ' '), _name, _node_id,
_blocked_task.size(),
- _ready, _filters.load(), _blocked_by_rf ?
_blocked_by_rf->load() : false);
+ fmt::format_to(debug_string_buffer, "{}, runtime filter: {}",
+ Dependency::debug_string(indentation_level),
_runtime_filter->formatted_state());
return fmt::to_string(debug_string_buffer);
}
-bool RuntimeFilterTimer::has_ready() {
- std::unique_lock<std::mutex> lc(_lock);
- return _is_ready;
+Dependency* RuntimeFilterDependency::is_blocked_by(PipelineXTask* task) {
+ std::unique_lock<std::mutex> lc(_task_lock);
+ auto ready = _ready.load() || _is_cancelled();
+ if (!ready && task) {
+ _add_block_task(task);
+ task->_blocked_dep = this;
+ }
+ return ready ? nullptr : this;
}
void RuntimeFilterTimer::call_timeout() {
- std::unique_lock<std::mutex> lc(_lock);
- if (_call_ready) {
- return;
- }
- _call_timeout = true;
- if (_parent) {
- _parent->sub_filters(_filter_id);
- }
+ _parent->set_ready();
}
void RuntimeFilterTimer::call_ready() {
- std::unique_lock<std::mutex> lc(_lock);
- if (_call_timeout) {
- return;
- }
- _call_ready = true;
- if (_parent) {
- _parent->sub_filters(_filter_id);
- }
- _is_ready = true;
-}
-
-void RuntimeFilterTimer::call_has_ready() {
- std::unique_lock<std::mutex> lc(_lock);
- DCHECK(!_call_timeout);
- if (!_call_ready) {
- _parent->sub_filters(_filter_id);
- }
+ _parent->set_ready();
}
-void RuntimeFilterDependency::add_filters(IRuntimeFilter* runtime_filter) {
- const auto filter_id = runtime_filter->filter_id();
- ;
- _filters++;
- _filter_ready_map[filter_id] = false;
- int64_t registration_time = runtime_filter->registration_time();
- int32 wait_time_ms = runtime_filter->wait_time_ms();
- auto filter_timer = std::make_shared<RuntimeFilterTimer>(
- filter_id, registration_time, wait_time_ms,
-
std::dynamic_pointer_cast<RuntimeFilterDependency>(shared_from_this()));
- runtime_filter->set_filter_timer(filter_timer);
-
ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
-}
+void RuntimeFilterTimerQueue::start() {
Review Comment:
warning: method 'start' can be made static
[readability-convert-member-functions-to-static]
be/src/pipeline/pipeline_x/dependency.h:240:
```diff
- void start();
+ static void start();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]