BiteTheDDDDt commented on code in PR #64851:
URL: https://github.com/apache/doris/pull/64851#discussion_r3479622828


##########
be/src/exec/runtime_filter/runtime_filter_mgr.cpp:
##########
@@ -48,6 +51,217 @@
 
 namespace doris {
 
+namespace {
+
+TNetworkAddress to_thrift_network_address(const PNetworkAddress& address) {
+    TNetworkAddress result;
+    result.hostname = address.hostname();
+    result.port = address.port();
+    return result;
+}
+
+PNetworkAddress to_proto_network_address(const TNetworkAddress& address) {
+    PNetworkAddress result;
+    result.set_hostname(address.hostname);
+    result.set_port(address.port);
+    return result;
+}
+
+bool can_use_tree_publish_targets(const 
std::vector<TRuntimeFilterTargetParamsV2>& targets) {
+    return std::all_of(targets.begin(), targets.end(), [](const auto& target) {
+        return target.__isset.target_fragment_ids && 
!target.target_fragment_ids.empty();
+    });
+}
+
+std::vector<RuntimeFilterPublishTarget> build_runtime_filter_publish_targets(
+        const std::vector<TRuntimeFilterTargetParamsV2>& targets) {
+    std::vector<RuntimeFilterPublishTarget> publish_targets;
+    publish_targets.reserve(targets.size());
+    for (const auto& target : targets) {
+        DORIS_CHECK(target.__isset.target_fragment_ids);
+        DORIS_CHECK(!target.target_fragment_ids.empty());
+        RuntimeFilterPublishTarget publish_target;
+        publish_target.addr = 
to_proto_network_address(target.target_fragment_instance_addr);
+        publish_target.fragment_ids = target.target_fragment_ids;
+        publish_targets.emplace_back(std::move(publish_target));
+    }
+    return publish_targets;
+}
+
+class RuntimeFilterRelayRpcClosure final : public google::protobuf::Closure {
+public:
+    RuntimeFilterRelayRpcClosure(std::shared_ptr<PPublishFilterRequestV2> 
request,
+                                 std::weak_ptr<QueryContext> query_ctx)
+            : _request(std::move(request)),
+              
_callback(HandleErrorBrpcCallback<PPublishFilterResponse>::create_shared(
+                      std::move(query_ctx))) {}
+
+    void Run() override {
+        std::unique_ptr<RuntimeFilterRelayRpcClosure> self(this);
+        _callback->call();
+    }
+
+    brpc::Controller* cntl() { return _callback->cntl_.get(); }
+    PPublishFilterRequestV2* request() { return _request.get(); }
+    PPublishFilterResponse* response() { return _callback->response_.get(); }
+
+private:
+    std::shared_ptr<PPublishFilterRequestV2> _request;
+    std::shared_ptr<HandleErrorBrpcCallback<PPublishFilterResponse>> _callback;
+};
+
+Status send_runtime_filter_relay_rpc(const RuntimeFilterPublishTask& task,
+                                     const butil::IOBuf& request_attachment, 
int timeout_ms,
+                                     std::weak_ptr<QueryContext> query_ctx) {
+    TNetworkAddress address = to_thrift_network_address(task.receiver.addr);
+    std::shared_ptr<PBackendService_Stub> stub(
+            
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(address));
+    if (stub == nullptr) {
+        LOG(WARNING) << "Failed to init runtime filter relay rpc to " << 
address.hostname << ":"
+                     << address.port;
+        return Status::InternalError("Failed to init runtime filter relay rpc 
to {}:{}",
+                                     address.hostname, address.port);
+    }
+
+    auto* closure = new RuntimeFilterRelayRpcClosure(

Review Comment:
   你这里new出来什么时候delete呢?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to