This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 663625a7 support checksum (#2967)
663625a7 is described below

commit 663625a73b92b37dcaef1e55e73e99d4b136388a
Author: Yang,Liming <[email protected]>
AuthorDate: Fri Jun 20 14:16:35 2025 +0800

    support checksum (#2967)
---
 example/echo_c++/client.cpp                    |   6 ++
 example/echo_c++/server.cpp                    |   6 ++
 src/brpc/checksum.cpp                          | 100 +++++++++++++++++++++++++
 src/brpc/checksum.h                            |  63 ++++++++++++++++
 src/brpc/controller.cpp                        |   4 +
 src/brpc/controller.h                          |  12 +++
 src/brpc/details/controller_private_accessor.h |  10 +++
 src/brpc/global.cpp                            |  11 +++
 src/brpc/options.proto                         |   5 ++
 src/brpc/policy/baidu_rpc_meta.proto           |   2 +
 src/brpc/policy/baidu_rpc_protocol.cpp         |  85 ++++++++++++++-------
 src/brpc/policy/crc32c_checksum.cpp            |  64 ++++++++++++++++
 src/brpc/policy/crc32c_checksum.h              |  37 +++++++++
 src/brpc/server.cpp                            |  15 ++++
 test/brpc_server_unittest.cpp                  |  95 ++++++++++++++---------
 15 files changed, 455 insertions(+), 60 deletions(-)

diff --git a/example/echo_c++/client.cpp b/example/echo_c++/client.cpp
index 3cc83f72..4a3aee9c 100644
--- a/example/echo_c++/client.cpp
+++ b/example/echo_c++/client.cpp
@@ -31,6 +31,7 @@ DEFINE_string(load_balancer, "", "The algorithm for load 
balancing");
 DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
 DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)"); 
 DEFINE_int32(interval_ms, 1000, "Milliseconds between consecutive requests");
+DEFINE_bool(enable_checksum, false, "Enable checksum or not");
 
 int main(int argc, char* argv[]) {
     // Parse gflags. We recommend you to use gflags as well.
@@ -71,6 +72,11 @@ int main(int argc, char* argv[]) {
         // being serialized into protobuf messages.
         cntl.request_attachment().append(FLAGS_attachment);
 
+        // Use checksum, only support CRC32C now.
+        if (FLAGS_enable_checksum) {
+            cntl.set_request_checksum_type(brpc::CHECKSUM_TYPE_CRC32C);
+        }
+
         // Because `done'(last parameter) is NULL, this function waits until
         // the response comes back or error occurs(including timedout).
         stub.Echo(&cntl, &request, &response, NULL);
diff --git a/example/echo_c++/server.cpp b/example/echo_c++/server.cpp
index cc0050ae..41131146 100644
--- a/example/echo_c++/server.cpp
+++ b/example/echo_c++/server.cpp
@@ -29,6 +29,7 @@ DEFINE_string(listen_addr, "", "Server listen address, may be 
IPV4/IPV6/UDS."
             " If this is set, the flag port will be ignored");
 DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
              "read/write operations during the last `idle_timeout_s'");
+DEFINE_bool(enable_checksum, false, "Enable checksum or not");
 
 // Your implementation of example::EchoService
 // Notice that implementing brpc::Describable grants the ability to put
@@ -75,6 +76,11 @@ public:
             // being serialized into protobuf messages.
             cntl->response_attachment().append(cntl->request_attachment());
         }
+
+        // Use checksum, only support CRC32C now.
+        if (FLAGS_enable_checksum) {
+            cntl->set_response_checksum_type(brpc::CHECKSUM_TYPE_CRC32C);
+        }
     }
 
     // optional
diff --git a/src/brpc/checksum.cpp b/src/brpc/checksum.cpp
new file mode 100644
index 00000000..85d04d85
--- /dev/null
+++ b/src/brpc/checksum.cpp
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/checksum.h"
+
+#include "brpc/protocol.h"
+#include "butil/logging.h"
+
+namespace brpc {
+// Handlers are stored at index `type'; a NULL `Compute' marks an empty slot.
+static const int MAX_HANDLER_SIZE = 1024;
+static ChecksumHandler s_handler_map[MAX_HANDLER_SIZE] = {{NULL, NULL, NULL}};
+
+int RegisterChecksumHandler(ChecksumType type, ChecksumHandler handler) {
+    if (NULL == handler.Compute || NULL == handler.Verify) {  // both get called
+        LOG(FATAL) << "Invalid parameter: handler function is NULL";
+        return -1;
+    }
+    const int index = type;
+    if (index < 0 || index >= MAX_HANDLER_SIZE) {
+        LOG(FATAL) << "ChecksumType=" << type << " is out of range";
+        return -1;
+    }
+    if (s_handler_map[index].Compute != NULL) {  // slot already taken
+        LOG(FATAL) << "ChecksumType=" << type << " was registered";
+        return -1;
+    }
+    s_handler_map[index] = handler;
+    return 0;
+}
+
+// Look up the handler registered for `type'. Returns NULL when `type' is
+// outside the table (logged as ERROR) or when its slot is unregistered.
+inline const ChecksumHandler* FindChecksumHandler(ChecksumType type) {
+    const int slot = type;
+    const bool in_range = (slot >= 0 && slot < MAX_HANDLER_SIZE);
+    if (!in_range) {
+        LOG(ERROR) << "ChecksumType=" << type << " is out of range";
+        return NULL;
+    }
+    const ChecksumHandler* entry = &s_handler_map[slot];
+    // A slot counts as registered only once its Compute callback is set.
+    return (entry->Compute != NULL) ? entry : NULL;
+}
+
+const char* ChecksumTypeToCStr(ChecksumType type) {
+    if (type == CHECKSUM_TYPE_NONE) {
+        return "none";
+    }
+    const ChecksumHandler* h = FindChecksumHandler(type);
+    return (h != NULL && h->name != NULL) ? h->name : "unknown";  // never NULL
+}
+
+void ListChecksumHandler(std::vector<ChecksumHandler>* vec) {
+    vec->clear();  // previous content of `vec' is dropped
+    for (const ChecksumHandler& entry : s_handler_map) {
+        if (entry.Compute != NULL) {  // skip unregistered slots
+            vec->push_back(entry);
+        }
+    }
+}
+
+// Run the Compute callback registered for `checksum_type' on `in'.
+// Does nothing for CHECKSUM_TYPE_NONE or when no handler is registered.
+void ComputeDataChecksum(const ChecksumIn& in, ChecksumType checksum_type) {
+    const bool enabled = (checksum_type != CHECKSUM_TYPE_NONE);
+    const ChecksumHandler* handler =
+        enabled ? FindChecksumHandler(checksum_type) : NULL;
+    if (handler != NULL) {
+        handler->Compute(in);
+    }
+}
+
+// Run the Verify callback registered for `checksum_type' on `in'.
+// Returns its result; CHECKSUM_TYPE_NONE and unregistered types yield true.
+bool VerifyDataChecksum(const ChecksumIn& in, ChecksumType checksum_type) {
+    const bool enabled = (checksum_type != CHECKSUM_TYPE_NONE);
+    const ChecksumHandler* handler =
+        enabled ? FindChecksumHandler(checksum_type) : NULL;
+    if (NULL == handler) {
+        return true;  // nothing to verify with
+    }
+    return handler->Verify(in);
+}
+
+}  // namespace brpc
diff --git a/src/brpc/checksum.h b/src/brpc/checksum.h
new file mode 100644
index 00000000..ff0a98a8
--- /dev/null
+++ b/src/brpc/checksum.h
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_CHECKSUM_H
+#define BRPC_CHECKSUM_H
+
+#include "brpc/controller.h"
+#include "brpc/options.pb.h"  // ChecksumType
+#include "butil/iobuf.h"      // butil::IOBuf
+
+namespace brpc {
+
+struct ChecksumIn {
+    const butil::IOBuf* buf;  // data the checksum is computed over
+    Controller* cntl;         // controller carrying the checksum value
+};
+
+struct ChecksumHandler {
+    // Compute the checksum of `in.buf'. Returns nothing: the result is
+    // expected to be stored through `in.cntl' (as the crc32c handler does).
+    void (*Compute)(const ChecksumIn& in);
+
+    // Verify the checksum of `in.buf'.
+    // Returns true on success, false otherwise.
+    bool (*Verify)(const ChecksumIn& in);
+
+    // Name of the checksum algorithm, must be string constant.
+    const char* name;
+};
+
+// [NOT thread-safe] Register `handler' using key=`type'
+// Returns 0 on success, -1 otherwise
+int RegisterChecksumHandler(ChecksumType type, ChecksumHandler handler);
+
+// Returns "none", the registered `name' of `type', or "unknown" otherwise.
+const char* ChecksumTypeToCStr(ChecksumType type);
+
+// Put all registered handlers into `vec'.
+void ListChecksumHandler(std::vector<ChecksumHandler>* vec);
+
+// Compute the checksum of `in.buf' and store it in `in.cntl'.
+void ComputeDataChecksum(const ChecksumIn& in, ChecksumType checksum_type);
+
+// Verify the checksum of `in.buf'. Returns true on success, false otherwise.
+bool VerifyDataChecksum(const ChecksumIn& in, ChecksumType checksum_type);
+
+}  // namespace brpc
+
+#endif  // BRPC_CHECKSUM_H
diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp
index 1362d322..b16ab545 100644
--- a/src/brpc/controller.cpp
+++ b/src/brpc/controller.cpp
@@ -269,6 +269,8 @@ void Controller::ResetPods() {
     _preferred_index = -1;
     _request_compress_type = COMPRESS_TYPE_NONE;
     _response_compress_type = COMPRESS_TYPE_NONE;
+    _request_checksum_type = CHECKSUM_TYPE_NONE;
+    _response_checksum_type = CHECKSUM_TYPE_NONE;
     _fail_limit = UNSET_MAGIC_NUM;
     _pipelined_count = 0;
     _inheritable.Reset();
@@ -1346,6 +1348,7 @@ void Controller::SaveClientSettings(ClientSettings* s) 
const {
     s->tos = _tos;
     s->connection_type = _connection_type;
     s->request_compress_type = _request_compress_type;
+    s->request_checksum_type = _request_checksum_type;
     s->log_id = log_id();
     s->has_request_code = has_request_code();
     s->request_code = _request_code;
@@ -1359,6 +1362,7 @@ void Controller::ApplyClientSettings(const 
ClientSettings& s) {
     set_type_of_service(s.tos);
     set_connection_type(s.connection_type);
     set_request_compress_type(s.request_compress_type);
+    set_request_checksum_type(s.request_checksum_type);
     set_log_id(s.log_id);
     set_flag(FLAGS_REQUEST_CODE, s.has_request_code);
     _request_code = s.request_code;
diff --git a/src/brpc/controller.h b/src/brpc/controller.h
index 000aee2f..64474419 100644
--- a/src/brpc/controller.h
+++ b/src/brpc/controller.h
@@ -241,6 +241,9 @@ public:
     // Set compression method for request.
     void set_request_compress_type(CompressType t) { _request_compress_type = 
t; }
 
+    // Set checksum type for request.
+    void set_request_checksum_type(ChecksumType t) { _request_checksum_type = 
t; }
+
     // Required by some load balancers.
     void set_request_code(uint64_t request_code) {
         add_flag(FLAGS_REQUEST_CODE);
@@ -464,6 +467,9 @@ public:
 
     // Set compression method for response.
     void set_response_compress_type(CompressType t) { _response_compress_type 
= t; }
+
+    // Set checksum type for response.
+    void set_response_checksum_type(ChecksumType t) { _response_checksum_type 
= t; }
     
     // Non-zero when this RPC call is traced (by rpcz or rig).
     // NOTE: Only valid at server-side, always zero at client-side.
@@ -552,6 +558,8 @@ public:
     const std::string& request_id() const { return _inheritable.request_id; }
     CompressType request_compress_type() const { return 
_request_compress_type; }
     CompressType response_compress_type() const { return 
_response_compress_type; }
+    ChecksumType request_checksum_type() const { return 
_request_checksum_type; }
+    ChecksumType response_checksum_type() const { return 
_response_checksum_type; }
     const HttpHeader& http_request() const 
     { return _http_request != NULL ? *_http_request : DefaultHttpHeader(); }
     
@@ -693,6 +701,7 @@ private:
         int32_t tos;
         ConnectionType connection_type;         
         CompressType request_compress_type;
+        ChecksumType request_checksum_type;
         uint64_t log_id;
         bool has_request_code;
         int64_t request_code;
@@ -834,6 +843,9 @@ private:
     int _preferred_index;
     CompressType _request_compress_type;
     CompressType _response_compress_type;
+    ChecksumType _request_checksum_type;
+    ChecksumType _response_checksum_type;
+    std::string _checksum_value;
     Inheritable _inheritable;
     int _pchan_sub_count;
     google::protobuf::Message* _response;
diff --git a/src/brpc/details/controller_private_accessor.h 
b/src/brpc/details/controller_private_accessor.h
index db40ca15..1a9d7062 100644
--- a/src/brpc/details/controller_private_accessor.h
+++ b/src/brpc/details/controller_private_accessor.h
@@ -152,6 +152,16 @@ public:
         return *this;
     }
 
+    void set_checksum_value(const char* c, size_t size) {
+        _cntl->_checksum_value.assign(c, size);
+    }
+
+    void set_checksum_value(const std::string& c) {
+        _cntl->_checksum_value = c;
+    }
+
+    const std::string& checksum_value() const { return _cntl->_checksum_value; 
}
+
 private:
     Controller* _cntl;
 };
diff --git a/src/brpc/global.cpp b/src/brpc/global.cpp
index 6b3310ec..0196b6d0 100644
--- a/src/brpc/global.cpp
+++ b/src/brpc/global.cpp
@@ -60,6 +60,10 @@
 #include "brpc/policy/gzip_compress.h"
 #include "brpc/policy/snappy_compress.h"
 
+// Checksum handlers
+#include "brpc/checksum.h"
+#include "brpc/policy/crc32c_checksum.h"
+
 // Protocols
 #include "brpc/protocol.h"
 #include "brpc/policy/baidu_rpc_protocol.h"
@@ -401,6 +405,13 @@ static void GlobalInitializeOrDieImpl() {
         exit(1);
     }
 
+    // Checksum Handlers
+    const ChecksumHandler crc32c_checksum = {Crc32cCompute, Crc32cVerify,
+                                             "crc32c"};
+    if (RegisterChecksumHandler(CHECKSUM_TYPE_CRC32C, crc32c_checksum) != 0) {
+        exit(1);
+    }
+
     // Protocols
     Protocol baidu_protocol = { ParseRpcMessage,
                                 SerializeRpcRequest, PackRpcRequest,
diff --git a/src/brpc/options.proto b/src/brpc/options.proto
index e334c48e..34001d7b 100644
--- a/src/brpc/options.proto
+++ b/src/brpc/options.proto
@@ -74,6 +74,11 @@ enum CompressType {
     COMPRESS_TYPE_LZ4 = 4;
 }
 
+enum ChecksumType {
+    CHECKSUM_TYPE_NONE = 0;
+    CHECKSUM_TYPE_CRC32C = 1;
+}
+
 enum ContentType {
     CONTENT_TYPE_PB = 0;
     CONTENT_TYPE_JSON = 1;
diff --git a/src/brpc/policy/baidu_rpc_meta.proto 
b/src/brpc/policy/baidu_rpc_meta.proto
index 59179831..5591c5da 100644
--- a/src/brpc/policy/baidu_rpc_meta.proto
+++ b/src/brpc/policy/baidu_rpc_meta.proto
@@ -34,6 +34,8 @@ message RpcMeta {
     optional StreamSettings stream_settings = 8;
     map<string, string> user_fields = 9;
     optional ContentType content_type = 10;
+    optional int32 checksum_type = 11;
+    optional bytes checksum_value = 12;
 }
 
 message RpcRequestMeta {
diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp 
b/src/brpc/policy/baidu_rpc_protocol.cpp
index 8efff065..fcc8b824 100644
--- a/src/brpc/policy/baidu_rpc_protocol.cpp
+++ b/src/brpc/policy/baidu_rpc_protocol.cpp
@@ -32,6 +32,7 @@
 #include "brpc/server.h"                        // Server
 #include "brpc/span.h"
 #include "brpc/compress.h"                      // ParseFromCompressedData
+#include "brpc/checksum.h"
 #include "brpc/stream_impl.h"
 #include "brpc/rpc_dump.h"                      // SampledRequest
 #include "brpc/rpc_pb_message_factory.h"
@@ -143,7 +144,8 @@ ParseResult ParseRpcMessage(butil::IOBuf* source, Socket* 
socket,
 
 bool SerializeRpcMessage(const google::protobuf::Message& message,
                          Controller& cntl, ContentType content_type,
-                         CompressType compress_type, butil::IOBuf* buf) {
+                         CompressType compress_type, ChecksumType 
checksum_type,
+                         butil::IOBuf* buf) {
     auto serialize = [&](Serializer& serializer) -> bool {
         bool ok;
         if (COMPRESS_TYPE_NONE == compress_type) {
@@ -156,6 +158,8 @@ bool SerializeRpcMessage(const google::protobuf::Message& 
message,
             }
             ok = handler->Compress(serializer, buf);
         }
+        ChecksumIn checksum_in{buf, &cntl};
+        ComputeDataChecksum(checksum_in, checksum_type);
         return ok;
     };
 
@@ -223,13 +227,16 @@ static bool SerializeResponse(const 
google::protobuf::Message& res,
 
     ContentType content_type = cntl.response_content_type();
     CompressType compress_type = cntl.response_compress_type();
-    if (!SerializeRpcMessage(res, cntl, content_type, compress_type, &buf)) {
-        cntl.SetFailed(
-            ERESPONSE, "Fail to serialize response=%s, "
-                       "ContentType=%s, CompressType=%s",
-            res.GetDescriptor()->full_name().c_str(),
-            ContentTypeToCStr(content_type),
-            CompressTypeToCStr(compress_type));
+    ChecksumType checksum_type = cntl.response_checksum_type();
+    if (!SerializeRpcMessage(res, cntl, content_type, compress_type,
+                             checksum_type, &buf)) {
+        cntl.SetFailed(ERESPONSE,
+                       "Fail to serialize response=%s, "
+                       "ContentType=%s, CompressType=%s, ChecksumType=%s",
+                       res.GetDescriptor()->full_name().c_str(),
+                       ContentTypeToCStr(content_type),
+                       CompressTypeToCStr(compress_type),
+                       ChecksumTypeToCStr(checksum_type));
         return false;
     }
     return true;
@@ -308,7 +315,6 @@ void SendRpcResponse(int64_t correlation_id, Controller* 
cntl,
     // `res' can be NULL here, in which case we don't serialize it
     // If user calls `SetFailed' on Controller, we don't serialize
     // response either
-    CompressType compress_type = cntl->response_compress_type();
     if (res != NULL && !cntl->Failed()) {
         append_body = SerializeResponse(*res, *cntl, res_body);
     }
@@ -336,8 +342,10 @@ void SendRpcResponse(int64_t correlation_id, Controller* 
cntl,
         response_meta->set_error_text(cntl->ErrorText());
     }
     meta.set_correlation_id(correlation_id);
-    meta.set_compress_type(compress_type);
+    meta.set_compress_type(cntl->response_compress_type());
     meta.set_content_type(cntl->response_content_type());
+    meta.set_checksum_type(cntl->response_checksum_type());
+    meta.set_checksum_value(accessor.checksum_value());
     if (attached_size > 0) {
         meta.set_attachment_size(attached_size);
     }
@@ -486,9 +494,14 @@ void EndRunningCallMethodInPool(
 
 bool DeserializeRpcMessage(const butil::IOBuf& data, Controller& cntl,
                            ContentType content_type, CompressType 
compress_type,
+                           ChecksumType checksum_type,
                            google::protobuf::Message* message) {
     auto deserialize = [&](Deserializer& deserializer) -> bool {
-        bool ok;
+        ChecksumIn checksum_in{&data, &cntl};
+        bool ok = VerifyDataChecksum(checksum_in, checksum_type);
+        if (!ok) {
+            return ok;
+        }
         if (COMPRESS_TYPE_NONE == compress_type) {
             butil::IOBufAsZeroCopyInputStream stream(data);
             ok = deserializer.DeserializeFrom(&stream);
@@ -601,6 +614,8 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
     }
     cntl->set_request_content_type(meta.content_type());
     cntl->set_request_compress_type((CompressType)meta.compress_type());
+    cntl->set_request_checksum_type((ChecksumType)meta.checksum_type());
+    accessor.set_checksum_value(meta.checksum_value());
     accessor.set_server(server)
         .set_security_mode(security_mode)
         .set_peer_id(socket->id())
@@ -783,16 +798,23 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
             }
 
             ContentType content_type = meta.content_type();
-            auto compress_type = 
static_cast<CompressType>(meta.compress_type());
-            messages = server->options().rpc_pb_message_factory->Get(*svc, 
*method);
+            auto compress_type =
+                static_cast<CompressType>(meta.compress_type());
+            auto checksum_type =
+                static_cast<ChecksumType>(meta.checksum_type());
+            messages =
+                server->options().rpc_pb_message_factory->Get(*svc, *method);
             if (!DeserializeRpcMessage(req_buf, *cntl, content_type,
-                                       compress_type, messages->Request())) {
+                                       compress_type, checksum_type,
+                                       messages->Request())) {
                 cntl->SetFailed(
-                    EREQUEST, "Fail to parse request=%s, ContentType=%s, "
-                              "CompressType=%s, request_size=%d",
+                    EREQUEST,
+                    "Fail to parse request=%s, ContentType=%s, "
+                    "CompressType=%s, ChecksumType=%s, request_size=%d",
                     messages->Request()->GetDescriptor()->full_name().c_str(),
                     ContentTypeToCStr(content_type),
-                    CompressTypeToCStr(compress_type), req_size);
+                    CompressTypeToCStr(compress_type),
+                    ChecksumTypeToCStr(checksum_type), req_size);
                 break;
             }
             req_buf.clear();
@@ -956,20 +978,26 @@ void ProcessRpcResponse(InputMessageBase* msg_base) {
 
         ContentType content_type = meta.content_type();
         auto compress_type = (CompressType)meta.compress_type();
+        auto checksum_type = (ChecksumType)meta.checksum_type();
         cntl->set_response_content_type(content_type);
         cntl->set_response_compress_type(compress_type);
+        cntl->set_response_checksum_type(checksum_type);
+        accessor.set_checksum_value(meta.checksum_value());
         if (cntl->response()) {
             if (cntl->response()->GetDescriptor() == 
SerializedResponse::descriptor()) {
                 ((SerializedResponse*)cntl->response())->
                     serialized_data().append(*res_buf_ptr);
             } else if (!DeserializeRpcMessage(*res_buf_ptr, *cntl, 
content_type,
-                                              compress_type, 
cntl->response())) {
+                                              compress_type, checksum_type,
+                                              cntl->response())) {
                 cntl->SetFailed(
-                    EREQUEST, "Fail to parse response=%s, ContentType=%s, "
-                              "CompressType=%s, request_size=%d",
+                    EREQUEST,
+                    "Fail to parse response=%s, ContentType=%s, "
+                    "CompressType=%s, ChecksumType=%s, request_size=%d",
                     cntl->response()->GetDescriptor()->full_name().c_str(),
                     ContentTypeToCStr(content_type),
-                    CompressTypeToCStr(compress_type), res_size);
+                    CompressTypeToCStr(compress_type),
+                    ChecksumTypeToCStr(checksum_type), res_size);
             }
         } // else silently ignore the response.
     } while (0);
@@ -996,13 +1024,16 @@ void SerializeRpcRequest(butil::IOBuf* request_buf, 
Controller* cntl,
 
     ContentType content_type = cntl->request_content_type();
     CompressType compress_type = cntl->request_compress_type();
-    if (!SerializeRpcMessage(*request, *cntl, content_type, compress_type, 
request_buf)) {
+    ChecksumType checksum_type = cntl->request_checksum_type();
+    if (!SerializeRpcMessage(*request, *cntl, content_type, compress_type,
+                             checksum_type, request_buf)) {
         return cntl->SetFailed(
-            EREQUEST, "Fail to compress request=%s, "
-                      "ContentType=%s, CompressType=%s",
+            EREQUEST,
+            "Fail to compress request=%s, "
+            "ContentType=%s, CompressType=%s, ChecksumType=%s",
             request->GetDescriptor()->full_name().c_str(),
-            ContentTypeToCStr(content_type),
-            CompressTypeToCStr(compress_type));
+            ContentTypeToCStr(content_type), CompressTypeToCStr(compress_type),
+            ChecksumTypeToCStr(checksum_type));
     }
 }
 
@@ -1027,6 +1058,8 @@ void PackRpcRequest(butil::IOBuf* req_buf,
                                        method->service()->name());
         request_meta->set_method_name(method->name());
         meta.set_compress_type(cntl->request_compress_type());
+        meta.set_checksum_type(cntl->request_checksum_type());
+        meta.set_checksum_value(accessor.checksum_value());
     } else if (NULL != cntl->sampled_request()) {
         // Replaying. Keep service-name as the one seen by server.
         
request_meta->set_service_name(cntl->sampled_request()->meta.service_name());
diff --git a/src/brpc/policy/crc32c_checksum.cpp 
b/src/brpc/policy/crc32c_checksum.cpp
new file mode 100644
index 00000000..28b6fab6
--- /dev/null
+++ b/src/brpc/policy/crc32c_checksum.cpp
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/policy/crc32c_checksum.h"
+
+#include "brpc/details/controller_private_accessor.h"
+#include "brpc/log.h"
+#include "butil/crc32c.h"
+#include "butil/sys_byteorder.h"
+
+namespace brpc {
+namespace policy {
+
+// CRC32C over all bytes of `buf' (not masked).
+static uint32_t Crc32cOfIOBuf(const butil::IOBuf& buf) {
+    butil::IOBufAsZeroCopyInputStream wrapper(buf);
+    const void* data;
+    int size;
+    uint32_t crc = 0;
+    while (wrapper.Next(&data, &size)) {
+        crc = butil::crc32c::Extend(crc, static_cast<const char*>(data), size);
+    }
+    return crc;
+}
+
+// Store the masked CRC32C of `in.buf', in network byte order, into `in.cntl'.
+void Crc32cCompute(const ChecksumIn& in) {
+    uint32_t crc = Crc32cOfIOBuf(*in.buf);
+    RPC_VLOG << "Crc32cCompute crc=" << crc;
+    crc = butil::HostToNet32(butil::crc32c::Mask(crc));
+    ControllerPrivateAccessor(in.cntl).set_checksum_value(
+        reinterpret_cast<char*>(&crc), sizeof(crc));
+}
+
+// True iff the CRC32C of `in.buf' matches the value the peer put in `in.cntl'.
+bool Crc32cVerify(const ChecksumIn& in) {
+    const auto& val = ControllerPrivateAccessor(in.cntl).checksum_value();
+    if (val.size() != sizeof(uint32_t)) {  // peer-supplied: fail, don't CHECK
+        return false;
+    }
+    uint32_t expected = 0;  // filled via copy(): val.data() may be unaligned
+    val.copy(reinterpret_cast<char*>(&expected), sizeof(expected));
+    expected = butil::crc32c::Unmask(butil::NetToHost32(expected));
+    const uint32_t crc = Crc32cOfIOBuf(*in.buf);
+    RPC_VLOG << "Crc32cVerify crc=" << crc << " expected=" << expected;
+    return crc == expected;
+}
+
+}  // namespace policy
+}  // namespace brpc
diff --git a/src/brpc/policy/crc32c_checksum.h 
b/src/brpc/policy/crc32c_checksum.h
new file mode 100644
index 00000000..cfdd724c
--- /dev/null
+++ b/src/brpc/policy/crc32c_checksum.h
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_POLICY_CRC32C_CHECKSUM_H
+#define BRPC_POLICY_CRC32C_CHECKSUM_H
+
+#include "brpc/checksum.h"
+#include "brpc/controller.h"
+#include "butil/iobuf.h"  // butil::IOBuf
+
+namespace brpc {
+namespace policy {
+
+// Compute checksum
+void Crc32cCompute(const ChecksumIn& in);
+
+// Verify checksum
+bool Crc32cVerify(const ChecksumIn& in);
+
+}  // namespace policy
+}  // namespace brpc
+
+#endif  // BRPC_POLICY_CRC32C_CHECKSUM_H
diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp
index 17cde630..cd83053a 100644
--- a/src/brpc/server.cpp
+++ b/src/brpc/server.cpp
@@ -33,6 +33,7 @@
 #include "butil/debug/leak_annotations.h"
 #include "brpc/log.h"
 #include "brpc/compress.h"
+#include "brpc/checksum.h"
 #include "brpc/policy/nova_pbrpc_protocol.h"
 #include "brpc/global.h"
 #include "brpc/socket_map.h"                   // SocketMapList
@@ -227,6 +228,17 @@ static void PrintSupportedCompressions(std::ostream& os, 
void*) {
     }
 }
 
+static void PrintSupportedChecksums(std::ostream& os, void*) {
+    std::vector<ChecksumHandler> handlers;
+    ListChecksumHandler(&handlers);
+    for (size_t i = 0; i < handlers.size(); ++i) {
+        if (i != 0) {
+            os << ' ';
+        }
+        os << (handlers[i].name ? handlers[i].name : "(null)");
+    }
+}
+
 static void PrintEnabledProfilers(std::ostream& os, void*) {
     if (cpu_profiler_enabled) {
         os << "cpu ";
@@ -253,6 +265,9 @@ static bvar::PassiveStatus<std::string> s_proto_st(
 static bvar::PassiveStatus<std::string> s_comp_st(
     "rpc_compressions", PrintSupportedCompressions, NULL);
 
+static bvar::PassiveStatus<std::string> s_cksum_st(
+    "rpc_checksums", PrintSupportedChecksums, NULL);
+
 static bvar::PassiveStatus<std::string> s_prof_st(
     "rpc_profilers", PrintEnabledProfilers, NULL);
 
diff --git a/test/brpc_server_unittest.cpp b/test/brpc_server_unittest.cpp
index a51b9317..4a774fab 100644
--- a/test/brpc_server_unittest.cpp
+++ b/test/brpc_server_unittest.cpp
@@ -71,11 +71,14 @@ DECLARE_bool(enable_dir_service);
 namespace policy {
 DECLARE_bool(use_http_error_code);
 
-extern bool SerializeRpcMessage(const google::protobuf::Message& serializer, 
Controller& cntl,
-                                ContentType content_type, CompressType 
compress_type,
-                                butil::IOBuf* buf);
-extern bool DeserializeRpcMessage(const butil::IOBuf& deserializer, 
Controller& cntl,
-                                  ContentType content_type, CompressType 
compress_type,
+extern bool SerializeRpcMessage(const google::protobuf::Message& serializer,
+                                Controller& cntl, ContentType content_type,
+                                CompressType compress_type,
+                                ChecksumType checksum_type, butil::IOBuf* buf);
+extern bool DeserializeRpcMessage(const butil::IOBuf& deserializer,
+                                  Controller& cntl, ContentType content_type,
+                                  CompressType compress_type,
+                                  ChecksumType checksum_type,
                                   google::protobuf::Message* message);
 }
 
@@ -1702,11 +1705,13 @@ public:
         ASSERT_EQ("Echo", cntl->sampled_request()->meta.method_name());
         brpc::ContentType content_type = cntl->request_content_type();
         brpc::CompressType compress_type = cntl->request_compress_type();
+        brpc::ChecksumType checksum_type = cntl->request_checksum_type();
 
         test::EchoRequest echo_request;
         test::EchoResponse echo_response;
         ASSERT_TRUE(brpc::policy::DeserializeRpcMessage(
-            request->serialized_data(), *cntl, content_type, compress_type, &echo_request));
+            request->serialized_data(), *cntl, content_type, compress_type,
+            checksum_type, &echo_request));
         ASSERT_EQ(EXP_REQUEST, echo_request.message());
         ASSERT_EQ(EXP_REQUEST, cntl->request_attachment().to_string());
 
@@ -1727,10 +1732,12 @@ public:
 
         cntl->set_response_content_type(content_type);
         cntl->set_response_compress_type(compress_type);
+        cntl->set_response_checksum_type(checksum_type);
         cntl->response_attachment().append(EXP_RESPONSE);
         echo_response.set_message(EXP_RESPONSE);
         ASSERT_TRUE(brpc::policy::SerializeRpcMessage(
-            echo_response, *cntl, content_type, compress_type, &response->serialized_data()));
+            echo_response, *cntl, content_type, compress_type, checksum_type,
+            &response->serialized_data()));
     }
 private:
     int _content_type_index = brpc::ContentType_MIN;
@@ -1777,11 +1784,12 @@ TEST_F(ServerTest, baidu_master_service) {
     ASSERT_EQ(0, server.Join());
 }
 
-void TestGenericCall(brpc::Channel& channel,
-                     brpc::ContentType content_type,
-                     brpc::CompressType compress_type) {
+void TestGenericCall(brpc::Channel& channel, brpc::ContentType content_type,
+                     brpc::CompressType compress_type,
+                     brpc::ChecksumType checksum_type) {
     LOG(INFO) << "TestGenericCall: content_type=" << content_type
-              << ", compress_type=" << compress_type;
+              << ", compress_type=" << compress_type
+              << ", checksum_type=" << checksum_type;
     test::EchoRequest request;
     test::EchoResponse response;
     request.set_message(EXP_REQUEST);
@@ -1792,11 +1800,13 @@ void TestGenericCall(brpc::Channel& channel,
     brpc::Controller cntl;
     cntl.set_request_content_type(content_type);
     cntl.set_request_compress_type(compress_type);
+    cntl.set_request_checksum_type(checksum_type);
     cntl.request_attachment().append(EXP_REQUEST);
 
     std::string error;
     ASSERT_TRUE(brpc::policy::SerializeRpcMessage(
-        request, cntl, content_type, compress_type, &serialized_request.serialized_data()));
+        request, cntl, content_type, compress_type, checksum_type,
+        &serialized_request.serialized_data()));
     auto sampled_request = new (std::nothrow) brpc::SampledRequest();
     sampled_request->meta.set_service_name(
         test::EchoService::descriptor()->full_name());
@@ -1807,9 +1817,10 @@ void TestGenericCall(brpc::Channel& channel,
     channel.CallMethod(NULL, &cntl, &serialized_request, &serialized_response, NULL);
     ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
 
-    ASSERT_TRUE(brpc::policy::DeserializeRpcMessage(serialized_response.serialized_data(),
-                                                    cntl, cntl.response_content_type(),
-                                                    cntl.response_compress_type(), &response));
+    ASSERT_TRUE(brpc::policy::DeserializeRpcMessage(
+        serialized_response.serialized_data(), cntl,
+        cntl.response_content_type(), cntl.response_compress_type(),
+        cntl.response_checksum_type(), &response));
     ASSERT_EQ(EXP_RESPONSE, response.message());
     ASSERT_EQ(EXP_RESPONSE, cntl.response_attachment().to_string());
 }
@@ -1829,25 +1840,41 @@ TEST_F(ServerTest, generic_call) {
     channel_options.protocol = "baidu_std";
     ASSERT_EQ(0, channel.Init(ep, &channel_options));
 
-    TestGenericCall(channel, brpc::CONTENT_TYPE_PB, brpc::COMPRESS_TYPE_ZLIB);
-    TestGenericCall(channel, brpc::CONTENT_TYPE_PB, brpc::COMPRESS_TYPE_GZIP);
-    TestGenericCall(channel, brpc::CONTENT_TYPE_PB, brpc::COMPRESS_TYPE_SNAPPY);
-    TestGenericCall(channel, brpc::CONTENT_TYPE_PB, brpc::COMPRESS_TYPE_NONE);
-
-    TestGenericCall(channel, brpc::CONTENT_TYPE_JSON, brpc::COMPRESS_TYPE_ZLIB);
-    TestGenericCall(channel, brpc::CONTENT_TYPE_JSON, brpc::COMPRESS_TYPE_GZIP);
-    TestGenericCall(channel, brpc::CONTENT_TYPE_JSON, brpc::COMPRESS_TYPE_SNAPPY);
-    TestGenericCall(channel, brpc::CONTENT_TYPE_JSON, brpc::COMPRESS_TYPE_NONE);
-
-    TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_JSON, brpc::COMPRESS_TYPE_ZLIB);
-    TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_JSON, brpc::COMPRESS_TYPE_GZIP);
-    TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_JSON, brpc::COMPRESS_TYPE_SNAPPY);
-    TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_JSON, brpc::COMPRESS_TYPE_NONE);
-
-    TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_TEXT, brpc::COMPRESS_TYPE_ZLIB);
-    TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_TEXT, brpc::COMPRESS_TYPE_GZIP);
-    TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_TEXT, brpc::COMPRESS_TYPE_SNAPPY);
-    TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_TEXT, brpc::COMPRESS_TYPE_NONE);
+    TestGenericCall(channel, brpc::CONTENT_TYPE_PB, brpc::COMPRESS_TYPE_ZLIB,
+                    brpc::CHECKSUM_TYPE_CRC32C);
+    TestGenericCall(channel, brpc::CONTENT_TYPE_PB, brpc::COMPRESS_TYPE_GZIP,
+                    brpc::CHECKSUM_TYPE_CRC32C);
+    TestGenericCall(channel, brpc::CONTENT_TYPE_PB, brpc::COMPRESS_TYPE_SNAPPY,
+                    brpc::CHECKSUM_TYPE_NONE);
+    TestGenericCall(channel, brpc::CONTENT_TYPE_PB, brpc::COMPRESS_TYPE_NONE,
+                    brpc::CHECKSUM_TYPE_NONE);
+
+    TestGenericCall(channel, brpc::CONTENT_TYPE_JSON, brpc::COMPRESS_TYPE_ZLIB,
+                    brpc::CHECKSUM_TYPE_CRC32C);
+    TestGenericCall(channel, brpc::CONTENT_TYPE_JSON, brpc::COMPRESS_TYPE_GZIP,
+                    brpc::CHECKSUM_TYPE_CRC32C);
+    TestGenericCall(channel, brpc::CONTENT_TYPE_JSON,
+                    brpc::COMPRESS_TYPE_SNAPPY, brpc::CHECKSUM_TYPE_NONE);
+    TestGenericCall(channel, brpc::CONTENT_TYPE_JSON, brpc::COMPRESS_TYPE_NONE,
+                    brpc::CHECKSUM_TYPE_NONE);
+
+    TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_JSON,
+                    brpc::COMPRESS_TYPE_ZLIB, brpc::CHECKSUM_TYPE_CRC32C);
+    TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_JSON,
+                    brpc::COMPRESS_TYPE_GZIP, brpc::CHECKSUM_TYPE_CRC32C);
+    TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_JSON,
+                    brpc::COMPRESS_TYPE_SNAPPY, brpc::CHECKSUM_TYPE_NONE);
+    TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_JSON,
+                    brpc::COMPRESS_TYPE_NONE, brpc::CHECKSUM_TYPE_NONE);
+
+    TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_TEXT,
+                    brpc::COMPRESS_TYPE_ZLIB, brpc::CHECKSUM_TYPE_CRC32C);
+    TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_TEXT,
+                    brpc::COMPRESS_TYPE_GZIP, brpc::CHECKSUM_TYPE_CRC32C);
+    TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_TEXT,
+                    brpc::COMPRESS_TYPE_SNAPPY, brpc::CHECKSUM_TYPE_NONE);
+    TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_TEXT,
+                    brpc::COMPRESS_TYPE_NONE, brpc::CHECKSUM_TYPE_NONE);
 
     ASSERT_EQ(0, server.Stop(0));
     ASSERT_EQ(0, server.Join());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to