This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 2098dd39 Support user fields of baidu protocol (#2406)
2098dd39 is described below

commit 2098dd3927ac389cdf6a0973b0edf19a477ff540
Author: Bright Chen <[email protected]>
AuthorDate: Wed Dec 20 11:49:31 2023 +0800

    Support user fields of baidu protocol (#2406)
    
    * Support user fields of baidu protocol
    
    * Use request_user_fields before RPC
---
 src/brpc/controller.cpp                |  4 ++++
 src/brpc/controller.h                  | 26 ++++++++++++++++++++++++
 src/brpc/policy/baidu_rpc_meta.proto   |  3 ++-
 src/brpc/policy/baidu_rpc_protocol.cpp | 29 +++++++++++++++++++++++++++
 test/brpc_server_unittest.cpp          | 36 ++++++++++++++++++++++++++++++++++
 5 files changed, 97 insertions(+), 1 deletion(-)

diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp
index 47dafb6f..f49a27a9 100644
--- a/src/brpc/controller.cpp
+++ b/src/brpc/controller.cpp
@@ -209,6 +209,8 @@ void Controller::ResetNonPods() {
     _request_buf.clear();
     delete _http_request;
     delete _http_response;
+    delete _request_user_fields;
+    delete _response_user_fields;
     _request_attachment.clear();
     _response_attachment.clear();
     if (_wpa) {
@@ -283,6 +285,8 @@ void Controller::ResetPods() {
     _idl_result = IDL_VOID_RESULT;
     _http_request = NULL;
     _http_response = NULL;
+    _request_user_fields = NULL;
+    _response_user_fields = NULL;
     _request_stream = INVALID_STREAM_ID;
     _response_stream = INVALID_STREAM_ID;
     _remote_stream_settings = NULL;
diff --git a/src/brpc/controller.h b/src/brpc/controller.h
index 708ff8c6..d3ffb99f 100644
--- a/src/brpc/controller.h
+++ b/src/brpc/controller.h
@@ -105,6 +105,8 @@ enum StopStyle {
 
 const int32_t UNSET_MAGIC_NUM = -123456789;
 
+typedef butil::FlatMap<std::string, std::string> UserFieldsMap;
+
 // A Controller mediates a single method call. The primary purpose of
 // the controller is to provide a way to manipulate settings per RPC-call 
 // and to find out about RPC-level errors.
@@ -255,6 +257,26 @@ public:
         return tmp;
     }
 
+    UserFieldsMap* request_user_fields() {
+        if (!_request_user_fields) {
+            _request_user_fields = new UserFieldsMap;
+            _request_user_fields->init(29);
+        }
+        return _request_user_fields;
+    }
+
+    bool has_request_user_fields() const { return _request_user_fields; }
+
+    UserFieldsMap* response_user_fields() {
+        if (!_response_user_fields) {
+            _response_user_fields = new UserFieldsMap;
+            _response_user_fields->init(29);
+        }
+        return _response_user_fields;
+    }
+
+    bool has_response_user_fields() const { return _response_user_fields; }
+
     // User attached data or body of http request, which is wired to network
     // directly instead of being serialized into protobuf messages.
     butil::IOBuf& request_attachment() { return _request_attachment; }
@@ -820,6 +842,10 @@ private:
     HttpHeader* _http_request;
     HttpHeader* _http_response;
 
+    // User fields of baidu_std protocol.
+    UserFieldsMap* _request_user_fields;
+    UserFieldsMap* _response_user_fields;
+
     std::unique_ptr<KVMap> _session_kv;
 
     // Fields with large size but low access frequency 
diff --git a/src/brpc/policy/baidu_rpc_meta.proto 
b/src/brpc/policy/baidu_rpc_meta.proto
index dc716540..300564bb 100644
--- a/src/brpc/policy/baidu_rpc_meta.proto
+++ b/src/brpc/policy/baidu_rpc_meta.proto
@@ -31,7 +31,8 @@ message RpcMeta {
     optional int32 attachment_size = 5;
     optional ChunkInfo chunk_info = 6;
     optional bytes authentication_data = 7;
-    optional StreamSettings stream_settings = 8;   
+    optional StreamSettings stream_settings = 8;
+    map<string, string> user_fields = 9;
 }
 
 message RpcRequestMeta {
diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp 
b/src/brpc/policy/baidu_rpc_protocol.cpp
index d8342619..b19fbb37 100644
--- a/src/brpc/policy/baidu_rpc_protocol.cpp
+++ b/src/brpc/policy/baidu_rpc_protocol.cpp
@@ -223,6 +223,15 @@ void SendRpcResponse(int64_t correlation_id,
         }
     }
 
+    if (cntl->has_response_user_fields() &&
+        !cntl->response_user_fields()->empty()) {
+        ::google::protobuf::Map<std::string, std::string>& user_fields
+            = *meta.mutable_user_fields();
+        user_fields.insert(cntl->response_user_fields()->begin(),
+                           cntl->response_user_fields()->end());
+
+    }
+
     butil::IOBuf res_buf;
     SerializeRpcHeaderAndMeta(&res_buf, meta, res_size + attached_size);
     if (append_body) {
@@ -380,6 +389,12 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
         accessor.set_remote_stream_settings(meta.release_stream_settings());
     }
 
+    if (!meta.user_fields().empty()) {
+        for (const auto& it : meta.user_fields()) {
+            (*cntl->request_user_fields())[it.first] = it.second;
+        }
+    }
+
     // Tag the bthread with this server's key for thread_local_data().
     if (server->thread_local_options().thread_local_data_factory) {
         bthread_assign_data((void*)&server->thread_local_options());
@@ -595,6 +610,13 @@ void ProcessRpcResponse(InputMessageBase* msg_base) {
         accessor.set_remote_stream_settings(
                 new StreamSettings(meta.stream_settings()));
     }
+
+    if (!meta.user_fields().empty()) {
+        for (const auto& it : meta.user_fields()) {
+            (*cntl->response_user_fields())[it.first] = it.second;
+        }
+    }
+
     Span* span = accessor.span();
     if (span) {
         span->set_base_real_us(msg->base_real_us());
@@ -694,6 +716,13 @@ void PackRpcRequest(butil::IOBuf* req_buf,
         s->FillSettings(meta.mutable_stream_settings());
     }
 
+    if (cntl->has_request_user_fields() && 
!cntl->request_user_fields()->empty()) {
+        ::google::protobuf::Map<std::string, std::string>& user_fields
+            = *meta.mutable_user_fields();
+        user_fields.insert(cntl->request_user_fields()->begin(),
+                           cntl->request_user_fields()->end());
+    }
+
     // Don't use res->ByteSize() since it may be compressed
     const size_t req_size = request_body.length(); 
     const size_t attached_size = cntl->request_attachment().length();
diff --git a/test/brpc_server_unittest.cpp b/test/brpc_server_unittest.cpp
index c22b6b53..8a8a76d8 100644
--- a/test/brpc_server_unittest.cpp
+++ b/test/brpc_server_unittest.cpp
@@ -97,6 +97,8 @@ bool g_delete = false;
 const std::string EXP_REQUEST = "hello";
 const std::string EXP_RESPONSE = "world";
 const std::string EXP_REQUEST_BASE64 = "aGVsbG8=";
+const std::string EXP_USER_FIELD_KEY = "hello";
+const std::string EXP_USER_FIELD_VALUE = "world";
 
 class EchoServiceImpl : public test::EchoService {
 public:
@@ -118,6 +120,13 @@ public:
         } else {
             LOG(INFO) << "No sleep, protocol=" << cntl->request_protocol();
         }
+        if (cntl->has_request_user_fields()) {
+            ASSERT_TRUE(!cntl->request_user_fields()->empty());
+            std::string* val = 
cntl->request_user_fields()->seek(EXP_USER_FIELD_KEY);
+            ASSERT_TRUE(val != NULL);
+            ASSERT_EQ(*val, EXP_USER_FIELD_VALUE);
+            cntl->response_user_fields()->insert(EXP_USER_FIELD_KEY, 
EXP_USER_FIELD_VALUE);
+        }
     }
 
     virtual void ComboEcho(google::protobuf::RpcController*,
@@ -1620,4 +1629,31 @@ TEST_F(ServerTest, max_concurrency) {
     stub.Echo(&cntl4, &req, NULL, NULL);
     ASSERT_FALSE(cntl4.Failed()) << cntl4.ErrorText();
 }
+
+TEST_F(ServerTest, user_fields) {
+    const int port = 9200;
+    brpc::Server server;
+    EchoServiceImpl service;
+    ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
+    ASSERT_EQ(0, server.Start(port, NULL));
+
+    brpc::Channel channel;
+    ASSERT_EQ(0, channel.Init("0.0.0.0", port, NULL));
+    test::EchoService_Stub stub(&channel);
+
+    brpc::Controller cntl;
+    cntl.request_user_fields()->insert(EXP_USER_FIELD_KEY, 
EXP_USER_FIELD_VALUE);
+    test::EchoRequest req;
+    test::EchoResponse res;
+    req.set_message("hello");
+    stub.Echo(&cntl, &req, &res, NULL);
+
+    ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
+    ASSERT_TRUE(cntl.has_response_user_fields());
+    ASSERT_TRUE(!cntl.response_user_fields()->empty());
+    std::string* val = cntl.response_user_fields()->seek(EXP_USER_FIELD_KEY);
+    ASSERT_TRUE(val != NULL);
+    ASSERT_EQ(*val, EXP_USER_FIELD_VALUE);
+}
+
 } //namespace


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to