This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new db2721915e1 [Bug](runtime-filter) release dependency when rf rpc
failed or meet error status (#36297)
db2721915e1 is described below
commit db2721915e1eb8b94a65fec11e6728b4b7ab08aa
Author: Pxl <[email protected]>
AuthorDate: Fri Jun 14 23:44:08 2024 +0800
[Bug](runtime-filter) release dependency when rf rpc failed or meet error
status (#36297)
pick from #36126
---
be/src/exprs/runtime_filter.cpp | 30 ++++++++++++++++++++++++++----
be/src/util/ref_count_closure.h | 32 ++++++++++++++++++++------------
2 files changed, 46 insertions(+), 16 deletions(-)
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 3e07943c45e..39eb814bbea 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -31,6 +31,7 @@
#include <memory>
#include <mutex>
#include <ostream>
+#include <utility>
#include "agent/be_exec_version_manager.h"
#include "common/logging.h"
@@ -1029,6 +1030,30 @@ Status IRuntimeFilter::publish(bool publish_local) {
return Status::OK();
}
+class SyncSizeClosure : public AutoReleaseClosure<PSendFilterSizeRequest,
+ DummyBrpcCallback<PSendFilterSizeResponse>> {
+ std::shared_ptr<pipeline::Dependency> _dependency;
+ using Base =
+ AutoReleaseClosure<PSendFilterSizeRequest, DummyBrpcCallback<PSendFilterSizeResponse>>;
+ ENABLE_FACTORY_CREATOR(SyncSizeClosure);
+
+ void _process_if_rpc_failed() override {
+ ((pipeline::CountedFinishDependency*)_dependency.get())->sub();
+ Base::_process_if_rpc_failed();
+ }
+
+ void _process_if_meet_error_status(const Status& status) override {
+ ((pipeline::CountedFinishDependency*)_dependency.get())->sub();
+ Base::_process_if_meet_error_status(status);
+ }
+
+public:
+ SyncSizeClosure(std::shared_ptr<PSendFilterSizeRequest> req,
+ std::shared_ptr<DummyBrpcCallback<PSendFilterSizeResponse>> callback,
+ std::shared_ptr<pipeline::Dependency> dependency)
+ : Base(req, callback), _dependency(std::move(dependency)) {}
+};
+
Status IRuntimeFilter::send_filter_size(uint64_t local_filter_size) {
DCHECK(is_producer());
@@ -1069,10 +1094,7 @@ Status IRuntimeFilter::send_filter_size(uint64_t local_filter_size) {
auto request = std::make_shared<PSendFilterSizeRequest>();
auto callback = DummyBrpcCallback<PSendFilterSizeResponse>::create_shared();
- auto closure =
- AutoReleaseClosure<PSendFilterSizeRequest,
- DummyBrpcCallback<PSendFilterSizeResponse>>::create_unique(request,
- callback);
+ auto closure = SyncSizeClosure::create_unique(request, callback, _dependency);
auto* pquery_id = request->mutable_query_id();
pquery_id->set_hi(_state->query_id.hi());
pquery_id->set_lo(_state->query_id.lo());
diff --git a/be/src/util/ref_count_closure.h b/be/src/util/ref_count_closure.h
index 7bbcfb7da39..01e523d9b9a 100644
--- a/be/src/util/ref_count_closure.h
+++ b/be/src/util/ref_count_closure.h
@@ -71,16 +71,6 @@ public:
template <typename T>
concept HasStatus = requires(T* response) { response->status(); };
-template <typename Response>
-void process_status(Response* response) {}
-
-template <HasStatus Response>
-void process_status(Response* response) {
- if (auto status = Status::create(response->status()); !status) {
- LOG(WARNING) << "RPC meet error status: " << status;
- }
-}
-
template <typename Request, typename Callback>
class AutoReleaseClosure : public google::protobuf::Closure {
using Weak = typename std::shared_ptr<Callback>::weak_type;
@@ -105,9 +95,9 @@ public:
tmp->call();
}
if (cntl_->Failed()) {
- LOG(WARNING) << "RPC meet failed: " << cntl_->ErrorText();
+ _process_if_rpc_failed();
} else {
- process_status<ResponseType>(response_.get());
+ _process_status<ResponseType>(response_.get());
}
}
@@ -120,7 +110,25 @@ public:
std::shared_ptr<Request> request_;
std::shared_ptr<ResponseType> response_;
+protected:
+ virtual void _process_if_rpc_failed() {
+ LOG(WARNING) << "RPC meet failed: " << cntl_->ErrorText();
+ }
+
+ virtual void _process_if_meet_error_status(const Status& status) {
+ LOG(WARNING) << "RPC meet error status: " << status;
+ }
+
private:
+ template <typename Response>
+ void _process_status(Response* response) {}
+
+ template <HasStatus Response>
+ void _process_status(Response* response) {
+ if (auto status = Status::create(response->status()); !status) {
+ _process_if_meet_error_status(status);
+ }
+ }
// Use a weak ptr to keep the callback, so that the callback can be deleted if the main
// thread is freed.
Weak callback_;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]