yiguolei commented on code in PR #33332:
URL: https://github.com/apache/doris/pull/33332#discussion_r1555105895


##########
be/src/pipeline/pipeline_x/dependency.h:
##########
@@ -209,75 +209,36 @@ struct FinishDependency final : public Dependency {
 };
 
 class RuntimeFilterDependency;
+struct RuntimeFilterTimerQueue;
 class RuntimeFilterTimer {
 public:
-    RuntimeFilterTimer(int filter_id, int64_t registration_time, int32_t wait_time_ms,
+    RuntimeFilterTimer(int64_t registration_time, int32_t wait_time_ms,
                        std::shared_ptr<RuntimeFilterDependency> parent)
-            : _filter_id(filter_id),
-              _parent(std::move(parent)),
+            : _parent(std::move(parent)),
               _registration_time(registration_time),
               _wait_time_ms(wait_time_ms) {}
 
+    // Called by runtime filter producer.
     void call_ready();
 
+    // Called by RuntimeFilterTimerQueue which is responsible for checking if this rf is timeout.
     void call_timeout();
 
-    void call_has_ready();
-
-    // When the use count is equal to 1, only the timer queue still holds ownership,
-    // so there is no need to take any action.
-    void call_has_release() {};
-
-    bool has_ready();
-
     int64_t registration_time() const { return _registration_time; }
     int32_t wait_time_ms() const { return _wait_time_ms; }
 
 private:
-    int _filter_id = -1;
-    bool _call_ready {};
-    bool _call_timeout {};
-    std::shared_ptr<RuntimeFilterDependency> _parent;
+    friend struct RuntimeFilterTimerQueue;
+    std::shared_ptr<RuntimeFilterDependency> _parent = nullptr;
     std::mutex _lock;
     const int64_t _registration_time;
     const int32_t _wait_time_ms;
-    bool _is_ready = false;
 };
 
 struct RuntimeFilterTimerQueue {
     constexpr static int64_t interval = 10;
     void run() { _thread.detach(); }
-    void start() {
-        while (!_stop) {
-            std::unique_lock<std::mutex> lk(cv_m);
-
-            cv.wait(lk, [this] { return !_que.empty() || _stop; });
-            if (_stop) {
-                break;
-            }
-            {
-                std::unique_lock<std::mutex> lc(_que_lock);
-                std::list<std::shared_ptr<pipeline::RuntimeFilterTimer>> new_que;
-                for (auto& it : _que) {
-                    if (it.use_count() == 1) {
-                        it->call_has_release();
-                    } else if (it->has_ready()) {
-                        it->call_has_ready();
-                    } else {
-                        int64_t ms_since_registration = MonotonicMillis() - it->registration_time();
-                        if (ms_since_registration > it->wait_time_ms()) {
-                            it->call_timeout();
-                        } else {
-                            new_que.push_back(std::move(it));
-                        }
-                    }
-                }
-                new_que.swap(_que);
-            }
-            std::this_thread::sleep_for(std::chrono::milliseconds(interval));
-        }
-        _shutdown = true;
-    }

Review Comment:
   把下面的 wait for shutdown 的逻辑移动到 stop 里面。stop 应该是同步的,否则容易挂。不要在析构函数里等待,那个时候可能就晚了。



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to