This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new ffc57c9ef44 [Bug](runtime-filter) fix brpc ctrl use after free (#37223)
ffc57c9ef44 is described below
commit ffc57c9ef44a5f7f199f4db1dfa15bb35349bb08
Author: Pxl <[email protected]>
AuthorDate: Wed Jul 3 21:01:50 2024 +0800
[Bug](runtime-filter) fix brpc ctrl use after free (#37223)
part of #35186
---
be/src/runtime/runtime_filter_mgr.cpp | 25 ++++++++++++-------------
1 file changed, 12 insertions(+), 13 deletions(-)
diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index 010cb5a60e5..dfc55edc7c3 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -346,23 +346,22 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSiz
for (auto addr : cnt_val->source_addrs) {
std::shared_ptr<PBackendService_Stub> stub(
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(addr));
- AsyncRPCContext<PSyncFilterSizeRequest, PSyncFilterSizeResponse> ctx;
- auto* pquery_id = ctx.request.mutable_query_id();
+
+ auto closure = AutoReleaseClosure<PSyncFilterSizeRequest,
+ DummyBrpcCallback<PSyncFilterSizeResponse>>::
+ create_unique(std::make_shared<PSyncFilterSizeRequest>(),
+ DummyBrpcCallback<PSyncFilterSizeResponse>::create_shared());
+
+ auto* pquery_id = closure->request_->mutable_query_id();
pquery_id->set_hi(_state->query_id.hi());
pquery_id->set_lo(_state->query_id.lo());
- ctx.request.set_filter_id(filter_id);
- ctx.request.set_filter_size(cnt_val->global_size);
+ closure->request_->set_filter_id(filter_id);
+ closure->request_->set_filter_size(cnt_val->global_size);
- stub->sync_filter_size(&ctx.cntl, &ctx.request, &ctx.response, brpc::DoNothing());
- brpc::Join(ctx.cntl.call_id());
- if (auto status = Status::create(ctx.response.status()); !status) {
- return status;
- }
- if (ctx.cntl.Failed()) {
- ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(ctx.cntl.remote_side());
- return Status::InternalError(ctx.cntl.ErrorText());
- }
+ stub->sync_filter_size(closure->cntl_.get(), closure->request_.get(),
+ closure->response_.get(), brpc::DoNothing());
+ closure.release();
}
}
return Status::OK();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]