This is an automated email from the ASF dual-hosted git repository.
guangmingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new c7add74f Support ProtoJson formatted http body (#2921)
c7add74f is described below
commit c7add74fab5c65d80f8d126fe0a51190ca070fac
Author: Bright Chen <[email protected]>
AuthorDate: Sun Mar 30 15:02:26 2025 +0800
Support ProtoJson formatted http body (#2921)
---
src/brpc/extension.h | 2 +-
src/brpc/policy/http_rpc_protocol.cpp | 171 ++++++++++++++++++---------
src/brpc/policy/http_rpc_protocol.h | 1 +
src/bthread/key.cpp | 5 +-
src/butil/logging.cc | 2 +-
src/butil/memory/singleton_on_pthread_once.h | 10 +-
src/json2pb/json_to_pb.cpp | 48 ++++++--
src/json2pb/json_to_pb.h | 29 +++--
src/json2pb/pb_to_json.cpp | 41 ++++++-
src/json2pb/pb_to_json.h | 15 ++-
src/json2pb/protobuf_type_resolver.cpp | 36 ++++++
src/json2pb/protobuf_type_resolver.h | 71 +++++++++++
test/addressbook_map.proto | 10 ++
test/brpc_http_rpc_protocol_unittest.cpp | 74 +++++++++++-
test/brpc_protobuf_json_unittest.cpp | 103 ++++++++++++++++
test/bthread_rwlock_unittest.cpp | 10 +-
16 files changed, 536 insertions(+), 92 deletions(-)
diff --git a/src/brpc/extension.h b/src/brpc/extension.h
index 7190ac91..7854362e 100644
--- a/src/brpc/extension.h
+++ b/src/brpc/extension.h
@@ -48,7 +48,7 @@ public:
void List(std::ostream& os, char separator);
private:
-friend class butil::GetLeakySingleton<Extension<T> >;
+template <typename U> friend U* butil::create_leaky_singleton_obj();
Extension() = default;
butil::CaseIgnoredFlatMap<T*> _instance_map;
butil::Mutex _map_mutex;
diff --git a/src/brpc/policy/http_rpc_protocol.cpp
b/src/brpc/policy/http_rpc_protocol.cpp
index a1353b14..e0ff1e31 100644
--- a/src/brpc/policy/http_rpc_protocol.cpp
+++ b/src/brpc/policy/http_rpc_protocol.cpp
@@ -19,27 +19,26 @@
#include <google/protobuf/descriptor.h> // MethodDescriptor
#include <google/protobuf/text_format.h>
#include <gflags/gflags.h>
-#include <json2pb/pb_to_json.h> // ProtoMessageToJson
-#include <json2pb/json_to_pb.h> // JsonToProtoMessage
#include <string>
-
#include "brpc/policy/http_rpc_protocol.h"
#include "butil/unique_ptr.h" // std::unique_ptr
#include "butil/string_splitter.h" // StringMultiSplitter
#include "butil/string_printf.h"
#include "butil/time.h"
#include "butil/sys_byteorder.h"
+#include "json2pb/pb_to_json.h" // ProtoMessageToJson
+#include "json2pb/json_to_pb.h" // JsonToProtoMessage
#include "brpc/compress.h"
-#include "brpc/errno.pb.h" // ENOSERVICE, ENOMETHOD
-#include "brpc/controller.h" // Controller
-#include "brpc/server.h" // Server
+#include "brpc/errno.pb.h" // ENOSERVICE, ENOMETHOD
+#include "brpc/controller.h" // Controller
+#include "brpc/server.h" // Server
#include "brpc/details/server_private_accessor.h"
#include "brpc/span.h"
-#include "brpc/socket.h" // Socket
-#include "brpc/rpc_dump.h" // SampledRequest
-#include "brpc/http_status_code.h" // HTTP_STATUS_*
+#include "brpc/socket.h" // Socket
+#include "brpc/rpc_dump.h" // SampledRequest
+#include "brpc/http_status_code.h" // HTTP_STATUS_*
#include "brpc/details/controller_private_accessor.h"
-#include "brpc/builtin/index_service.h" // IndexService
+#include "brpc/builtin/index_service.h" // IndexService
#include "brpc/policy/gzip_compress.h"
#include "brpc/policy/http2_rpc_protocol.h"
#include "brpc/details/usercode_backup_pool.h"
@@ -203,6 +202,9 @@ HttpContentType ParseContentType(butil::StringPiece ct,
bool* is_grpc_ct) {
if (ct.starts_with("json")) {
type = HTTP_CONTENT_JSON;
ct.remove_prefix(4);
+ } else if (ct.starts_with("proto-json")) {
+ type = HTTP_CONTENT_PROTO_JSON;
+ ct.remove_prefix(10);
} else if (ct.starts_with("proto-text")) {
type = HTTP_CONTENT_PROTO_TEXT;
ct.remove_prefix(10);
@@ -271,6 +273,79 @@ static bool RemoveGrpcPrefix(butil::IOBuf* body, bool*
compressed) {
return (message_length + 5 == sz);
}
+static bool JsonToProtoMessage(const butil::IOBuf& body,
+ google::protobuf::Message* message,
+ Controller* cntl, int error_code) {
+ butil::IOBufAsZeroCopyInputStream wrapper(body);
+ json2pb::Json2PbOptions options;
+ options.base64_to_bytes = cntl->has_pb_bytes_to_base64();
+ options.array_to_single_repeated = cntl->has_pb_single_repeated_to_array();
+ std::string error;
+ bool ok = json2pb::JsonToProtoMessage(&wrapper, message, options, &error);
+ if (!ok) {
+ cntl->SetFailed(error_code, "Fail to parse http json body as %s: %s",
+ message->GetDescriptor()->full_name().c_str(),
+ error.c_str());
+ }
+ return ok;
+}
+
+static bool ProtoMessageToJson(const google::protobuf::Message& message,
+ butil::IOBufAsZeroCopyOutputStream* wrapper,
+ Controller* cntl, int error_code) {
+ json2pb::Pb2JsonOptions options;
+ options.bytes_to_base64 = cntl->has_pb_bytes_to_base64();
+ options.jsonify_empty_array = cntl->has_pb_jsonify_empty_array();
+ options.always_print_primitive_fields =
cntl->has_always_print_primitive_fields();
+ options.single_repeated_to_array = cntl->has_pb_single_repeated_to_array();
+ options.enum_option = FLAGS_pb_enum_as_number
+ ? json2pb::OUTPUT_ENUM_BY_NUMBER
+ : json2pb::OUTPUT_ENUM_BY_NAME;
+ std::string error;
+ bool ok = json2pb::ProtoMessageToJson(message, wrapper, options, &error);
+ if (!ok) {
+ cntl->SetFailed(error_code, "Fail to convert %s to json: %s",
+ message.GetDescriptor()->full_name().c_str(),
+ error.c_str());
+ }
+ return ok;
+}
+
+static bool ProtoJsonToProtoMessage(const butil::IOBuf& body,
+ google::protobuf::Message* message,
+ Controller* cntl, int error_code) {
+ json2pb::ProtoJson2PbOptions options;
+ options.ignore_unknown_fields = true;
+ butil::IOBufAsZeroCopyInputStream wrapper(body);
+ std::string error;
+ bool ok = json2pb::ProtoJsonToProtoMessage(&wrapper, message, options,
&error);
+ if (!ok) {
+ cntl->SetFailed(error_code, "Fail to parse http proto-json body as %s:
%s",
+ message->GetDescriptor()->full_name().c_str(),
+ error.c_str());
+ }
+ return ok;
+}
+
+static bool ProtoMessageToProtoJson(const google::protobuf::Message& message,
+ butil::IOBufAsZeroCopyOutputStream*
wrapper,
+ Controller* cntl, int error_code) {
+ json2pb::Pb2ProtoJsonOptions options;
+#if GOOGLE_PROTOBUF_VERSION >= 5026002
+ options.always_print_fields_with_no_presence =
cntl->has_always_print_primitive_fields();
+#else
+ options.always_print_primitive_fields =
cntl->has_always_print_primitive_fields();
+#endif
+ options.always_print_enums_as_ints = FLAGS_pb_enum_as_number;
+ std::string error;
+ bool ok = json2pb::ProtoMessageToProtoJson(message, wrapper, options,
&error);
+ if (!ok) {
+ cntl->SetFailed(error_code, "Fail to convert %s to proto-json: %s",
+ message.GetDescriptor()->full_name().c_str(),
error.c_str());
+ }
+ return ok;
+}
+
void ProcessHttpResponse(InputMessageBase* msg) {
const int64_t start_parse_us = butil::cpuwide_time_us();
DestroyingPtr<HttpContext> imsg_guard(static_cast<HttpContext*>(msg));
@@ -435,8 +510,8 @@ void ProcessHttpResponse(InputMessageBase* msg) {
if (grpc_compressed) {
encoding = res_header->GetHeader(common->GRPC_ENCODING);
if (encoding == NULL) {
- cntl->SetFailed(ERESPONSE, "Fail to find header
`grpc-encoding'"
- " in compressed gRPC response");
+ cntl->SetFailed(ERESPONSE, "Fail to find header
`grpc-encoding' "
+ "in compressed gRPC response");
break;
}
}
@@ -455,23 +530,24 @@ void ProcessHttpResponse(InputMessageBase* msg) {
}
if (content_type == HTTP_CONTENT_PROTO) {
if (!ParsePbFromIOBuf(cntl->response(), res_body)) {
- cntl->SetFailed(ERESPONSE, "Fail to parse content");
+ cntl->SetFailed(ERESPONSE, "Fail to parse content as %s",
+
cntl->response()->GetDescriptor()->full_name().c_str());
break;
}
} else if (content_type == HTTP_CONTENT_PROTO_TEXT) {
if (!ParsePbTextFromIOBuf(cntl->response(), res_body)) {
- cntl->SetFailed(ERESPONSE, "Fail to parse proto-text content");
+ cntl->SetFailed(ERESPONSE, "Fail to parse proto-text content
as %s",
+
cntl->response()->GetDescriptor()->full_name().c_str());
break;
}
} else if (content_type == HTTP_CONTENT_JSON) {
- // message body is json
- butil::IOBufAsZeroCopyInputStream wrapper(res_body);
- std::string err;
- json2pb::Json2PbOptions options;
- options.base64_to_bytes = cntl->has_pb_bytes_to_base64();
- options.array_to_single_repeated =
cntl->has_pb_single_repeated_to_array();
- if (!json2pb::JsonToProtoMessage(&wrapper, cntl->response(),
options, &err)) {
- cntl->SetFailed(ERESPONSE, "Fail to parse content, %s",
err.c_str());
+ // Message body is json.
+ if (!JsonToProtoMessage(res_body, cntl->response(), cntl,
ERESPONSE)) {
+ break;
+ }
+ } else if (content_type == HTTP_CONTENT_PROTO_JSON) {
+ // Message body is json.
+ if (!ProtoJsonToProtoMessage(res_body, cntl->response(), cntl,
ERESPONSE)) {
break;
}
} else {
@@ -530,8 +606,7 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/,
}
} else {
bool is_grpc_ct = false;
- content_type = ParseContentType(hreq.content_type(),
- &is_grpc_ct);
+ content_type = ParseContentType(hreq.content_type(), &is_grpc_ct);
is_grpc = (is_http2 && is_grpc_ct);
}
@@ -549,21 +624,15 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/,
return cntl->SetFailed(EREQUEST, "Fail to print %s as
proto-text",
pbreq->GetTypeName().c_str());
}
+ } else if (content_type == HTTP_CONTENT_PROTO_JSON) {
+ if (!ProtoMessageToProtoJson(*pbreq, &wrapper, cntl, EREQUEST)) {
+ cntl->request_attachment().clear();
+ return;
+ }
} else if (content_type == HTTP_CONTENT_JSON) {
- std::string err;
- json2pb::Pb2JsonOptions opt;
- opt.bytes_to_base64 = cntl->has_pb_bytes_to_base64();
- opt.jsonify_empty_array = cntl->has_pb_jsonify_empty_array();
- opt.always_print_primitive_fields =
cntl->has_always_print_primitive_fields();
- opt.single_repeated_to_array =
cntl->has_pb_single_repeated_to_array();
-
- opt.enum_option = (FLAGS_pb_enum_as_number
- ? json2pb::OUTPUT_ENUM_BY_NUMBER
- : json2pb::OUTPUT_ENUM_BY_NAME);
- if (!json2pb::ProtoMessageToJson(*pbreq, &wrapper, opt, &err)) {
+ if (!ProtoMessageToJson(*pbreq, &wrapper, cntl, EREQUEST)) {
cntl->request_attachment().clear();
- return cntl->SetFailed(
- EREQUEST, "Fail to convert request to json, %s",
err.c_str());
+ return;
}
} else {
return cntl->SetFailed(
@@ -819,19 +888,10 @@ HttpResponseSender::~HttpResponseSender() {
if (!google::protobuf::TextFormat::Print(*res, &wrapper)) {
cntl->SetFailed(ERESPONSE, "Fail to print %s as proto-text",
res->GetTypeName().c_str());
}
+ } else if (content_type == HTTP_CONTENT_PROTO_JSON) {
+ ProtoMessageToProtoJson(*res, &wrapper, cntl, ERESPONSE);
} else {
- std::string err;
- json2pb::Pb2JsonOptions opt;
- opt.bytes_to_base64 = cntl->has_pb_bytes_to_base64();
- opt.jsonify_empty_array = cntl->has_pb_jsonify_empty_array();
- opt.always_print_primitive_fields =
cntl->has_always_print_primitive_fields();
- opt.single_repeated_to_array =
cntl->has_pb_single_repeated_to_array();
- opt.enum_option = (FLAGS_pb_enum_as_number
- ? json2pb::OUTPUT_ENUM_BY_NUMBER
- : json2pb::OUTPUT_ENUM_BY_NAME);
- if (!json2pb::ProtoMessageToJson(*res, &wrapper, opt, &err)) {
- cntl->SetFailed(ERESPONSE, "Fail to convert response to json,
%s", err.c_str());
- }
+ ProtoMessageToJson(*res, &wrapper, cntl, ERESPONSE);
}
}
@@ -1610,17 +1670,14 @@ void ProcessHttpRequest(InputMessageBase *msg) {
req->GetDescriptor()->full_name().c_str());
return;
}
+ } else if (content_type == HTTP_CONTENT_PROTO_JSON) {
+ if (!ProtoJsonToProtoMessage(req_body, req, cntl, EREQUEST)) {
+ return;
+ }
} else {
- butil::IOBufAsZeroCopyInputStream wrapper(req_body);
- std::string err;
- json2pb::Json2PbOptions options;
- options.base64_to_bytes = mp->params.pb_bytes_to_base64;
- options.array_to_single_repeated =
mp->params.pb_single_repeated_to_array;
cntl->set_pb_bytes_to_base64(mp->params.pb_bytes_to_base64);
cntl->set_pb_single_repeated_to_array(mp->params.pb_single_repeated_to_array);
- if (!json2pb::JsonToProtoMessage(&wrapper, req, options,
&err)) {
- cntl->SetFailed(EREQUEST, "Fail to parse http body as %s,
%s",
- req->GetDescriptor()->full_name().c_str(),
err.c_str());
+ if (!JsonToProtoMessage(req_body, req, cntl, EREQUEST)) {
return;
}
}
diff --git a/src/brpc/policy/http_rpc_protocol.h
b/src/brpc/policy/http_rpc_protocol.h
index 918e69d0..bc8bd065 100644
--- a/src/brpc/policy/http_rpc_protocol.h
+++ b/src/brpc/policy/http_rpc_protocol.h
@@ -149,6 +149,7 @@ enum HttpContentType {
HTTP_CONTENT_JSON = 1,
HTTP_CONTENT_PROTO = 2,
HTTP_CONTENT_PROTO_TEXT = 3,
+ HTTP_CONTENT_PROTO_JSON = 4,
};
// Parse from the textual content type. One type may have more than one
literals.
diff --git a/src/bthread/key.cpp b/src/bthread/key.cpp
index 1bf5ec89..00215d7f 100644
--- a/src/bthread/key.cpp
+++ b/src/bthread/key.cpp
@@ -222,8 +222,7 @@ private:
class BAIDU_CACHELINE_ALIGNMENT KeyTableList {
public:
KeyTableList() :
- _head(NULL), _tail(NULL), _length(0) {
- }
+ _head(NULL), _tail(NULL), _length(0) {}
~KeyTableList() {
TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
@@ -305,7 +304,7 @@ public:
return count;
}
- inline uint32_t get_length() {
+ inline uint32_t get_length() const {
return _length;
}
diff --git a/src/butil/logging.cc b/src/butil/logging.cc
index 9b09af6d..2f759f6a 100644
--- a/src/butil/logging.cc
+++ b/src/butil/logging.cc
@@ -395,7 +395,7 @@ bool InitializeLogFileHandle() {
#elif defined(OS_POSIX)
log_file = fopen(log_file_name->c_str(), "a");
if (log_file == NULL) {
- fprintf(stderr, "Fail to fopen %s", log_file_name->c_str());
+ fprintf(stderr, "Fail to fopen %s: %s", log_file_name->c_str(),
berror());
return false;
}
#endif
diff --git a/src/butil/memory/singleton_on_pthread_once.h
b/src/butil/memory/singleton_on_pthread_once.h
index 378d708b..9699bba7 100644
--- a/src/butil/memory/singleton_on_pthread_once.h
+++ b/src/butil/memory/singleton_on_pthread_once.h
@@ -25,7 +25,13 @@
namespace butil {
-template <typename T> class GetLeakySingleton {
+template <typename T>
+T* create_leaky_singleton_obj() {
+ return new T();
+}
+
+template <typename T>
+class GetLeakySingleton {
public:
static butil::subtle::AtomicWord g_leaky_singleton_untyped;
static pthread_once_t g_create_leaky_singleton_once;
@@ -39,7 +45,7 @@ pthread_once_t
GetLeakySingleton<T>::g_create_leaky_singleton_once = PTHREAD_ONC
template <typename T>
void GetLeakySingleton<T>::create_leaky_singleton() {
- T* obj = new T;
+ T* obj = create_leaky_singleton_obj<T>();
butil::subtle::Release_Store(
&g_leaky_singleton_untyped,
reinterpret_cast<butil::subtle::AtomicWord>(obj));
diff --git a/src/json2pb/json_to_pb.cpp b/src/json2pb/json_to_pb.cpp
index f942253e..53887a38 100644
--- a/src/json2pb/json_to_pb.cpp
+++ b/src/json2pb/json_to_pb.cpp
@@ -27,14 +27,14 @@
#include "butil/strings/string_number_conversions.h"
#include "butil/third_party/rapidjson/error/error.h"
#include "butil/third_party/rapidjson/rapidjson.h"
-#include "json_to_pb.h"
-#include "zero_copy_stream_reader.h" // ZeroCopyStreamReader
-#include "encode_decode.h"
+#include "json2pb/json_to_pb.h"
+#include "json2pb/zero_copy_stream_reader.h" // ZeroCopyStreamReader
+#include "json2pb/encode_decode.h"
+#include "json2pb/protobuf_map.h"
+#include "json2pb/rapidjson.h"
+#include "json2pb/protobuf_type_resolver.h"
#include "butil/base64.h"
-#include "butil/string_printf.h"
-#include "protobuf_map.h"
-#include "rapidjson.h"
-
+#include "butil/iobuf.h"
#ifdef __GNUC__
// Ignore -Wnonnull for `(::google::protobuf::Message*)nullptr' of J2PERROR by
design.
@@ -712,6 +712,40 @@ bool
JsonToProtoMessage(google::protobuf::io::ZeroCopyInputStream *stream,
std::string* error) {
return JsonToProtoMessage(stream, message, Json2PbOptions(), error,
nullptr);
}
+
+bool ProtoJsonToProtoMessage(google::protobuf::io::ZeroCopyInputStream* json,
+ google::protobuf::Message* message,
+ const ProtoJson2PbOptions& options,
+ std::string* error) {
+ TypeResolverUniqueptr type_resolver = GetTypeResolver(*message);
+ butil::IOBuf buf;
+ butil::IOBufAsZeroCopyOutputStream output_stream(&buf);
+ std::string type_url = GetTypeUrl(*message);
+ auto st = google::protobuf::util::JsonToBinaryStream(
+ type_resolver.get(), type_url, json, &output_stream, options);
+
+ butil::IOBufAsZeroCopyInputStream input_stream(buf);
+ google::protobuf::io::CodedInputStream decoder(&input_stream);
+ if (!st.ok()) {
+ if (NULL != error) {
+ *error = st.ToString();
+ }
+ return false;
+ }
+
+ bool ok = message->ParseFromCodedStream(&decoder);
+ if (!ok && NULL != error) {
+ *error = "Fail to ParseFromCodedStream";
+ }
+ return ok;
+}
+
+bool ProtoJsonToProtoMessage(const std::string& json,
google::protobuf::Message* message,
+ const ProtoJson2PbOptions& options, std::string*
error) {
+ google::protobuf::io::ArrayInputStream input_stream(json.data(),
json.size());
+ return ProtoJsonToProtoMessage(&input_stream, message, options, error);
+}
+
} //namespace json2pb
#undef J2PERROR
diff --git a/src/json2pb/json_to_pb.h b/src/json2pb/json_to_pb.h
index 44203e08..78eb15b6 100644
--- a/src/json2pb/json_to_pb.h
+++ b/src/json2pb/json_to_pb.h
@@ -23,6 +23,7 @@
#include "json2pb/zero_copy_stream_reader.h"
#include <google/protobuf/message.h>
#include <google/protobuf/io/zero_copy_stream.h> // ZeroCopyInputStream
+#include <google/protobuf/util/json_util.h>
namespace json2pb {
@@ -43,7 +44,7 @@ struct Json2PbOptions {
bool allow_remaining_bytes_after_parsing;
};
-// Convert `json' to protobuf `message'.
+// Convert `json' to protobuf `message' according to `options'.
// Returns true on success. `error' (if not NULL) will be set with error
// message on failure.
//
@@ -58,18 +59,18 @@ bool JsonToProtoMessage(const std::string& json,
size_t* parsed_offset = nullptr);
// Use ZeroCopyInputStream as input instead of std::string.
-bool JsonToProtoMessage(google::protobuf::io::ZeroCopyInputStream *json,
- google::protobuf::Message *message,
- const Json2PbOptions &options,
- std::string *error = nullptr,
- size_t *parsed_offset = nullptr);
+bool JsonToProtoMessage(google::protobuf::io::ZeroCopyInputStream* json,
+ google::protobuf::Message* message,
+ const Json2PbOptions& options,
+ std::string* error = nullptr,
+ size_t* parsed_offset = nullptr);
// Use ZeroCopyStreamReader as input instead of std::string.
// If you need to parse multiple jsons from IOBuf, you should use this
// overload instead of the ZeroCopyInputStream one which bases on this
// and recreates a ZeroCopyStreamReader internally that can't be reused
// between continuous calls.
-bool JsonToProtoMessage(ZeroCopyStreamReader *json,
+bool JsonToProtoMessage(ZeroCopyStreamReader* json,
google::protobuf::Message* message,
const Json2PbOptions& options,
std::string* error = nullptr,
@@ -83,6 +84,20 @@ bool JsonToProtoMessage(const std::string& json,
bool JsonToProtoMessage(google::protobuf::io::ZeroCopyInputStream* stream,
google::protobuf::Message* message,
std::string* error = nullptr);
+
+// See <google/protobuf/util/json_util.h> for details.
+using ProtoJson2PbOptions = google::protobuf::util::JsonParseOptions;
+
+// Convert ProtoJSON formatted `json' to protobuf `message' according to
`options'.
+// See https://protobuf.dev/programming-guides/json/ for details.
+bool ProtoJsonToProtoMessage(google::protobuf::io::ZeroCopyInputStream* json,
+ google::protobuf::Message* message,
+ const ProtoJson2PbOptions& options,
+ std::string* error = NULL);
+// Use default GoogleJson2PbOptions.
+bool ProtoJsonToProtoMessage(const std::string& json,
google::protobuf::Message* message,
+ const ProtoJson2PbOptions& options, std::string*
error = NULL);
+
} // namespace json2pb
#endif // BRPC_JSON2PB_JSON_TO_PB_H
diff --git a/src/json2pb/pb_to_json.cpp b/src/json2pb/pb_to_json.cpp
index b0066dc6..0dc94814 100644
--- a/src/json2pb/pb_to_json.cpp
+++ b/src/json2pb/pb_to_json.cpp
@@ -22,12 +22,14 @@
#include <sys/time.h>
#include <time.h>
#include <google/protobuf/descriptor.h>
+#include "json2pb/zero_copy_stream_writer.h"
+#include "json2pb/encode_decode.h"
+#include "json2pb/protobuf_map.h"
+#include "json2pb/rapidjson.h"
+#include "json2pb/pb_to_json.h"
+#include "json2pb/protobuf_type_resolver.h"
+#include "butil/iobuf.h"
#include "butil/base64.h"
-#include "zero_copy_stream_writer.h"
-#include "encode_decode.h"
-#include "protobuf_map.h"
-#include "rapidjson.h"
-#include "pb_to_json.h"
namespace json2pb {
Pb2JsonOptions::Pb2JsonOptions()
@@ -345,4 +347,33 @@ bool ProtoMessageToJson(const google::protobuf::Message&
message,
std::string* error) {
return ProtoMessageToJson(message, stream, Pb2JsonOptions(), error);
}
+
+bool ProtoMessageToProtoJson(const google::protobuf::Message& message,
+ google::protobuf::io::ZeroCopyOutputStream* json,
+ const Pb2ProtoJsonOptions& options, std::string*
error) {
+ TypeResolverUniqueptr type_resolver = GetTypeResolver(message);
+ butil::IOBuf buf;
+ butil::IOBufAsZeroCopyOutputStream output_stream(&buf);
+ google::protobuf::io::CodedOutputStream coded_stream(&output_stream);
+ if (!message.SerializeToCodedStream(&coded_stream)) {
+ return false;
+ }
+
+ butil::IOBufAsZeroCopyInputStream input_stream(buf);
+ auto st = google::protobuf::util::BinaryToJsonStream(
+ type_resolver.get(), GetTypeUrl(message), &input_stream, json,
options);
+
+ bool ok = st.ok();
+ if (!ok && NULL != error) {
+ *error = st.ToString();
+ }
+ return ok;
+}
+
+bool ProtoMessageToProtoJson(const google::protobuf::Message& message,
std::string* json,
+ const Pb2ProtoJsonOptions& options, std::string*
error) {
+ google::protobuf::io::StringOutputStream output_stream(json);
+ return ProtoMessageToProtoJson(message, &output_stream, options, error);
+}
+
} // namespace json2pb
diff --git a/src/json2pb/pb_to_json.h b/src/json2pb/pb_to_json.h
index 97057d0f..33311ffb 100644
--- a/src/json2pb/pb_to_json.h
+++ b/src/json2pb/pb_to_json.h
@@ -23,6 +23,7 @@
#include <string>
#include <google/protobuf/message.h>
#include <google/protobuf/io/zero_copy_stream.h> // ZeroCopyOutputStream
+#include <google/protobuf/util/json_util.h>
namespace json2pb {
@@ -79,7 +80,7 @@ bool ProtoMessageToJson(const google::protobuf::Message&
message,
std::string* error = NULL);
// send output to ZeroCopyOutputStream instead of std::string.
bool ProtoMessageToJson(const google::protobuf::Message& message,
- google::protobuf::io::ZeroCopyOutputStream *json,
+ google::protobuf::io::ZeroCopyOutputStream* json,
const Pb2JsonOptions& options,
std::string* error = NULL);
@@ -90,6 +91,18 @@ bool ProtoMessageToJson(const google::protobuf::Message&
message,
bool ProtoMessageToJson(const google::protobuf::Message& message,
google::protobuf::io::ZeroCopyOutputStream* json,
std::string* error = NULL);
+
+// See <google/protobuf/util/json_util.h> for details.
+using Pb2ProtoJsonOptions = google::protobuf::util::JsonOptions;
+
+// Convert protobuf `messge' to `json' in ProtoJSON format according to
`options'.
+// See https://protobuf.dev/programming-guides/json/ for details.
+bool ProtoMessageToProtoJson(const google::protobuf::Message& message,
+ google::protobuf::io::ZeroCopyOutputStream* json,
+ const Pb2ProtoJsonOptions& options, std::string*
error = NULL);
+// Using default GooglePb2JsonOptions.
+bool ProtoMessageToProtoJson(const google::protobuf::Message& message,
std::string* json,
+ const Pb2ProtoJsonOptions& options, std::string*
error = NULL);
} // namespace json2pb
#endif // BRPC_JSON2PB_PB_TO_JSON_H
diff --git a/src/json2pb/protobuf_type_resolver.cpp
b/src/json2pb/protobuf_type_resolver.cpp
new file mode 100644
index 00000000..a75974fc
--- /dev/null
+++ b/src/json2pb/protobuf_type_resolver.cpp
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "json2pb/protobuf_type_resolver.h"
+
+namespace json2pb {
+
+using google::protobuf::DescriptorPool;
+using google::protobuf::util::TypeResolver;
+using google::protobuf::util::NewTypeResolverForDescriptorPool;
+
+TypeResolverUniqueptr GetTypeResolver(const google::protobuf::Message&
message) {
+ auto pool = message.GetDescriptor()->file()->pool();
+ bool is_generated_pool = pool == DescriptorPool::generated_pool();
+ TypeResolver* resolver = is_generated_pool
+ ? butil::get_leaky_singleton<TypeResolver>()
+ : NewTypeResolverForDescriptorPool(PROTOBUF_TYPE_URL_PREFIX, pool);
+ return { resolver, TypeResolverDeleter(is_generated_pool) };
+}
+
+} // namespace json2pb
+
diff --git a/src/json2pb/protobuf_type_resolver.h
b/src/json2pb/protobuf_type_resolver.h
new file mode 100644
index 00000000..18993f18
--- /dev/null
+++ b/src/json2pb/protobuf_type_resolver.h
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_PROTOBUF_TYPE_RESOLVER_H
+#define BRPC_PROTOBUF_TYPE_RESOLVER_H
+
+#include <string>
+#include <memory>
+#include <google/protobuf/message.h>
+#include <google/protobuf/util/type_resolver.h>
+#include <google/protobuf/util/type_resolver_util.h>
+#include "butil/string_printf.h"
+#include "butil/memory/singleton_on_pthread_once.h"
+
+namespace json2pb {
+
+#define PROTOBUF_TYPE_URL_PREFIX "type.googleapis.com"
+
+inline std::string GetTypeUrl(const google::protobuf::Message& message) {
+ return butil::string_printf(PROTOBUF_TYPE_URL_PREFIX"/%s",
+ message.GetDescriptor()->full_name().c_str());
+}
+
+class TypeResolverDeleter {
+public:
+ explicit TypeResolverDeleter(bool is_generated_pool)
+ : _is_generated_pool(is_generated_pool) {}
+
+ void operator()(google::protobuf::util::TypeResolver* resolver) const {
+ if (!_is_generated_pool) {
+ delete resolver;
+ }
+ }
+private:
+ bool _is_generated_pool;
+};
+
+using TypeResolverUniqueptr = std::unique_ptr<
+ google::protobuf::util::TypeResolver, TypeResolverDeleter>;
+
+TypeResolverUniqueptr GetTypeResolver(const google::protobuf::Message&
message);
+
+} // namespace json2pb
+
+namespace butil {
+
+// Customized singleton object creation for
google::protobuf::util::TypeResolver.
+template<>
+inline google::protobuf::util::TypeResolver*
+create_leaky_singleton_obj<google::protobuf::util::TypeResolver>() {
+ return google::protobuf::util::NewTypeResolverForDescriptorPool(
+ PROTOBUF_TYPE_URL_PREFIX,
google::protobuf::DescriptorPool::generated_pool());
+}
+
+} // namespace butil
+
+#endif // BRPC_PROTOBUF_TYPE_RESOLVER_H
diff --git a/test/addressbook_map.proto b/test/addressbook_map.proto
index d3b15466..4d18e68c 100644
--- a/test/addressbook_map.proto
+++ b/test/addressbook_map.proto
@@ -52,6 +52,16 @@ message AddressComplex {
repeated FriendEntry friends = 2;
}
+message AddressIntMapStd {
+ required string addr = 1;
+ map<string, int32> numbers = 2;
+}
+
+message AddressStringMapStd {
+ required string addr = 1;
+ map<string, string> contacts = 2;
+}
+
message haha {
repeated int32 a = 1;
}
\ No newline at end of file
diff --git a/test/brpc_http_rpc_protocol_unittest.cpp
b/test/brpc_http_rpc_protocol_unittest.cpp
index ad631a2e..578e8f89 100644
--- a/test/brpc_http_rpc_protocol_unittest.cpp
+++ b/test/brpc_http_rpc_protocol_unittest.cpp
@@ -189,6 +189,20 @@ protected:
return msg;
}
+ brpc::policy::HttpContext* MakePostJsonStdRequestMessage(const
std::string& path) {
+ brpc::policy::HttpContext* msg = new brpc::policy::HttpContext(false);
+ msg->header().uri().set_path(path);
+ msg->header().set_content_type("application/proto-json");
+ msg->header().set_method(brpc::HTTP_METHOD_POST);
+
+ test::EchoRequest req;
+ req.set_message(EXP_REQUEST);
+ butil::IOBufAsZeroCopyOutputStream req_stream(&msg->body());
+ json2pb::Pb2ProtoJsonOptions options;
+ EXPECT_TRUE(json2pb::ProtoMessageToProtoJson(req, &req_stream,
options));
+ return msg;
+ }
+
brpc::policy::HttpContext* MakePostProtoTextRequestMessage(
const std::string& path) {
brpc::policy::HttpContext* msg = new brpc::policy::HttpContext(false);
@@ -334,7 +348,7 @@ TEST_F(HttpTest, parse_http_address) {
TEST_F(HttpTest, verify_request) {
{
brpc::policy::HttpContext* msg =
- MakePostRequestMessage("/EchoService/Echo");
+ MakePostRequestMessage("/EchoService/Echo");
VerifyMessage(msg, false);
msg->Destroy();
}
@@ -350,6 +364,12 @@ TEST_F(HttpTest, verify_request) {
VerifyMessage(msg, false);
msg->Destroy();
}
+ {
+ brpc::policy::HttpContext* msg =
+ MakePostJsonStdRequestMessage("/EchoService/Echo");
+ VerifyMessage(msg, false);
+ msg->Destroy();
+ }
{
brpc::policy::HttpContext* msg =
MakePostProtoTextRequestMessage("/EchoService/Echo");
@@ -1677,7 +1697,6 @@ TEST_F(HttpTest, spring_protobuf_content_type) {
brpc::Controller cntl2;
test::EchoService_Stub stub(&channel);
- req.set_message(EXP_REQUEST);
res.Clear();
cntl2.http_request().set_content_type("application/x-protobuf");
stub.Echo(&cntl2, &req, &res, nullptr);
@@ -1770,7 +1789,7 @@ TEST_F(HttpTest, dump_http_request) {
brpc::g_rpc_dump_sl.sampling_range = 0;
}
-TEST_F(HttpTest, spring_protobuf_text_content_type) {
+TEST_F(HttpTest, proto_text_content_type) {
const int port = 8923;
brpc::Server server;
EXPECT_EQ(0, server.AddService(&_svc, brpc::SERVER_DOESNT_OWN_SERVICE));
@@ -1795,6 +1814,55 @@ TEST_F(HttpTest, spring_protobuf_text_content_type) {
ASSERT_TRUE(google::protobuf::TextFormat::ParseFromString(
cntl.response_attachment().to_string(), &res));
ASSERT_EQ(EXP_RESPONSE, res.message());
+
+ test::EchoService_Stub stub(&channel);
+ cntl.Reset();
+ cntl.http_request().set_content_type("application/proto-text");
+ res.Clear();
+ stub.Echo(&cntl, &req, &res, NULL);
+ ASSERT_FALSE(cntl.Failed());
+ ASSERT_EQ(EXP_RESPONSE, res.message());
+ ASSERT_EQ("application/proto-text", cntl.http_response().content_type());
+}
+
+TEST_F(HttpTest, proto_json_content_type) {
+ const int port = 8923;
+ brpc::Server server;
+ EXPECT_EQ(0, server.AddService(&_svc, brpc::SERVER_DOESNT_OWN_SERVICE));
+ EXPECT_EQ(0, server.Start(port, nullptr));
+
+ brpc::Channel channel;
+ brpc::ChannelOptions options;
+ options.protocol = "http";
+ ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port),
&options));
+
+ brpc::Controller cntl;
+ test::EchoRequest req;
+ test::EchoResponse res;
+ req.set_message(EXP_REQUEST);
+ cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
+ cntl.http_request().uri() = "/EchoService/Echo";
+ cntl.http_request().set_content_type("application/proto-json");
+ json2pb::Pb2ProtoJsonOptions json_options;
+ butil::IOBufAsZeroCopyOutputStream
output_stream(&cntl.request_attachment());
+ ASSERT_TRUE(json2pb::ProtoMessageToProtoJson(req, &output_stream,
json_options));
+ channel.CallMethod(nullptr, &cntl, nullptr, nullptr, nullptr);
+ ASSERT_FALSE(cntl.Failed());
+ ASSERT_EQ("application/proto-json", cntl.http_response().content_type());
+ json2pb::ProtoJson2PbOptions parse_options;
+ parse_options.ignore_unknown_fields = true;
+ butil::IOBufAsZeroCopyInputStream input_stream(cntl.response_attachment());
+ ASSERT_TRUE(json2pb::ProtoJsonToProtoMessage(&input_stream, &res,
parse_options));
+ ASSERT_EQ(EXP_RESPONSE, res.message());
+
+ test::EchoService_Stub stub(&channel);
+ cntl.Reset();
+ cntl.http_request().set_content_type("application/proto-json");
+ res.Clear();
+ stub.Echo(&cntl, &req, &res, nullptr);
+ ASSERT_FALSE(cntl.Failed());
+ ASSERT_EQ(EXP_RESPONSE, res.message());
+ ASSERT_EQ("application/proto-json", cntl.http_response().content_type());
}
class HttpServiceImpl : public ::test::HttpService {
diff --git a/test/brpc_protobuf_json_unittest.cpp
b/test/brpc_protobuf_json_unittest.cpp
index 5bb158d7..e8435d2c 100644
--- a/test/brpc_protobuf_json_unittest.cpp
+++ b/test/brpc_protobuf_json_unittest.cpp
@@ -1640,4 +1640,107 @@ TEST_F(ProtobufJsonTest, parse_multiple_json_error) {
ASSERT_EQ(47ul, offset);
}
+TEST_F(ProtobufJsonTest, proto_json_to_pb) {
+ std::string error;
+ json2pb::ProtoJson2PbOptions options;
+
+ std::string json1 = R"({"addr":"baidu.com",)"
+
R"("numbers":[{"key":"tel","value":123456},{"key":"cell","value":654321}]})";
+ AddressIntMapStd aims;
+ ASSERT_FALSE(json2pb::ProtoJsonToProtoMessage(json1, &aims, options,
&error));
+ LOG(INFO) << "Fail to ProtoJsonToProtoMessage: " << error;
+
+ error.clear();
+ butil::IOBuf json_buf1;
+ json_buf1.append(json1);
+ butil::IOBufAsZeroCopyInputStream input_stream1(json_buf1);
+ ASSERT_FALSE(json2pb::ProtoJsonToProtoMessage(&input_stream1, &aims,
options, &error));
+ LOG(INFO) << "Fail to ProtoJsonToProtoMessage: " << error;
+ error.clear();
+
+ std::string json2 = R"({"addr":"baidu.com",)"
+ R"("numbers":{"tel":123456,"cell":654321}})";
+ ASSERT_TRUE(json2pb::ProtoJsonToProtoMessage(json2, &aims, options,
&error)) << error;
+ ASSERT_TRUE(aims.has_addr());
+ ASSERT_EQ(aims.addr(), "baidu.com");
+ ASSERT_EQ(aims.numbers_size(), 2);
+ ASSERT_EQ(aims.numbers().at("tel"), 123456);
+ ASSERT_EQ(aims.numbers().at("cell"), 654321);
+
+ aims.Clear();
+ butil::IOBuf json_buf2;
+ json_buf2.append(json2);
+ butil::IOBufAsZeroCopyInputStream input_stream2(json_buf2);
+ ASSERT_TRUE(json2pb::ProtoJsonToProtoMessage(&input_stream2, &aims,
options, &error)) << error;
+ ASSERT_TRUE(aims.has_addr());
+ ASSERT_EQ(aims.addr(), "baidu.com");
+ ASSERT_EQ(aims.numbers_size(), 2);
+ ASSERT_EQ(aims.numbers().at("tel"), 123456);
+ ASSERT_EQ(aims.numbers().at("cell"), 654321);
+
+ std::string json3 = R"({"addr":"baidu.com",)"
+
R"("contacts":{"email":"[email protected]","office":"Shanghai"}})";
+ AddressStringMapStd asms;
+ ASSERT_TRUE(json2pb::ProtoJsonToProtoMessage(json3, &asms, options,
&error)) << error;
+ ASSERT_TRUE(asms.has_addr());
+ ASSERT_EQ(asms.addr(), "baidu.com");
+ ASSERT_EQ(asms.contacts().size(), 2);
+ ASSERT_EQ(asms.contacts().at("email"), "[email protected]");
+ ASSERT_EQ(asms.contacts().at("office"), "Shanghai");
+
+ asms.Clear();
+ butil::IOBuf json_buf3;
+ json_buf3.append(json3);
+ butil::IOBufAsZeroCopyInputStream input_stream3(json_buf3);
+ ASSERT_TRUE(json2pb::ProtoJsonToProtoMessage(&input_stream3, &asms,
options, &error)) << error;
+ ASSERT_TRUE(asms.has_addr());
+ ASSERT_EQ(asms.addr(), "baidu.com");
+ ASSERT_EQ(asms.contacts().size(), 2);
+ ASSERT_EQ(asms.contacts().at("email"), "[email protected]");
+ ASSERT_EQ(asms.contacts().at("office"), "Shanghai");
+}
+
+TEST_F(ProtobufJsonTest, pb_to_proto_json) {
+ std::string error;
+ json2pb::Pb2ProtoJsonOptions options;
+
+ AddressIntMapStd aims;
+ aims.set_addr("baidu.com");
+ (*aims.mutable_numbers())["tel"] = 123456;
+ (*aims.mutable_numbers())["cell"] = 654321;
+ std::string json1;
+ ASSERT_TRUE(json2pb::ProtoMessageToJson(aims, &json1)) << error;
+ ASSERT_NE(json1.find(R"("addr":"baidu.com")"), std::string::npos);
+ ASSERT_NE(json1.find(R"("cell":654321)"), std::string::npos);
+ ASSERT_NE(json1.find(R"("tel":123456)"), std::string::npos);
+
+ butil::IOBuf json_buf1;
+ json_buf1.append(json1);
+ butil::IOBufAsZeroCopyOutputStream output_stream1(&json_buf1);
+ ASSERT_TRUE(json2pb::ProtoMessageToJson(aims, &output_stream1, &error)) <<
error;
+ json1 = json_buf1.to_string();
+ ASSERT_NE(json1.find(R"("addr":"baidu.com")"), std::string::npos);
+ ASSERT_NE(json1.find(R"("cell":654321)"), std::string::npos);
+ ASSERT_NE(json1.find(R"("tel":123456)"), std::string::npos);
+
+ AddressStringMapStd asms;
+ asms.set_addr("baidu.com");
+ (*asms.mutable_contacts())["email"] = "[email protected]";
+ (*asms.mutable_contacts())["office"] = "Shanghai";
+ std::string json2;
+ ASSERT_TRUE(json2pb::ProtoMessageToJson(asms, &json2)) << error;
+ ASSERT_NE(json2.find(R"("addr":"baidu.com")"), std::string::npos);
+ ASSERT_NE(json2.find(R"("email":"[email protected]")"), std::string::npos);
+ ASSERT_NE(json2.find(R"("office":"Shanghai")"), std::string::npos);
+
+ butil::IOBuf json_buf2;
+ json_buf2.append(json2);
+ butil::IOBufAsZeroCopyOutputStream output_stream2(&json_buf2);
+ ASSERT_TRUE(json2pb::ProtoMessageToJson(asms, &output_stream2, &error)) <<
error;
+ json2 = json_buf2.to_string();
+ ASSERT_NE(json2.find(R"("addr":"baidu.com")"), std::string::npos);
+ ASSERT_NE(json2.find(R"("email":"[email protected]")"), std::string::npos);
+ ASSERT_NE(json2.find(R"("office":"Shanghai")"), std::string::npos);
+}
+
} // namespace
diff --git a/test/bthread_rwlock_unittest.cpp b/test/bthread_rwlock_unittest.cpp
index 815494ca..2da226cb 100644
--- a/test/bthread_rwlock_unittest.cpp
+++ b/test/bthread_rwlock_unittest.cpp
@@ -26,9 +26,9 @@ int c = 0;
void* rdlocker(void* arg) {
auto rw = (bthread_rwlock_t*)arg;
bthread_rwlock_rdlock(rw);
- LOG(INFO) <<butil::string_printf("[%" PRIu64 "] I'm rdlocker, %d, %"
PRId64 "ms\n",
- pthread_numeric_id(), ++c,
- butil::cpuwide_time_ms() - start_time);
+ LOG(INFO) << butil::string_printf("[%" PRIu64 "] I'm rdlocker, %d, %"
PRId64 "ms\n",
+ pthread_numeric_id(), ++c,
+ butil::cpuwide_time_ms() - start_time);
bthread_usleep(10000);
bthread_rwlock_unlock(rw);
return NULL;
@@ -38,8 +38,8 @@ void* wrlocker(void* arg) {
auto rw = (bthread_rwlock_t*)arg;
bthread_rwlock_wrlock(rw);
LOG(INFO) << butil::string_printf("[%" PRIu64 "] I'm wrlocker, %d, %"
PRId64 "ms\n",
- pthread_numeric_id(), ++c,
- butil::cpuwide_time_ms() - start_time);
+ pthread_numeric_id(), ++c,
+ butil::cpuwide_time_ms() - start_time);
bthread_usleep(10000);
bthread_rwlock_unlock(rw);
return NULL;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]