yiguolei commented on code in PR #33332:
URL: https://github.com/apache/doris/pull/33332#discussion_r1555106446


##########
be/src/vec/exec/runtime_filter_consumer.cpp:
##########
@@ -75,36 +77,71 @@ bool 
RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout() {
 }
 
 void RuntimeFilterConsumer::init_runtime_filter_dependency(
-        doris::pipeline::RuntimeFilterDependency* _runtime_filter_dependency) {
-    _runtime_filter_dependency->set_blocked_by_rf(_blocked_by_rf);
+        std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>&
+                runtime_filter_dependencies,
+        const int id, const int node_id, const std::string& name) {
+    runtime_filter_dependencies.resize(_runtime_filter_descs.size());
     for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
         IRuntimeFilter* runtime_filter = 
_runtime_filter_ctxs[i].runtime_filter;
-        _runtime_filter_dependency->add_filters(runtime_filter);
+        runtime_filter_dependencies[i] = 
std::make_shared<pipeline::RuntimeFilterDependency>(
+                id, node_id, name, _state->get_query_ctx(), runtime_filter);
+        _runtime_filter_ctxs[i].runtime_filter_dependency = 
runtime_filter_dependencies[i].get();
+        auto filter_timer = std::make_shared<pipeline::RuntimeFilterTimer>(
+                runtime_filter->registration_time(), 
runtime_filter->wait_time_ms(),
+                runtime_filter_dependencies[i]);
+        runtime_filter->set_filter_timer(filter_timer);
+        
ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
     }
 }
 
-Status RuntimeFilterConsumer::_acquire_runtime_filter() {
+Status RuntimeFilterConsumer::_acquire_runtime_filter(bool pipeline_x) {
     SCOPED_TIMER(_acquire_runtime_filter_timer);
     std::vector<vectorized::VRuntimeFilterPtr> vexprs;
+    Status rf_status = Status::OK();
     for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
         IRuntimeFilter* runtime_filter = 
_runtime_filter_ctxs[i].runtime_filter;
-        bool ready = runtime_filter->is_ready();
-        if (!ready) {
-            ready = runtime_filter->await();
-        }
-        if (ready && !_runtime_filter_ctxs[i].apply_mark) {
-            RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(_probe_ctxs, 
vexprs, false));
-            _runtime_filter_ctxs[i].apply_mark = true;
-        } else if (runtime_filter->current_state() == 
RuntimeFilterState::NOT_READY &&
-                   !_runtime_filter_ctxs[i].apply_mark) {
-            *_blocked_by_rf = true;
-        } else if (!_runtime_filter_ctxs[i].apply_mark) {
-            DCHECK(runtime_filter->current_state() != 
RuntimeFilterState::NOT_READY);
-            _is_all_rf_applied = false;
+        if (pipeline_x) {
+            DCHECK(_runtime_filter_ctxs[i].runtime_filter_dependency)
+                    << _state->pipeline_x_task()->debug_string();
+            auto* rf_dep = 
_runtime_filter_ctxs[i].runtime_filter_dependency->is_blocked_by(
+                    _state->pipeline_x_task());
+
+            bool timeout = 
_runtime_filter_ctxs[i].runtime_filter_dependency->timeout();

Review Comment:
   我们的dependency 应该只有ready 和 block 两个状态,尽量不要引入timeout这个状态了。



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to