This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 663625a7 support checksum (#2967)
663625a7 is described below
commit 663625a73b92b37dcaef1e55e73e99d4b136388a
Author: Yang,Liming <[email protected]>
AuthorDate: Fri Jun 20 14:16:35 2025 +0800
support checksum (#2967)
---
example/echo_c++/client.cpp | 6 ++
example/echo_c++/server.cpp | 6 ++
src/brpc/checksum.cpp | 100 +++++++++++++++++++++++++
src/brpc/checksum.h | 63 ++++++++++++++++
src/brpc/controller.cpp | 4 +
src/brpc/controller.h | 12 +++
src/brpc/details/controller_private_accessor.h | 10 +++
src/brpc/global.cpp | 11 +++
src/brpc/options.proto | 5 ++
src/brpc/policy/baidu_rpc_meta.proto | 2 +
src/brpc/policy/baidu_rpc_protocol.cpp | 85 ++++++++++++++-------
src/brpc/policy/crc32c_checksum.cpp | 64 ++++++++++++++++
src/brpc/policy/crc32c_checksum.h | 37 +++++++++
src/brpc/server.cpp | 15 ++++
test/brpc_server_unittest.cpp | 95 ++++++++++++++---------
15 files changed, 455 insertions(+), 60 deletions(-)
diff --git a/example/echo_c++/client.cpp b/example/echo_c++/client.cpp
index 3cc83f72..4a3aee9c 100644
--- a/example/echo_c++/client.cpp
+++ b/example/echo_c++/client.cpp
@@ -31,6 +31,7 @@ DEFINE_string(load_balancer, "", "The algorithm for load
balancing");
DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
DEFINE_int32(interval_ms, 1000, "Milliseconds between consecutive requests");
+DEFINE_bool(enable_checksum, false, "Enable checksum or not");
int main(int argc, char* argv[]) {
// Parse gflags. We recommend you to use gflags as well.
@@ -71,6 +72,11 @@ int main(int argc, char* argv[]) {
// being serialized into protobuf messages.
cntl.request_attachment().append(FLAGS_attachment);
+ // Use checksum, only support CRC32C now.
+ if (FLAGS_enable_checksum) {
+ cntl.set_request_checksum_type(brpc::CHECKSUM_TYPE_CRC32C);
+ }
+
// Because `done'(last parameter) is NULL, this function waits until
// the response comes back or error occurs(including timedout).
stub.Echo(&cntl, &request, &response, NULL);
diff --git a/example/echo_c++/server.cpp b/example/echo_c++/server.cpp
index cc0050ae..41131146 100644
--- a/example/echo_c++/server.cpp
+++ b/example/echo_c++/server.cpp
@@ -29,6 +29,7 @@ DEFINE_string(listen_addr, "", "Server listen address, may be
IPV4/IPV6/UDS."
" If this is set, the flag port will be ignored");
DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
"read/write operations during the last `idle_timeout_s'");
+DEFINE_bool(enable_checksum, false, "Enable checksum or not");
// Your implementation of example::EchoService
// Notice that implementing brpc::Describable grants the ability to put
@@ -75,6 +76,11 @@ public:
// being serialized into protobuf messages.
cntl->response_attachment().append(cntl->request_attachment());
}
+
+ // Use checksum, only support CRC32C now.
+ if (FLAGS_enable_checksum) {
+ cntl->set_response_checksum_type(brpc::CHECKSUM_TYPE_CRC32C);
+ }
}
// optional
diff --git a/src/brpc/checksum.cpp b/src/brpc/checksum.cpp
new file mode 100644
index 00000000..85d04d85
--- /dev/null
+++ b/src/brpc/checksum.cpp
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/checksum.h"
+
+#include "brpc/protocol.h"
+#include "butil/logging.h"
+
+namespace brpc {
+
+static const int MAX_HANDLER_SIZE = 1024;
+static ChecksumHandler s_handler_map[MAX_HANDLER_SIZE] = {{NULL, NULL, NULL}};
+
+int RegisterChecksumHandler(ChecksumType type, ChecksumHandler handler) {
+ if (NULL == handler.Compute) {
+ LOG(FATAL) << "Invalid parameter: handler function is NULL";
+ return -1;
+ }
+ int index = type;
+ if (index < 0 || index >= MAX_HANDLER_SIZE) {
+ LOG(FATAL) << "ChecksumType=" << type << " is out of range";
+ return -1;
+ }
+ if (s_handler_map[index].Compute != NULL) {
+ LOG(FATAL) << "ChecksumType=" << type << " was registered";
+ return -1;
+ }
+ s_handler_map[index] = handler;
+ return 0;
+}
+
+// Find ChecksumHandler by type.
+// Returns NULL if not found
+inline const ChecksumHandler* FindChecksumHandler(ChecksumType type) {
+ int index = type;
+ if (index < 0 || index >= MAX_HANDLER_SIZE) {
+ LOG(ERROR) << "ChecksumType=" << type << " is out of range";
+ return NULL;
+ }
+ if (NULL == s_handler_map[index].Compute) {
+ return NULL;
+ }
+ return &s_handler_map[index];
+}
+
+const char* ChecksumTypeToCStr(ChecksumType type) {
+ if (type == CHECKSUM_TYPE_NONE) {
+ return "none";
+ }
+ const ChecksumHandler* handler = FindChecksumHandler(type);
+ return (handler != NULL ? handler->name : "unknown");
+}
+
+void ListChecksumHandler(std::vector<ChecksumHandler>* vec) {
+ vec->clear();
+ for (int i = 0; i < MAX_HANDLER_SIZE; ++i) {
+ if (s_handler_map[i].Compute != NULL) {
+ vec->push_back(s_handler_map[i]);
+ }
+ }
+}
+
+// Compute `data' checksum
+void ComputeDataChecksum(const ChecksumIn& in, ChecksumType checksum_type) {
+ if (checksum_type == CHECKSUM_TYPE_NONE) {
+ return;
+ }
+ const ChecksumHandler* handler = FindChecksumHandler(checksum_type);
+ if (NULL != handler) {
+ handler->Compute(in);
+ }
+}
+
+// Verify `data' checksum Returns true on success, false otherwise
+bool VerifyDataChecksum(const ChecksumIn& in, ChecksumType checksum_type) {
+ if (checksum_type == CHECKSUM_TYPE_NONE) {
+ return true;
+ }
+ const ChecksumHandler* handler = FindChecksumHandler(checksum_type);
+ if (NULL != handler) {
+ return handler->Verify(in);
+ }
+ return true;
+}
+
+} // namespace brpc
diff --git a/src/brpc/checksum.h b/src/brpc/checksum.h
new file mode 100644
index 00000000..ff0a98a8
--- /dev/null
+++ b/src/brpc/checksum.h
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_CHECKSUM_H
+#define BRPC_CHECKSUM_H
+
+#include "brpc/controller.h"
+#include "brpc/options.pb.h" // ChecksumType
+#include "butil/iobuf.h" // butil::IOBuf
+
+namespace brpc {
+
+struct ChecksumIn {
+ const butil::IOBuf* buf;
+ Controller* cntl;
+};
+
+struct ChecksumHandler {
+ // checksum `buf'.
+ // Returns checksum value
+ void (*Compute)(const ChecksumIn& in);
+
+ // verify buf checksum
+ // Rerturn true on success, false otherwise
+ bool (*Verify)(const ChecksumIn& in);
+
+ // Name of the checksum algorithm, must be string constant.
+ const char* name;
+};
+
+// [NOT thread-safe] Register `handler' using key=`type'
+// Returns 0 on success, -1 otherwise
+int RegisterChecksumHandler(ChecksumType type, ChecksumHandler handler);
+
+// Returns the `name' of the checksumType if registered
+const char* ChecksumTypeToCStr(ChecksumType type);
+
+// Put all registered handlers into `vec'.
+void ListChecksumHandler(std::vector<ChecksumHandler>* vec);
+
+// Compute `data' checksum and set to controller
+void ComputeDataChecksum(const ChecksumIn& in, ChecksumType checksum_type);
+
+// Verify `data' checksum Returns true on success, false otherwise
+bool VerifyDataChecksum(const ChecksumIn& in, ChecksumType checksum_type);
+
+} // namespace brpc
+
+#endif // BRPC_CHECKSUM_H
diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp
index 1362d322..b16ab545 100644
--- a/src/brpc/controller.cpp
+++ b/src/brpc/controller.cpp
@@ -269,6 +269,8 @@ void Controller::ResetPods() {
_preferred_index = -1;
_request_compress_type = COMPRESS_TYPE_NONE;
_response_compress_type = COMPRESS_TYPE_NONE;
+ _request_checksum_type = CHECKSUM_TYPE_NONE;
+ _response_checksum_type = CHECKSUM_TYPE_NONE;
_fail_limit = UNSET_MAGIC_NUM;
_pipelined_count = 0;
_inheritable.Reset();
@@ -1346,6 +1348,7 @@ void Controller::SaveClientSettings(ClientSettings* s)
const {
s->tos = _tos;
s->connection_type = _connection_type;
s->request_compress_type = _request_compress_type;
+ s->request_checksum_type = _request_checksum_type;
s->log_id = log_id();
s->has_request_code = has_request_code();
s->request_code = _request_code;
@@ -1359,6 +1362,7 @@ void Controller::ApplyClientSettings(const
ClientSettings& s) {
set_type_of_service(s.tos);
set_connection_type(s.connection_type);
set_request_compress_type(s.request_compress_type);
+ set_request_checksum_type(s.request_checksum_type);
set_log_id(s.log_id);
set_flag(FLAGS_REQUEST_CODE, s.has_request_code);
_request_code = s.request_code;
diff --git a/src/brpc/controller.h b/src/brpc/controller.h
index 000aee2f..64474419 100644
--- a/src/brpc/controller.h
+++ b/src/brpc/controller.h
@@ -241,6 +241,9 @@ public:
// Set compression method for request.
void set_request_compress_type(CompressType t) { _request_compress_type =
t; }
+ // Set checksum type for request.
+ void set_request_checksum_type(ChecksumType t) { _request_checksum_type =
t; }
+
// Required by some load balancers.
void set_request_code(uint64_t request_code) {
add_flag(FLAGS_REQUEST_CODE);
@@ -464,6 +467,9 @@ public:
// Set compression method for response.
void set_response_compress_type(CompressType t) { _response_compress_type
= t; }
+
+ // Set checksum type for response.
+ void set_response_checksum_type(ChecksumType t) { _response_checksum_type
= t; }
// Non-zero when this RPC call is traced (by rpcz or rig).
// NOTE: Only valid at server-side, always zero at client-side.
@@ -552,6 +558,8 @@ public:
const std::string& request_id() const { return _inheritable.request_id; }
CompressType request_compress_type() const { return
_request_compress_type; }
CompressType response_compress_type() const { return
_response_compress_type; }
+ ChecksumType request_checksum_type() const { return
_request_checksum_type; }
+ ChecksumType response_checksum_type() const { return
_response_checksum_type; }
const HttpHeader& http_request() const
{ return _http_request != NULL ? *_http_request : DefaultHttpHeader(); }
@@ -693,6 +701,7 @@ private:
int32_t tos;
ConnectionType connection_type;
CompressType request_compress_type;
+ ChecksumType request_checksum_type;
uint64_t log_id;
bool has_request_code;
int64_t request_code;
@@ -834,6 +843,9 @@ private:
int _preferred_index;
CompressType _request_compress_type;
CompressType _response_compress_type;
+ ChecksumType _request_checksum_type;
+ ChecksumType _response_checksum_type;
+ std::string _checksum_value;
Inheritable _inheritable;
int _pchan_sub_count;
google::protobuf::Message* _response;
diff --git a/src/brpc/details/controller_private_accessor.h
b/src/brpc/details/controller_private_accessor.h
index db40ca15..1a9d7062 100644
--- a/src/brpc/details/controller_private_accessor.h
+++ b/src/brpc/details/controller_private_accessor.h
@@ -152,6 +152,16 @@ public:
return *this;
}
+ void set_checksum_value(const char* c, size_t size) {
+ _cntl->_checksum_value.assign(c, size);
+ }
+
+ void set_checksum_value(const std::string& c) {
+ _cntl->_checksum_value = c;
+ }
+
+ const std::string& checksum_value() const { return _cntl->_checksum_value;
}
+
private:
Controller* _cntl;
};
diff --git a/src/brpc/global.cpp b/src/brpc/global.cpp
index 6b3310ec..0196b6d0 100644
--- a/src/brpc/global.cpp
+++ b/src/brpc/global.cpp
@@ -60,6 +60,10 @@
#include "brpc/policy/gzip_compress.h"
#include "brpc/policy/snappy_compress.h"
+// Checksum handlers
+#include "brpc/checksum.h"
+#include "brpc/policy/crc32c_checksum.h"
+
// Protocols
#include "brpc/protocol.h"
#include "brpc/policy/baidu_rpc_protocol.h"
@@ -401,6 +405,13 @@ static void GlobalInitializeOrDieImpl() {
exit(1);
}
+ // Checksum Handlers
+ const ChecksumHandler crc32c_checksum = {Crc32cCompute, Crc32cVerify,
+ "crc32c"};
+ if (RegisterChecksumHandler(CHECKSUM_TYPE_CRC32C, crc32c_checksum) != 0) {
+ exit(1);
+ }
+
// Protocols
Protocol baidu_protocol = { ParseRpcMessage,
SerializeRpcRequest, PackRpcRequest,
diff --git a/src/brpc/options.proto b/src/brpc/options.proto
index e334c48e..34001d7b 100644
--- a/src/brpc/options.proto
+++ b/src/brpc/options.proto
@@ -74,6 +74,11 @@ enum CompressType {
COMPRESS_TYPE_LZ4 = 4;
}
+enum ChecksumType {
+ CHECKSUM_TYPE_NONE = 0;
+ CHECKSUM_TYPE_CRC32C = 1;
+}
+
enum ContentType {
CONTENT_TYPE_PB = 0;
CONTENT_TYPE_JSON = 1;
diff --git a/src/brpc/policy/baidu_rpc_meta.proto
b/src/brpc/policy/baidu_rpc_meta.proto
index 59179831..5591c5da 100644
--- a/src/brpc/policy/baidu_rpc_meta.proto
+++ b/src/brpc/policy/baidu_rpc_meta.proto
@@ -34,6 +34,8 @@ message RpcMeta {
optional StreamSettings stream_settings = 8;
map<string, string> user_fields = 9;
optional ContentType content_type = 10;
+ optional int32 checksum_type = 11;
+ optional bytes checksum_value = 12;
}
message RpcRequestMeta {
diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp
b/src/brpc/policy/baidu_rpc_protocol.cpp
index 8efff065..fcc8b824 100644
--- a/src/brpc/policy/baidu_rpc_protocol.cpp
+++ b/src/brpc/policy/baidu_rpc_protocol.cpp
@@ -32,6 +32,7 @@
#include "brpc/server.h" // Server
#include "brpc/span.h"
#include "brpc/compress.h" // ParseFromCompressedData
+#include "brpc/checksum.h"
#include "brpc/stream_impl.h"
#include "brpc/rpc_dump.h" // SampledRequest
#include "brpc/rpc_pb_message_factory.h"
@@ -143,7 +144,8 @@ ParseResult ParseRpcMessage(butil::IOBuf* source, Socket*
socket,
bool SerializeRpcMessage(const google::protobuf::Message& message,
Controller& cntl, ContentType content_type,
- CompressType compress_type, butil::IOBuf* buf) {
+ CompressType compress_type, ChecksumType
checksum_type,
+ butil::IOBuf* buf) {
auto serialize = [&](Serializer& serializer) -> bool {
bool ok;
if (COMPRESS_TYPE_NONE == compress_type) {
@@ -156,6 +158,8 @@ bool SerializeRpcMessage(const google::protobuf::Message&
message,
}
ok = handler->Compress(serializer, buf);
}
+ ChecksumIn checksum_in{buf, &cntl};
+ ComputeDataChecksum(checksum_in, checksum_type);
return ok;
};
@@ -223,13 +227,16 @@ static bool SerializeResponse(const
google::protobuf::Message& res,
ContentType content_type = cntl.response_content_type();
CompressType compress_type = cntl.response_compress_type();
- if (!SerializeRpcMessage(res, cntl, content_type, compress_type, &buf)) {
- cntl.SetFailed(
- ERESPONSE, "Fail to serialize response=%s, "
- "ContentType=%s, CompressType=%s",
- res.GetDescriptor()->full_name().c_str(),
- ContentTypeToCStr(content_type),
- CompressTypeToCStr(compress_type));
+ ChecksumType checksum_type = cntl.response_checksum_type();
+ if (!SerializeRpcMessage(res, cntl, content_type, compress_type,
+ checksum_type, &buf)) {
+ cntl.SetFailed(ERESPONSE,
+ "Fail to serialize response=%s, "
+ "ContentType=%s, CompressType=%s, ChecksumType=%s",
+ res.GetDescriptor()->full_name().c_str(),
+ ContentTypeToCStr(content_type),
+ CompressTypeToCStr(compress_type),
+ ChecksumTypeToCStr(checksum_type));
return false;
}
return true;
@@ -308,7 +315,6 @@ void SendRpcResponse(int64_t correlation_id, Controller*
cntl,
// `res' can be NULL here, in which case we don't serialize it
// If user calls `SetFailed' on Controller, we don't serialize
// response either
- CompressType compress_type = cntl->response_compress_type();
if (res != NULL && !cntl->Failed()) {
append_body = SerializeResponse(*res, *cntl, res_body);
}
@@ -336,8 +342,10 @@ void SendRpcResponse(int64_t correlation_id, Controller*
cntl,
response_meta->set_error_text(cntl->ErrorText());
}
meta.set_correlation_id(correlation_id);
- meta.set_compress_type(compress_type);
+ meta.set_compress_type(cntl->response_compress_type());
meta.set_content_type(cntl->response_content_type());
+ meta.set_checksum_type(cntl->response_checksum_type());
+ meta.set_checksum_value(accessor.checksum_value());
if (attached_size > 0) {
meta.set_attachment_size(attached_size);
}
@@ -486,9 +494,14 @@ void EndRunningCallMethodInPool(
bool DeserializeRpcMessage(const butil::IOBuf& data, Controller& cntl,
ContentType content_type, CompressType
compress_type,
+ ChecksumType checksum_type,
google::protobuf::Message* message) {
auto deserialize = [&](Deserializer& deserializer) -> bool {
- bool ok;
+ ChecksumIn checksum_in{&data, &cntl};
+ bool ok = VerifyDataChecksum(checksum_in, checksum_type);
+ if (!ok) {
+ return ok;
+ }
if (COMPRESS_TYPE_NONE == compress_type) {
butil::IOBufAsZeroCopyInputStream stream(data);
ok = deserializer.DeserializeFrom(&stream);
@@ -601,6 +614,8 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
}
cntl->set_request_content_type(meta.content_type());
cntl->set_request_compress_type((CompressType)meta.compress_type());
+ cntl->set_request_checksum_type((ChecksumType)meta.checksum_type());
+ accessor.set_checksum_value(meta.checksum_value());
accessor.set_server(server)
.set_security_mode(security_mode)
.set_peer_id(socket->id())
@@ -783,16 +798,23 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
}
ContentType content_type = meta.content_type();
- auto compress_type =
static_cast<CompressType>(meta.compress_type());
- messages = server->options().rpc_pb_message_factory->Get(*svc,
*method);
+ auto compress_type =
+ static_cast<CompressType>(meta.compress_type());
+ auto checksum_type =
+ static_cast<ChecksumType>(meta.checksum_type());
+ messages =
+ server->options().rpc_pb_message_factory->Get(*svc, *method);
if (!DeserializeRpcMessage(req_buf, *cntl, content_type,
- compress_type, messages->Request())) {
+ compress_type, checksum_type,
+ messages->Request())) {
cntl->SetFailed(
- EREQUEST, "Fail to parse request=%s, ContentType=%s, "
- "CompressType=%s, request_size=%d",
+ EREQUEST,
+ "Fail to parse request=%s, ContentType=%s, "
+ "CompressType=%s, ChecksumType=%s, request_size=%d",
messages->Request()->GetDescriptor()->full_name().c_str(),
ContentTypeToCStr(content_type),
- CompressTypeToCStr(compress_type), req_size);
+ CompressTypeToCStr(compress_type),
+ ChecksumTypeToCStr(checksum_type), req_size);
break;
}
req_buf.clear();
@@ -956,20 +978,26 @@ void ProcessRpcResponse(InputMessageBase* msg_base) {
ContentType content_type = meta.content_type();
auto compress_type = (CompressType)meta.compress_type();
+ auto checksum_type = (ChecksumType)meta.checksum_type();
cntl->set_response_content_type(content_type);
cntl->set_response_compress_type(compress_type);
+ cntl->set_response_checksum_type(checksum_type);
+ accessor.set_checksum_value(meta.checksum_value());
if (cntl->response()) {
if (cntl->response()->GetDescriptor() ==
SerializedResponse::descriptor()) {
((SerializedResponse*)cntl->response())->
serialized_data().append(*res_buf_ptr);
} else if (!DeserializeRpcMessage(*res_buf_ptr, *cntl,
content_type,
- compress_type,
cntl->response())) {
+ compress_type, checksum_type,
+ cntl->response())) {
cntl->SetFailed(
- EREQUEST, "Fail to parse response=%s, ContentType=%s, "
- "CompressType=%s, request_size=%d",
+ EREQUEST,
+ "Fail to parse response=%s, ContentType=%s, "
+ "CompressType=%s, ChecksumType=%s, request_size=%d",
cntl->response()->GetDescriptor()->full_name().c_str(),
ContentTypeToCStr(content_type),
- CompressTypeToCStr(compress_type), res_size);
+ CompressTypeToCStr(compress_type),
+ ChecksumTypeToCStr(checksum_type), res_size);
}
} // else silently ignore the response.
} while (0);
@@ -996,13 +1024,16 @@ void SerializeRpcRequest(butil::IOBuf* request_buf,
Controller* cntl,
ContentType content_type = cntl->request_content_type();
CompressType compress_type = cntl->request_compress_type();
- if (!SerializeRpcMessage(*request, *cntl, content_type, compress_type,
request_buf)) {
+ ChecksumType checksum_type = cntl->request_checksum_type();
+ if (!SerializeRpcMessage(*request, *cntl, content_type, compress_type,
+ checksum_type, request_buf)) {
return cntl->SetFailed(
- EREQUEST, "Fail to compress request=%s, "
- "ContentType=%s, CompressType=%s",
+ EREQUEST,
+ "Fail to compress request=%s, "
+ "ContentType=%s, CompressType=%s, ChecksumType=%s",
request->GetDescriptor()->full_name().c_str(),
- ContentTypeToCStr(content_type),
- CompressTypeToCStr(compress_type));
+ ContentTypeToCStr(content_type), CompressTypeToCStr(compress_type),
+ ChecksumTypeToCStr(checksum_type));
}
}
@@ -1027,6 +1058,8 @@ void PackRpcRequest(butil::IOBuf* req_buf,
method->service()->name());
request_meta->set_method_name(method->name());
meta.set_compress_type(cntl->request_compress_type());
+ meta.set_checksum_type(cntl->request_checksum_type());
+ meta.set_checksum_value(accessor.checksum_value());
} else if (NULL != cntl->sampled_request()) {
// Replaying. Keep service-name as the one seen by server.
request_meta->set_service_name(cntl->sampled_request()->meta.service_name());
diff --git a/src/brpc/policy/crc32c_checksum.cpp
b/src/brpc/policy/crc32c_checksum.cpp
new file mode 100644
index 00000000..28b6fab6
--- /dev/null
+++ b/src/brpc/policy/crc32c_checksum.cpp
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/policy/crc32c_checksum.h"
+
+#include "brpc/details/controller_private_accessor.h"
+#include "brpc/log.h"
+#include "butil/crc32c.h"
+#include "butil/sys_byteorder.h"
+
+namespace brpc {
+namespace policy {
+
+void Crc32cCompute(const ChecksumIn& in) {
+ auto buf = in.buf;
+ auto cntl = in.cntl;
+ butil::IOBufAsZeroCopyInputStream wrapper(*buf);
+ const void* data;
+ int size;
+ uint32_t crc = 0;
+ while (wrapper.Next(&data, &size)) {
+ crc = butil::crc32c::Extend(crc, static_cast<const char*>(data), size);
+ }
+ RPC_VLOG << "Crc32cCompute crc=" << crc;
+ crc = butil::HostToNet32(butil::crc32c::Mask(crc));
+ ControllerPrivateAccessor(cntl).set_checksum_value(
+ reinterpret_cast<char*>(&crc), sizeof(crc));
+}
+
+bool Crc32cVerify(const ChecksumIn& in) {
+ auto buf = in.buf;
+ auto cntl = in.cntl;
+ butil::IOBufAsZeroCopyInputStream wrapper(*buf);
+ const void* data;
+ int size;
+ uint32_t crc = 0;
+ while (wrapper.Next(&data, &size)) {
+ crc = butil::crc32c::Extend(crc, static_cast<const char*>(data), size);
+ }
+ auto& val = ControllerPrivateAccessor(const_cast<Controller*>(cntl))
+ .checksum_value();
+ CHECK_EQ(val.size(), sizeof(crc));
+ auto expected = *reinterpret_cast<const uint32_t*>(val.data());
+ expected = butil::crc32c::Unmask(butil::NetToHost32(expected));
+ RPC_VLOG << "Crc32cVerify crc=" << crc << " expected=" << expected;
+ return crc == expected;
+}
+
+} // namespace policy
+} // namespace brpc
diff --git a/src/brpc/policy/crc32c_checksum.h
b/src/brpc/policy/crc32c_checksum.h
new file mode 100644
index 00000000..cfdd724c
--- /dev/null
+++ b/src/brpc/policy/crc32c_checksum.h
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_POLICY_CRC32C_CHECKSUM_H
+#define BRPC_POLICY_CRC32C_CHECKSUM_H
+
+#include "brpc/checksum.h"
+#include "brpc/controller.h"
+#include "butil/iobuf.h" // butil::IOBuf
+
+namespace brpc {
+namespace policy {
+
+// Compute checksum
+void Crc32cCompute(const ChecksumIn& in);
+
+// Verify checksum
+bool Crc32cVerify(const ChecksumIn& in);
+
+} // namespace policy
+} // namespace brpc
+
+#endif // BRPC_POLICY_CRC32C_CHECKSUM_H
diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp
index 17cde630..cd83053a 100644
--- a/src/brpc/server.cpp
+++ b/src/brpc/server.cpp
@@ -33,6 +33,7 @@
#include "butil/debug/leak_annotations.h"
#include "brpc/log.h"
#include "brpc/compress.h"
+#include "brpc/checksum.h"
#include "brpc/policy/nova_pbrpc_protocol.h"
#include "brpc/global.h"
#include "brpc/socket_map.h" // SocketMapList
@@ -227,6 +228,17 @@ static void PrintSupportedCompressions(std::ostream& os,
void*) {
}
}
+static void PrintSupportedChecksums(std::ostream& os, void*) {
+ std::vector<ChecksumHandler> handlers;
+ ListChecksumHandler(&handlers);
+ for (size_t i = 0; i < handlers.size(); ++i) {
+ if (i != 0) {
+ os << ' ';
+ }
+ os << (handlers[i].name ? handlers[i].name : "(null)");
+ }
+}
+
static void PrintEnabledProfilers(std::ostream& os, void*) {
if (cpu_profiler_enabled) {
os << "cpu ";
@@ -253,6 +265,9 @@ static bvar::PassiveStatus<std::string> s_proto_st(
static bvar::PassiveStatus<std::string> s_comp_st(
"rpc_compressions", PrintSupportedCompressions, NULL);
+static bvar::PassiveStatus<std::string> s_cksum_st(
+ "rpc_checksums", PrintSupportedChecksums, NULL);
+
static bvar::PassiveStatus<std::string> s_prof_st(
"rpc_profilers", PrintEnabledProfilers, NULL);
diff --git a/test/brpc_server_unittest.cpp b/test/brpc_server_unittest.cpp
index a51b9317..4a774fab 100644
--- a/test/brpc_server_unittest.cpp
+++ b/test/brpc_server_unittest.cpp
@@ -71,11 +71,14 @@ DECLARE_bool(enable_dir_service);
namespace policy {
DECLARE_bool(use_http_error_code);
-extern bool SerializeRpcMessage(const google::protobuf::Message& serializer,
Controller& cntl,
- ContentType content_type, CompressType
compress_type,
- butil::IOBuf* buf);
-extern bool DeserializeRpcMessage(const butil::IOBuf& deserializer,
Controller& cntl,
- ContentType content_type, CompressType
compress_type,
+extern bool SerializeRpcMessage(const google::protobuf::Message& serializer,
+ Controller& cntl, ContentType content_type,
+ CompressType compress_type,
+ ChecksumType checksum_type, butil::IOBuf* buf);
+extern bool DeserializeRpcMessage(const butil::IOBuf& deserializer,
+ Controller& cntl, ContentType content_type,
+ CompressType compress_type,
+ ChecksumType checksum_type,
google::protobuf::Message* message);
}
@@ -1702,11 +1705,13 @@ public:
ASSERT_EQ("Echo", cntl->sampled_request()->meta.method_name());
brpc::ContentType content_type = cntl->request_content_type();
brpc::CompressType compress_type = cntl->request_compress_type();
+ brpc::ChecksumType checksum_type = cntl->request_checksum_type();
test::EchoRequest echo_request;
test::EchoResponse echo_response;
ASSERT_TRUE(brpc::policy::DeserializeRpcMessage(
- request->serialized_data(), *cntl, content_type, compress_type,
&echo_request));
+ request->serialized_data(), *cntl, content_type, compress_type,
+ checksum_type, &echo_request));
ASSERT_EQ(EXP_REQUEST, echo_request.message());
ASSERT_EQ(EXP_REQUEST, cntl->request_attachment().to_string());
@@ -1727,10 +1732,12 @@ public:
cntl->set_response_content_type(content_type);
cntl->set_response_compress_type(compress_type);
+ cntl->set_response_checksum_type(checksum_type);
cntl->response_attachment().append(EXP_RESPONSE);
echo_response.set_message(EXP_RESPONSE);
ASSERT_TRUE(brpc::policy::SerializeRpcMessage(
- echo_response, *cntl, content_type, compress_type,
&response->serialized_data()));
+ echo_response, *cntl, content_type, compress_type, checksum_type,
+ &response->serialized_data()));
}
private:
int _content_type_index = brpc::ContentType_MIN;
@@ -1777,11 +1784,12 @@ TEST_F(ServerTest, baidu_master_service) {
ASSERT_EQ(0, server.Join());
}
-void TestGenericCall(brpc::Channel& channel,
- brpc::ContentType content_type,
- brpc::CompressType compress_type) {
+void TestGenericCall(brpc::Channel& channel, brpc::ContentType content_type,
+ brpc::CompressType compress_type,
+ brpc::ChecksumType checksum_type) {
LOG(INFO) << "TestGenericCall: content_type=" << content_type
- << ", compress_type=" << compress_type;
+ << ", compress_type=" << compress_type
+ << ", checksum_type=" << checksum_type;
test::EchoRequest request;
test::EchoResponse response;
request.set_message(EXP_REQUEST);
@@ -1792,11 +1800,13 @@ void TestGenericCall(brpc::Channel& channel,
brpc::Controller cntl;
cntl.set_request_content_type(content_type);
cntl.set_request_compress_type(compress_type);
+ cntl.set_request_checksum_type(checksum_type);
cntl.request_attachment().append(EXP_REQUEST);
std::string error;
ASSERT_TRUE(brpc::policy::SerializeRpcMessage(
- request, cntl, content_type, compress_type,
&serialized_request.serialized_data()));
+ request, cntl, content_type, compress_type, checksum_type,
+ &serialized_request.serialized_data()));
auto sampled_request = new (std::nothrow) brpc::SampledRequest();
sampled_request->meta.set_service_name(
test::EchoService::descriptor()->full_name());
@@ -1807,9 +1817,10 @@ void TestGenericCall(brpc::Channel& channel,
channel.CallMethod(NULL, &cntl, &serialized_request, &serialized_response,
NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
-
ASSERT_TRUE(brpc::policy::DeserializeRpcMessage(serialized_response.serialized_data(),
- cntl,
cntl.response_content_type(),
-
cntl.response_compress_type(), &response));
+ ASSERT_TRUE(brpc::policy::DeserializeRpcMessage(
+ serialized_response.serialized_data(), cntl,
+ cntl.response_content_type(), cntl.response_compress_type(),
+ cntl.response_checksum_type(), &response));
ASSERT_EQ(EXP_RESPONSE, response.message());
ASSERT_EQ(EXP_RESPONSE, cntl.response_attachment().to_string());
}
@@ -1829,25 +1840,41 @@ TEST_F(ServerTest, generic_call) {
channel_options.protocol = "baidu_std";
ASSERT_EQ(0, channel.Init(ep, &channel_options));
- TestGenericCall(channel, brpc::CONTENT_TYPE_PB, brpc::COMPRESS_TYPE_ZLIB);
- TestGenericCall(channel, brpc::CONTENT_TYPE_PB, brpc::COMPRESS_TYPE_GZIP);
- TestGenericCall(channel, brpc::CONTENT_TYPE_PB,
brpc::COMPRESS_TYPE_SNAPPY);
- TestGenericCall(channel, brpc::CONTENT_TYPE_PB, brpc::COMPRESS_TYPE_NONE);
-
- TestGenericCall(channel, brpc::CONTENT_TYPE_JSON,
brpc::COMPRESS_TYPE_ZLIB);
- TestGenericCall(channel, brpc::CONTENT_TYPE_JSON,
brpc::COMPRESS_TYPE_GZIP);
- TestGenericCall(channel, brpc::CONTENT_TYPE_JSON,
brpc::COMPRESS_TYPE_SNAPPY);
- TestGenericCall(channel, brpc::CONTENT_TYPE_JSON,
brpc::COMPRESS_TYPE_NONE);
-
- TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_JSON,
brpc::COMPRESS_TYPE_ZLIB);
- TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_JSON,
brpc::COMPRESS_TYPE_GZIP);
- TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_JSON,
brpc::COMPRESS_TYPE_SNAPPY);
- TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_JSON,
brpc::COMPRESS_TYPE_NONE);
-
- TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_TEXT,
brpc::COMPRESS_TYPE_ZLIB);
- TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_TEXT,
brpc::COMPRESS_TYPE_GZIP);
- TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_TEXT,
brpc::COMPRESS_TYPE_SNAPPY);
- TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_TEXT,
brpc::COMPRESS_TYPE_NONE);
+ TestGenericCall(channel, brpc::CONTENT_TYPE_PB, brpc::COMPRESS_TYPE_ZLIB,
+ brpc::CHECKSUM_TYPE_CRC32C);
+ TestGenericCall(channel, brpc::CONTENT_TYPE_PB, brpc::COMPRESS_TYPE_GZIP,
+ brpc::CHECKSUM_TYPE_CRC32C);
+ TestGenericCall(channel, brpc::CONTENT_TYPE_PB, brpc::COMPRESS_TYPE_SNAPPY,
+ brpc::CHECKSUM_TYPE_NONE);
+ TestGenericCall(channel, brpc::CONTENT_TYPE_PB, brpc::COMPRESS_TYPE_NONE,
+ brpc::CHECKSUM_TYPE_NONE);
+
+ TestGenericCall(channel, brpc::CONTENT_TYPE_JSON, brpc::COMPRESS_TYPE_ZLIB,
+ brpc::CHECKSUM_TYPE_CRC32C);
+ TestGenericCall(channel, brpc::CONTENT_TYPE_JSON, brpc::COMPRESS_TYPE_GZIP,
+ brpc::CHECKSUM_TYPE_CRC32C);
+ TestGenericCall(channel, brpc::CONTENT_TYPE_JSON,
+ brpc::COMPRESS_TYPE_SNAPPY, brpc::CHECKSUM_TYPE_NONE);
+ TestGenericCall(channel, brpc::CONTENT_TYPE_JSON, brpc::COMPRESS_TYPE_NONE,
+ brpc::CHECKSUM_TYPE_NONE);
+
+ TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_JSON,
+ brpc::COMPRESS_TYPE_ZLIB, brpc::CHECKSUM_TYPE_CRC32C);
+ TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_JSON,
+ brpc::COMPRESS_TYPE_GZIP, brpc::CHECKSUM_TYPE_CRC32C);
+ TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_JSON,
+ brpc::COMPRESS_TYPE_SNAPPY, brpc::CHECKSUM_TYPE_NONE);
+ TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_JSON,
+ brpc::COMPRESS_TYPE_NONE, brpc::CHECKSUM_TYPE_NONE);
+
+ TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_TEXT,
+ brpc::COMPRESS_TYPE_ZLIB, brpc::CHECKSUM_TYPE_CRC32C);
+ TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_TEXT,
+ brpc::COMPRESS_TYPE_GZIP, brpc::CHECKSUM_TYPE_CRC32C);
+ TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_TEXT,
+ brpc::COMPRESS_TYPE_SNAPPY, brpc::CHECKSUM_TYPE_NONE);
+ TestGenericCall(channel, brpc::CONTENT_TYPE_PROTO_TEXT,
+ brpc::COMPRESS_TYPE_NONE, brpc::CHECKSUM_TYPE_NONE);
ASSERT_EQ(0, server.Stop(0));
ASSERT_EQ(0, server.Join());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]