This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 0f5e102a Support proxy and generic call of baidu protocol (#2629)
0f5e102a is described below
commit 0f5e102a219af016e00f40cacbbb193a418ab358
Author: Bright Chen <[email protected]>
AuthorDate: Thu Jun 13 12:21:52 2024 +0800
Support proxy and generic call of baidu protocol (#2629)
* Support proxy and generic call of baidu protocol
* Add example
---
.../baidu_proxy_and_generic_call/CMakeLists.txt | 134 ++++++++++++
example/baidu_proxy_and_generic_call/client.cpp | 94 +++++++++
.../baidu_proxy_and_generic_call/echo.proto | 23 +--
example/baidu_proxy_and_generic_call/proxy.cpp | 142 +++++++++++++
example/baidu_proxy_and_generic_call/server.cpp | 118 +++++++++++
src/brpc/baidu_master_service.cpp | 52 +++++
src/brpc/baidu_master_service.h | 99 +++++++++
src/brpc/builtin/status_service.cpp | 11 +
src/brpc/channel.cpp | 2 +
src/brpc/controller.cpp | 6 +
src/brpc/controller.h | 6 +-
src/brpc/policy/baidu_rpc_protocol.cpp | 230 +++++++++++++--------
src/brpc/proto_base.proto | 4 +
src/brpc/serialized_request.cpp | 2 +-
...ialized_request.cpp => serialized_response.cpp} | 60 +++---
src/brpc/serialized_response.h | 83 ++++++++
src/brpc/server.cpp | 11 +
src/brpc/server.h | 5 +-
src/butil/memory/scope_guard.h | 2 +-
test/brpc_server_unittest.cpp | 111 ++++++++++
test/endpoint_unittest.cpp | 2 +-
21 files changed, 1066 insertions(+), 131 deletions(-)
diff --git a/example/baidu_proxy_and_generic_call/CMakeLists.txt
b/example/baidu_proxy_and_generic_call/CMakeLists.txt
new file mode 100644
index 00000000..8cc9c0f1
--- /dev/null
+++ b/example/baidu_proxy_and_generic_call/CMakeLists.txt
@@ -0,0 +1,134 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+cmake_minimum_required(VERSION 2.8.10)
+project(baidu_proxy_and_generic_call C CXX)
+
+option(LINK_SO "Whether examples are linked dynamically" OFF)
+
+execute_process(
+ COMMAND bash -c "find ${PROJECT_SOURCE_DIR}/../.. -type d -regex
\".*output/include$\" | head -n1 | xargs dirname | tr -d '\n'"
+ OUTPUT_VARIABLE OUTPUT_PATH
+)
+
+set(CMAKE_PREFIX_PATH ${OUTPUT_PATH})
+
+include(FindThreads)
+include(FindProtobuf)
+protobuf_generate_cpp(PROTO_SRC PROTO_HEADER echo.proto)
+# include PROTO_HEADER
+include_directories(${CMAKE_CURRENT_BINARY_DIR})
+
+find_path(BRPC_INCLUDE_PATH NAMES brpc/server.h)
+if(LINK_SO)
+ find_library(BRPC_LIB NAMES brpc)
+else()
+ find_library(BRPC_LIB NAMES libbrpc.a brpc)
+endif()
+if((NOT BRPC_INCLUDE_PATH) OR (NOT BRPC_LIB))
+ message(FATAL_ERROR "Fail to find brpc")
+endif()
+include_directories(${BRPC_INCLUDE_PATH})
+
+find_path(GFLAGS_INCLUDE_PATH gflags/gflags.h)
+find_library(GFLAGS_LIBRARY NAMES gflags libgflags)
+if((NOT GFLAGS_INCLUDE_PATH) OR (NOT GFLAGS_LIBRARY))
+ message(FATAL_ERROR "Fail to find gflags")
+endif()
+include_directories(${GFLAGS_INCLUDE_PATH})
+
+execute_process(
+ COMMAND bash -c "grep \"namespace [_A-Za-z0-9]\\+ {\"
${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $2}' |
tr -d '\n'"
+ OUTPUT_VARIABLE GFLAGS_NS
+)
+# Some gflags versions declare `namespace GFLAGS_NAMESPACE {` and define the
+# real namespace through a macro; resolve the macro in that case.
+# The variable is quoted so that if() stays well-formed even when the grep
+# above found nothing and GFLAGS_NS is empty.
+if("${GFLAGS_NS}" STREQUAL "GFLAGS_NAMESPACE")
+    execute_process(
+        COMMAND bash -c "grep \"#define GFLAGS_NAMESPACE [_A-Za-z0-9]\\+\" ${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $3}' | tr -d '\n'"
+        OUTPUT_VARIABLE GFLAGS_NS
+    )
+endif()
+if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
+ include(CheckFunctionExists)
+ CHECK_FUNCTION_EXISTS(clock_gettime HAVE_CLOCK_GETTIME)
+ if(NOT HAVE_CLOCK_GETTIME)
+ set(DEFINE_CLOCK_GETTIME "-DNO_CLOCK_GETTIME_IN_MAC")
+ endif()
+endif()
+
+set(CMAKE_CPP_FLAGS "${DEFINE_CLOCK_GETTIME} -DGFLAGS_NS=${GFLAGS_NS}")
+set(CMAKE_CXX_FLAGS "${CMAKE_CPP_FLAGS} -DNDEBUG -O2 -D__const__=__unused__
-pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer")
+
+if(CMAKE_VERSION VERSION_LESS "3.1.3")
+ if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
+ endif()
+ if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
+ endif()
+else()
+ set(CMAKE_CXX_STANDARD 11)
+ set(CMAKE_CXX_STANDARD_REQUIRED ON)
+endif()
+
+find_path(LEVELDB_INCLUDE_PATH NAMES leveldb/db.h)
+find_library(LEVELDB_LIB NAMES leveldb)
+if ((NOT LEVELDB_INCLUDE_PATH) OR (NOT LEVELDB_LIB))
+ message(FATAL_ERROR "Fail to find leveldb")
+endif()
+include_directories(${LEVELDB_INCLUDE_PATH})
+
+if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
+ set(OPENSSL_ROOT_DIR
+ "/usr/local/opt/openssl" # Homebrew installed OpenSSL
+ )
+endif()
+
+find_package(OpenSSL)
+include_directories(${OPENSSL_INCLUDE_DIR})
+
+set(DYNAMIC_LIB
+ ${CMAKE_THREAD_LIBS_INIT}
+ ${GFLAGS_LIBRARY}
+ ${PROTOBUF_LIBRARIES}
+ ${LEVELDB_LIB}
+ ${OPENSSL_CRYPTO_LIBRARY}
+ ${OPENSSL_SSL_LIBRARY}
+ dl
+ )
+
+if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
+ set(DYNAMIC_LIB ${DYNAMIC_LIB}
+ pthread
+ "-framework CoreFoundation"
+ "-framework CoreGraphics"
+ "-framework CoreData"
+ "-framework CoreText"
+ "-framework Security"
+ "-framework Foundation"
+ "-Wl,-U,_MallocExtension_ReleaseFreeMemory"
+ "-Wl,-U,_ProfilerStart"
+ "-Wl,-U,_ProfilerStop"
+ "-Wl,-U,__Z13GetStackTracePPvii")
+endif()
+
+add_executable(echo_client client.cpp ${PROTO_SRC} ${PROTO_HEADER})
+add_executable(proxy proxy.cpp)
+add_executable(echo_server server.cpp ${PROTO_SRC} ${PROTO_HEADER})
+
+target_link_libraries(echo_client ${BRPC_LIB} ${DYNAMIC_LIB})
+target_link_libraries(proxy ${BRPC_LIB} ${DYNAMIC_LIB})
+target_link_libraries(echo_server ${BRPC_LIB} ${DYNAMIC_LIB})
diff --git a/example/baidu_proxy_and_generic_call/client.cpp
b/example/baidu_proxy_and_generic_call/client.cpp
new file mode 100644
index 00000000..d8040740
--- /dev/null
+++ b/example/baidu_proxy_and_generic_call/client.cpp
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// A client sending requests to server every 1 second.
+
+#include <gflags/gflags.h>
+#include <butil/logging.h>
+#include <butil/time.h>
+#include <brpc/channel.h>
+#include "echo.pb.h"
+
+DEFINE_int32(compress_type, 2, "The compress type of request");
+DEFINE_string(attachment, "", "Carry this along with requests");
+DEFINE_string(connection_type, "", "Connection type. Available values: single,
pooled, short");
+DEFINE_string(proxy_address, "0.0.0.0:8000", "IP Address of proxy");
+DEFINE_string(load_balancer, "", "The algorithm for load balancing");
+DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
+DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
+DEFINE_int32(interval_ms, 1000, "Milliseconds between consecutive requests");
+
+// Entry point of the example client: sends one Echo request to the proxy
+// every -interval_ms milliseconds until the process is asked to quit.
+int main(int argc, char* argv[]) {
+    // Parse gflags. We recommend you to use gflags as well.
+    GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
+
+    // A Channel represents a communication line to a Server. Notice that
+    // Channel is thread-safe and can be shared by all threads in your program.
+    brpc::Channel channel;
+
+    // Fill the options from command-line flags before initializing the channel.
+    brpc::ChannelOptions channel_options;
+    channel_options.protocol = brpc::PROTOCOL_BAIDU_STD;
+    channel_options.connection_type = FLAGS_connection_type;
+    channel_options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/;
+    channel_options.max_retry = FLAGS_max_retry;
+    const int init_rc = channel.Init(FLAGS_proxy_address.c_str(),
+                                     FLAGS_load_balancer.c_str(),
+                                     &channel_options);
+    if (init_rc != 0) {
+        LOG(ERROR) << "Fail to initialize channel";
+        return -1;
+    }
+
+    // Normally, you should not call a Channel directly, but instead construct
+    // a stub Service wrapping it. stub can be shared by all threads as well.
+    example::EchoService_Stub stub(&channel);
+
+    // One synchronous request per iteration; log_id grows by one each time.
+    for (int log_id = 0; !brpc::IsAskedToQuit(); ++log_id) {
+        // The call below is synchronous, so everything can live on the stack.
+        example::EchoRequest req;
+        example::EchoResponse res;
+        brpc::Controller cntl;
+
+        req.set_message("hello world");
+        cntl.set_request_compress_type(
+            static_cast<brpc::CompressType>(FLAGS_compress_type));
+        cntl.set_log_id(log_id);  // set by user
+        // Set attachment which is wired to network directly instead of
+        // being serialized into protobuf messages.
+        cntl.request_attachment().append(FLAGS_attachment);
+
+        // Because `done'(last parameter) is NULL, this function waits until
+        // the response comes back or error occurs(including timedout).
+        stub.Echo(&cntl, &req, &res, NULL);
+        if (cntl.Failed()) {
+            LOG(WARNING) << cntl.ErrorText();
+        } else {
+            LOG(INFO) << "Received response from " << cntl.remote_side()
+                      << " to " << cntl.local_side()
+                      << ": " << res.message()
+                      << ", response compress type=" << cntl.response_compress_type()
+                      << ", attached=" << cntl.response_attachment()
+                      << ", latency=" << cntl.latency_us() << "us";
+        }
+        usleep(FLAGS_interval_ms * 1000L);
+    }
+
+    LOG(INFO) << "EchoClient is going to quit";
+    return 0;
+}
diff --git a/src/brpc/proto_base.proto
b/example/baidu_proxy_and_generic_call/echo.proto
similarity index 75%
copy from src/brpc/proto_base.proto
copy to example/baidu_proxy_and_generic_call/echo.proto
index c0bbc086..2b39627f 100644
--- a/src/brpc/proto_base.proto
+++ b/example/baidu_proxy_and_generic_call/echo.proto
@@ -16,19 +16,18 @@
// under the License.
syntax="proto2";
+package example;
-package brpc;
+option cc_generic_services = true;
-message RedisRequestBase {}
-message RedisResponseBase {}
+message EchoRequest {
+ required string message = 1;
+};
-message EspMessageBase {}
+message EchoResponse {
+ required string message = 1;
+};
-message MemcacheRequestBase {}
-message MemcacheResponseBase {}
-
-message NsheadMessageBase {}
-
-message SerializedRequestBase {}
-
-message ThriftFramedMessageBase {}
+service EchoService {
+ rpc Echo(EchoRequest) returns (EchoResponse);
+};
diff --git a/example/baidu_proxy_and_generic_call/proxy.cpp
b/example/baidu_proxy_and_generic_call/proxy.cpp
new file mode 100644
index 00000000..81e1176b
--- /dev/null
+++ b/example/baidu_proxy_and_generic_call/proxy.cpp
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// A proxy that receives serialized baidu_std requests and forwards them to
+// the backend server, relaying the serialized responses back to the client.
+
+#include <gflags/gflags.h>
+#include <butil/logging.h>
+#include <butil/strings/string_number_conversions.h>
+#include <brpc/server.h>
+#include <brpc/controller.h>
+#include <brpc/channel.h>
+#include <json2pb/pb_to_json.h>
+
+DEFINE_int32(port, 8000, "TCP Port of this server");
+DEFINE_string(listen_addr, "", "Server listen address, may be IPV4/IPV6/UDS."
+ " If this is set, the flag port will be ignored");
+DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
+ "read/write operations during the last `idle_timeout_s'");
+DEFINE_string(connection_type, "", "Connection type. Available values: single,
pooled, short");
+DEFINE_string(server_address, "0.0.0.0:8001", "IP Address of server");
+DEFINE_string(load_balancer, "", "The algorithm for load balancing");
+DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
+DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
+DEFINE_int32(interval_ms, 1000, "Milliseconds between consecutive requests");
+
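+// Your implementation of brpc::BaiduMasterService, which receives every
+// baidu_std request in serialized form and forwards it to the backend server.
+// Notice that implementing brpc::Describable grants the ability to put
+// additional information in /status.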
+namespace example {
+// Generic baidu_std proxy. Each incoming request arrives in serialized form
+// and is forwarded unchanged to the backend at -server_address. The outcome
+// of the downstream call is reported to the caller through the response user
+// fields "x-bd-proxy-error-code" and "x-bd-proxy-error-text".
+class BaiduMasterServiceImpl : public brpc::BaiduMasterService {
+public:
+    // cntl:     controller of the incoming RPC; its log id, attachment,
+    //           compress type and sampled request are handed to the
+    //           downstream call.
+    // request:  serialized request, forwarded as-is.
+    // response: receives the serialized downstream response.
+    // done:     run when this function returns (via done_guard).
+    void ProcessRpcRequest(brpc::Controller* cntl,
+                           const brpc::SerializedRequest* request,
+                           brpc::SerializedResponse* response,
+                           ::google::protobuf::Closure* done) override {
+        // This object helps you to call done->Run() in RAII style. If you need
+        // to process the request asynchronously, pass done_guard.release().
+        brpc::ClosureGuard done_guard(done);
+
+        // A Channel represents a communication line to a Server. Notice that
+        // Channel is thread-safe and can be shared by all threads in your program.
+        brpc::Channel channel;
+
+        // Fill the options from command-line flags before initializing the channel.
+        brpc::ChannelOptions options;
+        options.protocol = brpc::PROTOCOL_BAIDU_STD;
+        options.connection_type = FLAGS_connection_type;
+        options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/;
+        options.max_retry = FLAGS_max_retry;
+        if (channel.Init(FLAGS_server_address.c_str(),
+                         FLAGS_load_balancer.c_str(), &options) != 0) {
+            LOG(ERROR) << "Fail to initialize channel";
+            // Report the proxy-side failure through the user fields.
+            (*cntl->response_user_fields())["x-bd-proxy-error-code"] =
+                butil::IntToString(brpc::EINTERNAL);
+            (*cntl->response_user_fields())["x-bd-proxy-error-text"] =
+                "Fail to initialize channel";
+            return;
+        }
+
+        LOG(INFO) << "Received request[log_id=" << cntl->log_id()
+                  << "] from " << cntl->remote_side()
+                  << " to " << cntl->local_side()
+                  << ", serialized request size=" << request->serialized_data().size()
+                  << ", request compress type=" << cntl->request_compress_type()
+                  << " (attached=" << cntl->request_attachment() << ")";
+
+        // Move the per-request state of the incoming call to the downstream call.
+        brpc::Controller call_cntl;
+        call_cntl.set_log_id(cntl->log_id());
+        call_cntl.request_attachment().swap(cntl->request_attachment());
+        call_cntl.set_request_compress_type(cntl->request_compress_type());
+        call_cntl.reset_sampled_request(cntl->release_sampled_request());
+        // It is ok to use request and response for sync rpc.
+        channel.CallMethod(NULL, &call_cntl, request, response, NULL);
+        (*cntl->response_user_fields())["x-bd-proxy-error-code"] =
+            butil::IntToString(call_cntl.ErrorCode());
+        if (call_cntl.Failed()) {
+            (*cntl->response_user_fields())["x-bd-proxy-error-text"] =
+                call_cntl.ErrorText();
+            // NOTE(review): assumes the framework attached a SampledRequest to
+            // `cntl' before calling this method; otherwise the dereferences
+            // below would be on NULL. Confirm against the baidu_std protocol.
+            LOG(ERROR) << "Fail to call service="
+                       << call_cntl.sampled_request()->meta.service_name()
+                       << ", method="
+                       << call_cntl.sampled_request()->meta.method_name()
+                       << ", error_code=" << call_cntl.ErrorCode()
+                       // Log the text here; the code is already printed above.
+                       << ", error_text=" << call_cntl.ErrorText();
+            return;
+        } else {
+            LOG(INFO) << "Received response from " << call_cntl.remote_side()
+                      << " to " << call_cntl.local_side()
+                      << ", serialized response size=" << response->serialized_data().size()
+                      << ", response compress type=" << call_cntl.response_compress_type()
+                      << ", attached=" << call_cntl.response_attachment()
+                      << ", latency=" << call_cntl.latency_us() << "us";
+        }
+        // Relay the downstream attachment and compress type to the caller.
+        cntl->response_attachment().swap(call_cntl.response_attachment());
+        cntl->set_response_compress_type(call_cntl.response_compress_type());
+    }
+};
+} // namespace example
+
+// Entry point of the example proxy: starts a Server whose only service is the
+// baidu master service defined above, then blocks until asked to quit.
+int main(int argc, char* argv[]) {
+    // Parse gflags. We recommend you to use gflags as well.
+    GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
+
+    // Generally you only need one Server.
+    brpc::Server server;
+
+    // -listen_addr takes precedence over -port when it is set.
+    butil::EndPoint listen_point;
+    if (FLAGS_listen_addr.empty()) {
+        listen_point = butil::EndPoint(butil::IP_ANY, FLAGS_port);
+    } else if (butil::str2endpoint(FLAGS_listen_addr.c_str(), &listen_point) < 0) {
+        LOG(ERROR) << "Invalid listen address:" << FLAGS_listen_addr;
+        return -1;
+    }
+
+    brpc::ServerOptions server_options;
+    server_options.idle_timeout_sec = FLAGS_idle_timeout_s;
+    // Add the baidu master service into server.
+    // Notice new operator, because server will delete it in dtor of Server.
+    server_options.baidu_master_service = new example::BaiduMasterServiceImpl();
+
+    // Start the server.
+    const int start_rc = server.Start(listen_point, &server_options);
+    if (start_rc != 0) {
+        LOG(ERROR) << "Fail to start EchoServer";
+        return -1;
+    }
+
+    // Wait until Ctrl-C is pressed, then Stop() and Join() the server.
+    server.RunUntilAskedToQuit();
+    return 0;
+}
diff --git a/example/baidu_proxy_and_generic_call/server.cpp
b/example/baidu_proxy_and_generic_call/server.cpp
new file mode 100644
index 00000000..b3f16173
--- /dev/null
+++ b/example/baidu_proxy_and_generic_call/server.cpp
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// A server to receive EchoRequest and send back EchoResponse.
+
+#include <gflags/gflags.h>
+#include <butil/logging.h>
+#include <brpc/server.h>
+#include <json2pb/pb_to_json.h>
+#include "echo.pb.h"
+
+DEFINE_int32(compress_type, 2, "The compress type of response");
+DEFINE_bool(echo_attachment, true, "Echo attachment as well");
+DEFINE_int32(port, 8001, "TCP Port of this server");
+DEFINE_string(listen_addr, "", "Server listen address, may be IPV4/IPV6/UDS."
+ " If this is set, the flag port will be ignored");
+DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
+ "read/write operations during the last `idle_timeout_s'");
+
+// Your implementation of example::EchoService
+// Notice that implementing brpc::Describable grants the ability to put
+// additional information in /status.
+namespace example {
+// Echo service: copies the request message into the response and, when
+// -echo_attachment is on, sends the request attachment back as well.
+class EchoServiceImpl : public EchoService {
+public:
+    EchoServiceImpl() = default;
+    ~EchoServiceImpl() override = default;
+
+    // cntl_base: the brpc::Controller of this RPC, passed as its base type.
+    // request:   the message to echo.
+    // response:  filled with the echoed message.
+    // done:      run when this function returns (via done_guard).
+    void Echo(google::protobuf::RpcController* cntl_base,
+              const EchoRequest* request,
+              EchoResponse* response,
+              google::protobuf::Closure* done) override {
+        // This object helps you to call done->Run() in RAII style. If you need
+        // to process the request asynchronously, pass done_guard.release().
+        brpc::ClosureGuard done_guard(done);
+
+        brpc::Controller* const controller =
+            static_cast<brpc::Controller*>(cntl_base);
+
+        // The purpose of following logs is to help you to understand
+        // how clients interact with servers more intuitively. You should
+        // remove these logs in performance-sensitive servers.
+        LOG(INFO) << "Received request[log_id=" << controller->log_id()
+                  << "] from " << controller->remote_side()
+                  << " to " << controller->local_side()
+                  << ": " << request->message()
+                  << ", request compress type=" << controller->request_compress_type()
+                  << ", attached=" << controller->request_attachment();
+
+        // Fill response.
+        response->set_message(request->message());
+
+        // You can compress the response by setting Controller, but be aware
+        // that compression may be costly, evaluate before turning on.
+        // controller->set_response_compress_type(brpc::COMPRESS_TYPE_GZIP);
+        controller->set_response_compress_type(
+            static_cast<brpc::CompressType>(FLAGS_compress_type));
+
+        if (!FLAGS_echo_attachment) {
+            return;
+        }
+        // Set attachment which is wired to network directly instead of
+        // being serialized into protobuf messages.
+        controller->response_attachment().append(controller->request_attachment());
+    }
+};
+} // namespace example
+
+// Entry point of the example backend server: registers EchoServiceImpl,
+// starts listening, then blocks until asked to quit.
+int main(int argc, char* argv[]) {
+    // Parse gflags. We recommend you to use gflags as well.
+    GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
+
+    // Generally you only need one Server.
+    brpc::Server server;
+
+    // Instance of your service.
+    example::EchoServiceImpl echo_service_impl;
+
+    // Add the service into server. Notice the second parameter, because the
+    // service is put on stack, we don't want server to delete it, otherwise
+    // use brpc::SERVER_OWNS_SERVICE.
+    const int add_rc = server.AddService(&echo_service_impl,
+                                         brpc::SERVER_DOESNT_OWN_SERVICE);
+    if (add_rc != 0) {
+        LOG(ERROR) << "Fail to add service";
+        return -1;
+    }
+
+    // -listen_addr takes precedence over -port when it is set.
+    butil::EndPoint listen_point;
+    if (FLAGS_listen_addr.empty()) {
+        listen_point = butil::EndPoint(butil::IP_ANY, FLAGS_port);
+    } else if (butil::str2endpoint(FLAGS_listen_addr.c_str(), &listen_point) < 0) {
+        LOG(ERROR) << "Invalid listen address:" << FLAGS_listen_addr;
+        return -1;
+    }
+
+    // Start the server.
+    brpc::ServerOptions server_options;
+    server_options.idle_timeout_sec = FLAGS_idle_timeout_s;
+    const int start_rc = server.Start(listen_point, &server_options);
+    if (start_rc != 0) {
+        LOG(ERROR) << "Fail to start EchoServer";
+        return -1;
+    }
+
+    // Wait until Ctrl-C is pressed, then Stop() and Join() the server.
+    server.RunUntilAskedToQuit();
+    return 0;
+}
diff --git a/src/brpc/baidu_master_service.cpp
b/src/brpc/baidu_master_service.cpp
new file mode 100644
index 00000000..0b983732
--- /dev/null
+++ b/src/brpc/baidu_master_service.cpp
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#include "brpc/baidu_master_service.h"
+#include "butil/logging.h"
+
+namespace brpc {
+
+// Allocates the MethodStatus with the nothrow form of new; a failed
+// allocation is logged as FATAL and leaves _status NULL.
+BaiduMasterService::BaiduMasterService()
+    : _status(new (std::nothrow) MethodStatus) {
+    if (_status == NULL) {
+        LOG(FATAL) << "Fail to new MethodStatus";
+    }
+}
+
+// Frees the MethodStatus allocated by the constructor.
+BaiduMasterService::~BaiduMasterService() {
+    delete _status;
+    _status = NULL;
+}
+
+// Writes the class name of the dynamic type of this service into `os'.
+void BaiduMasterService::Describe(std::ostream &os,
+                                  const DescribeOptions&) const {
+    os << butil::class_name_str(*this);
+}
+
+// Exposes _status under the name "<prefix>_<class name>".
+// Does nothing when _status is NULL.
+void BaiduMasterService::Expose(const butil::StringPiece& prefix) {
+    if (_status != NULL) {
+        const std::string& class_name = butil::class_name_str(*this);
+        std::string exposed_name;
+        exposed_name.reserve(prefix.size() + 1 + class_name.size());
+        exposed_name.append(prefix.data(), prefix.size());
+        exposed_name.push_back('_');
+        exposed_name.append(class_name);
+        _status->Expose(exposed_name);
+    }
+}
+
+} // namespace brpc
\ No newline at end of file
diff --git a/src/brpc/baidu_master_service.h b/src/brpc/baidu_master_service.h
new file mode 100644
index 00000000..9dc7ebbf
--- /dev/null
+++ b/src/brpc/baidu_master_service.h
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#ifndef BRPC_BAIDU_MASTER_SERVICE_H
+#define BRPC_BAIDU_MASTER_SERVICE_H
+
+#include <google/protobuf/service.h>
+#include <google/protobuf/stubs/callback.h>
+#include "brpc/details/method_status.h"
+#include "brpc/proto_base.pb.h"
+#include "brpc/controller.h"
+#include "brpc/serialized_request.h"
+#include "brpc/serialized_response.h"
+
+namespace brpc {
+
+namespace policy {
+// Declared here so that it can be named as a friend of BaiduMasterService.
+void ProcessRpcRequest(InputMessageBase* msg_base);
+}
+
+// Base class of a service that handles requests in serialized form:
+// CallMethod() forwards every call to ProcessRpcRequest() with the request and
+// response viewed as SerializedRequest / SerializedResponse.
+class BaiduMasterService : public ::google::protobuf::Service
+                         , public Describable {
+public:
+    BaiduMasterService();
+    ~BaiduMasterService() override;
+
+    DISALLOW_COPY_AND_ASSIGN(BaiduMasterService);
+
+    // Mutable access to the max-concurrency setting of this service.
+    AdaptiveMaxConcurrency& MaxConcurrency() {
+        return _max_concurrency;
+    }
+
+    // Implemented by subclasses to handle one request.
+    // `request' and `response' carry the serialized messages.
+    virtual void ProcessRpcRequest(Controller* cntl,
+                                   const SerializedRequest* request,
+                                   SerializedResponse* response,
+                                   ::google::protobuf::Closure* done) = 0;
+
+    // Put descriptions into the stream.
+    void Describe(std::ostream &os, const DescribeOptions&) const override;
+
+    // implements Service ----------------------------------------------
+
+    // Downcasts the generic protobuf arguments and delegates to
+    // ProcessRpcRequest(); the method descriptor is ignored.
+    void CallMethod(const ::google::protobuf::MethodDescriptor*,
+                    ::google::protobuf::RpcController* controller,
+                    const ::google::protobuf::Message* request,
+                    ::google::protobuf::Message* response,
+                    ::google::protobuf::Closure* done) override {
+        ProcessRpcRequest(static_cast<Controller*>(controller),
+                          static_cast<const SerializedRequest*>(request),
+                          static_cast<SerializedResponse*>(response),
+                          done);
+    }
+
+    const ::google::protobuf::ServiceDescriptor* GetDescriptor() override {
+        return BaiduMasterServiceBase::descriptor();
+    }
+
+    // The request prototype is the same for every method.
+    const ::google::protobuf::Message& GetRequestPrototype(
+        const ::google::protobuf::MethodDescriptor*) const override {
+        return _default_request;
+    }
+
+    // The response prototype is the same for every method.
+    const ::google::protobuf::Message& GetResponsePrototype(
+        const ::google::protobuf::MethodDescriptor*) const override {
+        return _default_response;
+    }
+
+private:
+friend void policy::ProcessRpcRequest(InputMessageBase* msg_base);
+friend class StatusService;
+friend class Server;
+
+    // Exposes _status under a name built from `prefix'.
+    void Expose(const butil::StringPiece& prefix);
+
+    // Prototypes returned by GetRequestPrototype() / GetResponsePrototype().
+    SerializedRequest _default_request;
+    SerializedResponse _default_response;
+
+    // Allocated in the constructor and freed in the destructor; may be NULL
+    // if the allocation failed.
+    MethodStatus* _status;
+    AdaptiveMaxConcurrency _max_concurrency;
+};
+
+} // namespace brpc
+
+#endif //BRPC_BAIDU_MASTER_SERVICE_H
diff --git a/src/brpc/builtin/status_service.cpp
b/src/brpc/builtin/status_service.cpp
index a6f5a4da..ea731e52 100644
--- a/src/brpc/builtin/status_service.cpp
+++ b/src/brpc/builtin/status_service.cpp
@@ -200,6 +200,17 @@ void
StatusService::default_method(::google::protobuf::RpcController* cntl_base,
}
}
}
+ const BaiduMasterService* baidu_master_service =
server->options().baidu_master_service;
+ if (baidu_master_service && baidu_master_service->_status) {
+ DescribeOptions options;
+ options.verbose = false;
+ options.use_html = use_html;
+ os << (use_html ? "<h3>" : "[");
+ baidu_master_service->Describe(os, options);
+ os << (use_html ? "</h3>\n" : "]\n");
+ baidu_master_service->_status->Describe(os, desc_options);
+ os << '\n';
+ }
const NsheadService* nshead_svc = server->options().nshead_service;
if (nshead_svc && nshead_svc->_status) {
DescribeOptions options;
diff --git a/src/brpc/channel.cpp b/src/brpc/channel.cpp
index ba0e8161..5fc66096 100644
--- a/src/brpc/channel.cpp
+++ b/src/brpc/channel.cpp
@@ -32,6 +32,8 @@
#include "brpc/details/load_balancer_with_naming.h"
#include "brpc/controller.h"
#include "brpc/channel.h"
+#include "brpc/serialized_request.h"
+#include "brpc/serialized_response.h"
#include "brpc/details/usercode_backup_pool.h" // TooManyUserCode
#include "brpc/rdma/rdma_helper.h"
#include "brpc/policy/esp_authenticator.h"
diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp
index 1d9b1bb9..98e25ae2 100644
--- a/src/brpc/controller.cpp
+++ b/src/brpc/controller.cpp
@@ -1427,6 +1427,12 @@ void Controller::reset_sampled_request(SampledRequest*
req) {
_sampled_request = req;
}
+// Detaches the SampledRequest from this controller and returns it;
+// the controller holds NULL afterwards.
+SampledRequest* Controller::release_sampled_request() {
+    SampledRequest* const released = _sampled_request;
+    _sampled_request = NULL;
+    return released;
+}
+
void Controller::set_stream_creator(StreamCreator* sc) {
if (_stream_creator) {
LOG(FATAL) << "A StreamCreator has been set previously";
diff --git a/src/brpc/controller.h b/src/brpc/controller.h
index d3ffb99f..5b2132b4 100644
--- a/src/brpc/controller.h
+++ b/src/brpc/controller.h
@@ -46,6 +46,7 @@
#include "brpc/progressive_reader.h" // ProgressiveReader
#include "brpc/grpc.h"
#include "brpc/kvmap.h"
+#include "brpc/rpc_dump.h"
// EAUTH is defined in MAC
#ifndef EAUTH
@@ -68,7 +69,6 @@ class SharedLoadBalancer;
class ExcludedServers;
class RPCSender;
class StreamSettings;
-class SampledRequest;
class MongoContext;
class RetryPolicy;
class InputMessageBase;
@@ -305,7 +305,9 @@ public:
// Get/own SampledRequest for sending dumped requests.
// Deleted along with controller.
void reset_sampled_request(SampledRequest* req);
- const SampledRequest* sampled_request() { return _sampled_request; }
+ const SampledRequest* sampled_request() const { return _sampled_request; }
+ SampledRequest* release_sampled_request();
+
// Attach a StreamCreator to this RPC. Notice that the ownership of sc has
// been transferred to cntl, and sc->DestroyStreamCreator() would be called
diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp
b/src/brpc/policy/baidu_rpc_protocol.cpp
index 6fa17d6c..6ce76467 100644
--- a/src/brpc/policy/baidu_rpc_protocol.cpp
+++ b/src/brpc/policy/baidu_rpc_protocol.cpp
@@ -135,6 +135,28 @@ ParseResult ParseRpcMessage(butil::IOBuf* source, Socket*
socket,
return MakeMessage(msg);
}
+// Puts the wire form of `res' into `buf'.
+// A SerializedResponse already holds serialized bytes, so its buffer is
+// swapped into `buf' directly. Any other message must be initialized and is
+// serialized with `compress_type'.
+// Returns true on success; on failure sets ERESPONSE on `cntl' and returns false.
+static bool SerializeResponse(const google::protobuf::Message& res,
+                              Controller& cntl, CompressType compress_type,
+                              butil::IOBuf& buf) {
+    if (res.GetDescriptor() == SerializedResponse::descriptor()) {
+        // The swap modifies the response, hence the cast away from const.
+        SerializedResponse& serialized_res = const_cast<SerializedResponse&>(
+            static_cast<const SerializedResponse&>(res));
+        buf.swap(serialized_res.serialized_data());
+        return true;
+    }
+
+    if (!res.IsInitialized()) {
+        cntl.SetFailed(ERESPONSE,
+                       "Missing required fields in response: %s",
+                       res.InitializationErrorString().c_str());
+        return false;
+    }
+    if (!SerializeAsCompressedData(res, &buf, compress_type)) {
+        cntl.SetFailed(ERESPONSE,
+                       "Fail to serialize response, CompressType=%s",
+                       CompressTypeToCStr(compress_type));
+        return false;
+    }
+    return true;
+}
+
// Used by UT, can't be static.
void SendRpcResponse(int64_t correlation_id,
Controller* cntl,
@@ -170,18 +192,10 @@ void SendRpcResponse(int64_t correlation_id,
// `res' can be NULL here, in which case we don't serialize it
// If user calls `SetFailed' on Controller, we don't serialize
// response either
- CompressType type = cntl->response_compress_type();
+ CompressType compress_type = cntl->response_compress_type();
if (res != NULL && !cntl->Failed()) {
- if (!res->IsInitialized()) {
- cntl->SetFailed(
- ERESPONSE, "Missing required fields in response: %s",
- res->InitializationErrorString().c_str());
- } else if (!SerializeAsCompressedData(*res, &res_body, type)) {
- cntl->SetFailed(ERESPONSE, "Fail to serialize response, "
- "CompressType=%s", CompressTypeToCStr(type));
- } else {
- append_body = true;
- }
+ append_body = SerializeResponse(
+ *res, *cntl, compress_type, res_body);
}
// Don't use res->ByteSize() since it may be compressed
@@ -207,7 +221,7 @@ void SendRpcResponse(int64_t correlation_id,
response_meta->set_error_text(cntl->ErrorText());
}
meta.set_correlation_id(correlation_id);
- meta.set_compress_type(cntl->response_compress_type());
+ meta.set_compress_type(compress_type);
if (attached_size > 0) {
meta.set_attachment_size(attached_size);
}
@@ -236,7 +250,7 @@ void SendRpcResponse(int64_t correlation_id,
SerializeRpcHeaderAndMeta(&res_buf, meta, res_size + attached_size);
if (append_body) {
res_buf.append(res_body.movable());
- if (attached_size) {
+ if (attached_size > 0) {
res_buf.append(cntl->response_attachment().movable());
}
}
@@ -360,6 +374,7 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
LOG(WARNING) << "Fail to new Controller";
return;
}
+
std::unique_ptr<google::protobuf::Message> req;
std::unique_ptr<google::protobuf::Message> res;
@@ -442,84 +457,129 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
break;
}
- // NOTE(gejun): jprotobuf sends service names without packages. So the
- // name should be changed to full when it's not.
- butil::StringPiece svc_name(request_meta.service_name());
- if (svc_name.find('.') == butil::StringPiece::npos) {
- const Server::ServiceProperty* sp =
- server_accessor.FindServicePropertyByName(svc_name);
- if (NULL == sp) {
- cntl->SetFailed(ENOSERVICE, "Fail to find service=%s",
- request_meta.service_name().c_str());
+ const int req_size = static_cast<int>(msg->payload.size());
+ if (meta.has_attachment_size()) {
+ if (req_size < meta.attachment_size()) {
+ cntl->SetFailed(EREQUEST,
+ "attachment_size=%d is larger than request_size=%d",
+ meta.attachment_size(), req_size);
break;
}
- svc_name = sp->service->GetDescriptor()->full_name();
}
- const Server::MethodProperty* mp =
- server_accessor.FindMethodPropertyByFullName(
- svc_name, request_meta.method_name());
- if (NULL == mp) {
- cntl->SetFailed(ENOMETHOD, "Fail to find method=%s/%s",
- request_meta.service_name().c_str(),
- request_meta.method_name().c_str());
- break;
- } else if (mp->service->GetDescriptor()
- == BadMethodService::descriptor()) {
- BadMethodRequest breq;
- BadMethodResponse bres;
- breq.set_service_name(request_meta.service_name());
- mp->service->CallMethod(mp->method, cntl.get(), &breq, &bres, NULL);
- break;
- }
- // Switch to service-specific error.
- non_service_error.release();
- method_status = mp->status;
- if (method_status) {
- int rejected_cc = 0;
- if (!method_status->OnRequested(&rejected_cc, cntl.get())) {
- cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
- mp->method->full_name().c_str(), rejected_cc);
+
+ google::protobuf::Service* svc = NULL;
+ google::protobuf::MethodDescriptor* method = NULL;
+ if (NULL != server->options().baidu_master_service) {
+ svc = server->options().baidu_master_service;
+ auto sampled_request = new (std::nothrow) SampledRequest;
+ if (NULL == sampled_request) {
+ cntl->SetFailed(ENOMEM, "Fail to get sampled_request");
break;
}
- }
- google::protobuf::Service* svc = mp->service;
- const google::protobuf::MethodDescriptor* method = mp->method;
- accessor.set_method(method);
+ sampled_request->meta.set_service_name(request_meta.service_name());
+ sampled_request->meta.set_method_name(request_meta.method_name());
+ cntl->reset_sampled_request(sampled_request);
+ // Switch to service-specific error.
+ non_service_error.release();
+ method_status = server->options().baidu_master_service->_status;
+ if (method_status) {
+ int rejected_cc = 0;
+ if (!method_status->OnRequested(&rejected_cc, cntl.get())) {
+ cntl->SetFailed(
+ ELIMIT,
+ "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
+ butil::class_name<BaiduMasterService>(), rejected_cc);
+ break;
+ }
+ }
+ if (span) {
+ span->ResetServerSpanName(sampled_request->meta.method_name());
+ }
+ auto serialized_request = (SerializedRequest*)
+ svc->GetRequestPrototype(NULL).New();
+ req.reset(serialized_request);
+ res.reset(svc->GetResponsePrototype(NULL).New());
- if (!server->AcceptRequest(cntl.get())) {
- break;
- }
+ msg->payload.cutn(&serialized_request->serialized_data(),
+ req_size - meta.attachment_size());
+ if (!msg->payload.empty()) {
+ cntl->request_attachment().swap(msg->payload);
+ }
+ } else {
+ // NOTE(gejun): jprotobuf sends service names without packages. So the
+ // name should be changed to full when it's not.
+ butil::StringPiece svc_name(request_meta.service_name());
+ if (svc_name.find('.') == butil::StringPiece::npos) {
+ const Server::ServiceProperty* sp =
+ server_accessor.FindServicePropertyByName(svc_name);
+ if (NULL == sp) {
+ cntl->SetFailed(ENOSERVICE, "Fail to find service=%s",
+ request_meta.service_name().c_str());
+ break;
+ }
+ svc_name = sp->service->GetDescriptor()->full_name();
+ }
+ const Server::MethodProperty* mp =
+ server_accessor.FindMethodPropertyByFullName(
+ svc_name, request_meta.method_name());
+ if (NULL == mp) {
+ cntl->SetFailed(ENOMETHOD, "Fail to find method=%s/%s",
+ request_meta.service_name().c_str(),
+ request_meta.method_name().c_str());
+ break;
+ } else if (mp->service->GetDescriptor() == BadMethodService::descriptor()) {
+ BadMethodRequest breq;
+ BadMethodResponse bres;
+ breq.set_service_name(request_meta.service_name());
+ mp->service->CallMethod(mp->method, cntl.get(), &breq, &bres, NULL);
+ break;
+ }
+ // Switch to service-specific error.
+ non_service_error.release();
+ method_status = mp->status;
+ if (method_status) {
+ int rejected_cc = 0;
+ if (!method_status->OnRequested(&rejected_cc, cntl.get())) {
+ cntl->SetFailed(
+ ELIMIT,
+ "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
+ mp->method->full_name().c_str(), rejected_cc);
+ break;
+ }
+ }
+ svc = mp->service;
+ method = const_cast<google::protobuf::MethodDescriptor*>(mp->method);
+ accessor.set_method(method);
- if (span) {
- span->ResetServerSpanName(method->full_name());
- }
- const int req_size = static_cast<int>(msg->payload.size());
- butil::IOBuf req_buf;
- butil::IOBuf* req_buf_ptr = &msg->payload;
- if (meta.has_attachment_size()) {
- if (req_size < meta.attachment_size()) {
- cntl->SetFailed(EREQUEST,
- "attachment_size=%d is larger than request_size=%d",
- meta.attachment_size(), req_size);
+ if (span) {
+ span->ResetServerSpanName(method->full_name());
+ }
+
+ if (!server->AcceptRequest(cntl.get())) {
break;
}
+
+ butil::IOBuf req_buf;
int body_without_attachment_size = req_size - meta.attachment_size();
msg->payload.cutn(&req_buf, body_without_attachment_size);
- req_buf_ptr = &req_buf;
- cntl->request_attachment().swap(msg->payload);
- }
+ if (meta.attachment_size() > 0) {
+ cntl->request_attachment().swap(msg->payload);
+ }
- CompressType req_cmp_type = (CompressType)meta.compress_type();
- req.reset(svc->GetRequestPrototype(method).New());
- if (!ParseFromCompressedData(*req_buf_ptr, req.get(), req_cmp_type)) {
- cntl->SetFailed(EREQUEST, "Fail to parse request message, "
- "CompressType=%s, request_size=%d",
- CompressTypeToCStr(req_cmp_type), req_size);
- break;
+ auto req_cmp_type = static_cast<CompressType>(meta.compress_type());
+ req.reset(svc->GetRequestPrototype(method).New());
+ if (!ParseFromCompressedData(req_buf, req.get(), req_cmp_type)) {
+ cntl->SetFailed(EREQUEST, "Fail to parse request message, "
+ "CompressType=%s, request_size=%d",
+ CompressTypeToCStr(req_cmp_type), req_size);
+ break;
+ }
+
+ res.reset(svc->GetResponsePrototype(method).New());
+ req_buf.clear();
}
-
- res.reset(svc->GetResponsePrototype(method).New());
+
// `socket' will be held until response has been sent
google::protobuf::Closure* done = ::brpc::NewCallback<
int64_t, Controller*, const google::protobuf::Message*,
@@ -531,7 +591,6 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
// optional, just release resource ASAP
msg.reset();
- req_buf.clear();
if (span) {
span->set_start_callback_us(butil::cpuwide_time_us());
@@ -653,10 +712,13 @@ void ProcessRpcResponse(InputMessageBase* msg_base) {
cntl->response_attachment().swap(msg->payload);
}
- const CompressType res_cmp_type = (CompressType)meta.compress_type();
+ auto res_cmp_type = (CompressType)meta.compress_type();
cntl->set_response_compress_type(res_cmp_type);
if (cntl->response()) {
- if (!ParseFromCompressedData(
+ if (cntl->response()->GetDescriptor() == SerializedResponse::descriptor()) {
+ ((SerializedResponse*)cntl->response())->
+ serialized_data().append(*res_buf_ptr);
+ } else if (!ParseFromCompressedData(
*res_buf_ptr, cntl->response(), res_cmp_type)) {
cntl->SetFailed(
ERESPONSE, "Fail to parse response message, "
@@ -692,13 +754,15 @@ void PackRpcRequest(butil::IOBuf* req_buf,
method->service()->name());
request_meta->set_method_name(method->name());
meta.set_compress_type(cntl->request_compress_type());
- } else if (cntl->sampled_request()) {
+ } else if (NULL != cntl->sampled_request()) {
// Replaying. Keep service-name as the one seen by server.
request_meta->set_service_name(cntl->sampled_request()->meta.service_name());
request_meta->set_method_name(cntl->sampled_request()->meta.method_name());
- meta.set_compress_type(cntl->sampled_request()->meta.compress_type());
+ meta.set_compress_type(cntl->sampled_request()->meta.has_compress_type() ?
+ cntl->sampled_request()->meta.compress_type() :
+ cntl->request_compress_type());
} else {
- return cntl->SetFailed(ENOMETHOD, "%s.method is NULL", __FUNCTION__);
+ return cntl->SetFailed(ENOMETHOD, "%s.method is NULL", __func__ );
}
if (cntl->has_log_id()) {
request_meta->set_log_id(cntl->log_id());
diff --git a/src/brpc/proto_base.proto b/src/brpc/proto_base.proto
index c0bbc086..30033d49 100644
--- a/src/brpc/proto_base.proto
+++ b/src/brpc/proto_base.proto
@@ -16,6 +16,7 @@
// under the License.
syntax="proto2";
+option cc_generic_services = true;
package brpc;
@@ -30,5 +31,8 @@ message MemcacheResponseBase {}
message NsheadMessageBase {}
message SerializedRequestBase {}
+message SerializedResponseBase {}
message ThriftFramedMessageBase {}
+
+service BaiduMasterServiceBase {}
diff --git a/src/brpc/serialized_request.cpp b/src/brpc/serialized_request.cpp
index 499738f6..33082883 100644
--- a/src/brpc/serialized_request.cpp
+++ b/src/brpc/serialized_request.cpp
@@ -43,7 +43,7 @@ void SerializedRequest::SharedDtor() {
}
void SerializedRequest::SetCachedSize(int /*size*/) const {
- CHECK(false) << "You're not supposed to call " << __FUNCTION__;
+ CHECK(false) << "You're not supposed to call " << __func__;
}
const ::google::protobuf::Descriptor* SerializedRequest::descriptor() {
return SerializedRequestBase::descriptor();
diff --git a/src/brpc/serialized_request.cpp b/src/brpc/serialized_response.cpp
similarity index 55%
copy from src/brpc/serialized_request.cpp
copy to src/brpc/serialized_response.cpp
index 499738f6..817ccb4c 100644
--- a/src/brpc/serialized_request.cpp
+++ b/src/brpc/serialized_response.cpp
@@ -16,112 +16,112 @@
// under the License.
-#include "brpc/serialized_request.h"
+#include "brpc/serialized_response.h"
#include "butil/logging.h"
namespace brpc {
-SerializedRequest::SerializedRequest()
+SerializedResponse::SerializedResponse()
: ::google::protobuf::Message() {
SharedCtor();
}
-SerializedRequest::SerializedRequest(const SerializedRequest& from)
+SerializedResponse::SerializedResponse(const SerializedResponse& from)
: ::google::protobuf::Message() {
SharedCtor();
MergeFrom(from);
}
-void SerializedRequest::SharedCtor() {
+void SerializedResponse::SharedCtor() {
}
-SerializedRequest::~SerializedRequest() {
+SerializedResponse::~SerializedResponse() {
SharedDtor();
}
-void SerializedRequest::SharedDtor() {
+void SerializedResponse::SharedDtor() {
}
-void SerializedRequest::SetCachedSize(int /*size*/) const {
- CHECK(false) << "You're not supposed to call " << __FUNCTION__;
+void SerializedResponse::SetCachedSize(int /*size*/) const {
+ CHECK(false) << "You're not supposed to call " << __func__;
}
-const ::google::protobuf::Descriptor* SerializedRequest::descriptor() {
- return SerializedRequestBase::descriptor();
+const ::google::protobuf::Descriptor* SerializedResponse::descriptor() {
+ return SerializedResponseBase::descriptor();
}
-SerializedRequest* SerializedRequest::New() const {
- return new SerializedRequest;
+SerializedResponse* SerializedResponse::New() const {
+ return new SerializedResponse;
}
#if GOOGLE_PROTOBUF_VERSION >= 3006000
-SerializedRequest*
-SerializedRequest::New(::google::protobuf::Arena* arena) const {
- return CreateMaybeMessage<SerializedRequest>(arena);
+SerializedResponse*
+SerializedResponse::New(::google::protobuf::Arena* arena) const {
+ return CreateMaybeMessage<SerializedResponse>(arena);
}
#endif
-void SerializedRequest::Clear() {
+void SerializedResponse::Clear() {
_serialized.clear();
}
-bool SerializedRequest::MergePartialFromCodedStream(
+bool SerializedResponse::MergePartialFromCodedStream(
::google::protobuf::io::CodedInputStream*) {
CHECK(false) << "You're not supposed to call " << __FUNCTION__;
return false;
}
-void SerializedRequest::SerializeWithCachedSizes(
+void SerializedResponse::SerializeWithCachedSizes(
::google::protobuf::io::CodedOutputStream*) const {
CHECK(false) << "You're not supposed to call " << __FUNCTION__;
}
-::google::protobuf::uint8* SerializedRequest::SerializeWithCachedSizesToArray(
+::google::protobuf::uint8* SerializedResponse::SerializeWithCachedSizesToArray(
::google::protobuf::uint8* target) const {
CHECK(false) << "You're not supposed to call " << __FUNCTION__;
return target;
}
-int SerializedRequest::ByteSize() const {
+int SerializedResponse::ByteSize() const {
return (int)_serialized.size();
}
-void SerializedRequest::MergeFrom(const ::google::protobuf::Message&) {
+void SerializedResponse::MergeFrom(const ::google::protobuf::Message&) {
CHECK(false) << "You're not supposed to call " << __FUNCTION__;
}
-void SerializedRequest::MergeFrom(const SerializedRequest&) {
+void SerializedResponse::MergeFrom(const SerializedResponse&) {
CHECK(false) << "You're not supposed to call " << __FUNCTION__;
}
-void SerializedRequest::CopyFrom(const ::google::protobuf::Message& from) {
+void SerializedResponse::CopyFrom(const ::google::protobuf::Message& from) {
if (&from == this) return;
- const SerializedRequest* source = dynamic_cast<const SerializedRequest*>(&from);
+ const SerializedResponse* source = dynamic_cast<const SerializedResponse*>(&from);
if (source == NULL) {
- CHECK(false) << "SerializedRequest can only CopyFrom SerializedRequest";
+ CHECK(false) << "SerializedResponse can only CopyFrom SerializedResponse";
} else {
_serialized = source->_serialized;
}
}
-void SerializedRequest::CopyFrom(const SerializedRequest& from) {
+void SerializedResponse::CopyFrom(const SerializedResponse& from) {
if (&from == this) return;
_serialized = from._serialized;
}
-bool SerializedRequest::IsInitialized() const {
+bool SerializedResponse::IsInitialized() const {
// Always true because it's already serialized.
return true;
}
-void SerializedRequest::Swap(SerializedRequest* other) {
+void SerializedResponse::Swap(SerializedResponse* other) {
if (other != this) {
_serialized.swap(other->_serialized);
}
}
-::google::protobuf::Metadata SerializedRequest::GetMetadata() const {
+::google::protobuf::Metadata SerializedResponse::GetMetadata() const {
::google::protobuf::Metadata metadata;
- metadata.descriptor = SerializedRequest::descriptor();
+ metadata.descriptor = SerializedResponse::descriptor();
metadata.reflection = NULL;
return metadata;
}
diff --git a/src/brpc/serialized_response.h b/src/brpc/serialized_response.h
new file mode 100644
index 00000000..4e7d86e7
--- /dev/null
+++ b/src/brpc/serialized_response.h
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#ifndef BRPC_SERIALIZED_RESPONSE_H
+#define BRPC_SERIALIZED_RESPONSE_H
+
+#include <google/protobuf/message.h>
+#include "butil/iobuf.h"
+#include "brpc/proto_base.pb.h"
+#include "brpc/pb_compat.h"
+
+namespace brpc {
+
+class SerializedResponse : public ::google::protobuf::Message {
+public:
+ SerializedResponse();
+ virtual ~SerializedResponse();
+
+ SerializedResponse(const SerializedResponse& from);
+
+ inline SerializedResponse& operator=(const SerializedResponse& from) {
+ CopyFrom(from);
+ return *this;
+ }
+
+ static const ::google::protobuf::Descriptor* descriptor();
+
+ void Swap(SerializedResponse* other);
+
+ // implements Message ----------------------------------------------
+
+ SerializedResponse* New() const PB_319_OVERRIDE;
+#if GOOGLE_PROTOBUF_VERSION >= 3006000
+ SerializedResponse* New(::google::protobuf::Arena* arena) const override;
+#endif
+ void CopyFrom(const ::google::protobuf::Message& from) PB_321_OVERRIDE;
+ void CopyFrom(const SerializedResponse& from);
+ void Clear() override;
+ bool IsInitialized() const override;
+ int ByteSize() const;
+ int GetCachedSize() const override { return (int)_serialized.size(); }
+ butil::IOBuf& serialized_data() { return _serialized; }
+ const butil::IOBuf& serialized_data() const { return _serialized; }
+
+protected:
+ ::google::protobuf::Metadata GetMetadata() const override;
+
+private:
+ bool MergePartialFromCodedStream(
+ ::google::protobuf::io::CodedInputStream* input) PB_310_OVERRIDE;
+ void SerializeWithCachedSizes(
+ ::google::protobuf::io::CodedOutputStream* output) const PB_310_OVERRIDE;
+ ::google::protobuf::uint8* SerializeWithCachedSizesToArray(
+ ::google::protobuf::uint8* output) const PB_310_OVERRIDE;
+ void MergeFrom(const ::google::protobuf::Message& from) override;
+ void MergeFrom(const SerializedResponse& from);
+ void SharedCtor();
+ void SharedDtor();
+ void SetCachedSize(int size) const override;
+
+private:
+ butil::IOBuf _serialized;
+};
+
+} // namespace brpc
+
+
+#endif // BRPC_SERIALIZED_RESPONSE_H
diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp
index 51fb1d16..399f348d 100644
--- a/src/brpc/server.cpp
+++ b/src/brpc/server.cpp
@@ -77,6 +77,7 @@
#include "brpc/builtin/common.h" // GetProgramName
#include "brpc/details/tcmalloc_extension.h"
#include "brpc/rdma/rdma_helper.h"
+#include "brpc/baidu_master_service.h"
inline std::ostream& operator<<(std::ostream& os, const timeval& tm) {
const char old_fill = os.fill();
@@ -145,6 +146,7 @@ ServerOptions::ServerOptions()
, has_builtin_services(true)
, force_ssl(false)
, use_rdma(false)
+ , baidu_master_service(NULL)
, http_master_service(NULL)
, health_reporter(NULL)
, rtmp_service(NULL)
@@ -338,6 +340,9 @@ void* Server::UpdateDerivedVars(void* arg) {
it->second.status->Expose(mprefix);
}
}
+ if (server->options().baidu_master_service) {
+ server->options().baidu_master_service->Expose(prefix);
+ }
if (server->options().nshead_service) {
server->options().nshead_service->Expose(prefix);
}
@@ -2240,6 +2245,12 @@ AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(const butil::StringPiece& full_
return options().thrift_service->_max_concurrency;
}
#endif
+ if (full_method_name == butil::class_name_str<BaiduMasterService>()) {
+ if (NULL == options().baidu_master_service) {
+ break;
+ }
+ return options().baidu_master_service->_max_concurrency;
+ }
MethodProperty* mp = _method_map.seek(full_method_name);
if (mp == NULL) {
diff --git a/src/brpc/server.h b/src/brpc/server.h
index 5bc518ef..c9459c23 100644
--- a/src/brpc/server.h
+++ b/src/brpc/server.h
@@ -43,6 +43,7 @@
#include "brpc/redis.h"
#include "brpc/interceptor.h"
#include "brpc/concurrency_limiter.h"
+#include "brpc/baidu_master_service.h"
namespace brpc {
@@ -225,6 +226,8 @@ struct ServerOptions {
// Default: false
bool use_rdma;
+ BaiduMasterService* baidu_master_service;
+
// [CAUTION] This option is for implementing specialized http proxies,
// most users don't need it. Don't change this option unless you fully
// understand the description below.
@@ -235,7 +238,7 @@ struct ServerOptions {
// and response must have no fields.
//
// Owned by Server and deleted in server's destructor
- google::protobuf::Service* http_master_service;
+ ::google::protobuf::Service* http_master_service;
// If this field is on, contents on /health page is generated by calling
// health_reporter->GenerateReport(). This object is NOT owned by server
diff --git a/src/butil/memory/scope_guard.h b/src/butil/memory/scope_guard.h
index ec662b46..1f2da79a 100644
--- a/src/butil/memory/scope_guard.h
+++ b/src/butil/memory/scope_guard.h
@@ -78,7 +78,7 @@ ScopeGuard<Callback> MakeScopeGuard(Callback&& callback) noexcept {
}
namespace internal {
-// for BAIDU_SCOPE_EXIT.
+// for BRPC_SCOPE_EXIT.
enum class ScopeExitHelper {};
template<typename Callback>
diff --git a/test/brpc_server_unittest.cpp b/test/brpc_server_unittest.cpp
index 8a8a76d8..cc98f111 100644
--- a/test/brpc_server_unittest.cpp
+++ b/test/brpc_server_unittest.cpp
@@ -51,6 +51,7 @@
#include "brpc/channel.h"
#include "brpc/socket_map.h"
#include "brpc/controller.h"
+#include "brpc/compress.h"
#include "echo.pb.h"
#include "v1.pb.h"
#include "v2.pb.h"
@@ -1656,4 +1657,114 @@ TEST_F(ServerTest, user_fields) {
ASSERT_EQ(*val, EXP_USER_FIELD_VALUE);
}
+class BaiduMasterServiceImpl : public brpc::BaiduMasterService {
+public:
+ void ProcessRpcRequest(brpc::Controller* cntl,
+ const brpc::SerializedRequest* request,
+ brpc::SerializedResponse* response,
+ ::google::protobuf::Closure* done) override {
+ // This object helps you to call done->Run() in RAII style. If you need
+ // to process the request asynchronously, pass done_guard.release().
+ brpc::ClosureGuard done_guard(done);
+ ASSERT_NE(nullptr, cntl->sampled_request());
+ ASSERT_TRUE(cntl->sampled_request()->meta.has_service_name());
+ ASSERT_EQ(test::EchoService::descriptor()->full_name(),
+ cntl->sampled_request()->meta.service_name());
+ ASSERT_TRUE(cntl->sampled_request()->meta.has_method_name());
+ ASSERT_EQ("Echo", cntl->sampled_request()->meta.method_name());
+ test::EchoRequest echo_request;
+ test::EchoResponse echo_response;
+ brpc::CompressType type = cntl->request_compress_type();
+ ASSERT_TRUE(brpc::ParseFromCompressedData(
+ request->serialized_data(), &echo_request, type));
+ ASSERT_EQ(EXP_REQUEST, echo_request.message());
+ ASSERT_EQ(EXP_REQUEST, cntl->request_attachment().to_string());
+
+ echo_response.set_message(EXP_RESPONSE);
+ butil::IOBuf compressed_data;
+ ASSERT_TRUE(brpc::SerializeAsCompressedData(
+ echo_response, &response->serialized_data(), type));
+ cntl->set_response_compress_type(type);
+ cntl->response_attachment().append(EXP_RESPONSE);
+ }
+};
+
+TEST_F(ServerTest, baidu_master_service) {
+ butil::EndPoint ep;
+ ASSERT_EQ(0, str2endpoint("127.0.0.1:8613", &ep));
+ brpc::Server server;
+ EchoServiceImpl service;
+ ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
+ brpc::ServerOptions opt;
+ opt.baidu_master_service = new BaiduMasterServiceImpl;
+ ASSERT_EQ(0, server.Start(ep, &opt));
+
+ brpc::Channel chan;
+ brpc::ChannelOptions copt;
+ copt.protocol = "baidu_std";
+ ASSERT_EQ(0, chan.Init(ep, &copt));
+ brpc::Controller cntl;
+ test::EchoRequest req;
+ test::EchoResponse res;
+ req.set_message(EXP_REQUEST);
+ cntl.request_attachment().append(EXP_REQUEST);
+ cntl.set_request_compress_type(brpc::COMPRESS_TYPE_GZIP);
+ test::EchoService_Stub stub(&chan);
+ stub.Echo(&cntl, &req, &res, NULL);
+ ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
+ ASSERT_EQ(EXP_RESPONSE, res.message());
+ ASSERT_EQ(EXP_RESPONSE, cntl.response_attachment().to_string());
+
+ ASSERT_EQ(0, server.Stop(0));
+ ASSERT_EQ(0, server.Join());
+}
+
+
+TEST_F(ServerTest, generic_call) {
+ butil::EndPoint ep;
+ ASSERT_EQ(0, str2endpoint("127.0.0.1:8613", &ep));
+ brpc::Server server;
+ EchoServiceImpl service;
+ ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
+ brpc::ServerOptions opt;
+ opt.baidu_master_service = new BaiduMasterServiceImpl;
+ ASSERT_EQ(0, server.Start(ep, &opt));
+
+ {
+ brpc::Channel chan;
+ brpc::ChannelOptions copt;
+ copt.protocol = "baidu_std";
+ ASSERT_EQ(0, chan.Init(ep, &copt));
+ brpc::Controller cntl;
+ test::EchoRequest req;
+ test::EchoResponse res;
+ req.set_message(EXP_REQUEST);
+
+ brpc::SerializedResponse serialized_response;
+ brpc::SerializedRequest serialized_request;
+ brpc::CompressType type = brpc::COMPRESS_TYPE_GZIP;
+ ASSERT_TRUE(brpc::SerializeAsCompressedData(
+ req, &serialized_request.serialized_data(), type));
+ cntl.request_attachment().append(EXP_REQUEST);
+ cntl.set_request_compress_type(type);
+ auto sampled_request = new (std::nothrow) brpc::SampledRequest();
+ sampled_request->meta.set_service_name(
+ test::EchoService::descriptor()->full_name());
+ sampled_request->meta.set_method_name(
+ test::EchoService::descriptor()->FindMethodByName("Echo")->name());
+ cntl.reset_sampled_request(sampled_request);
+ chan.CallMethod(NULL, &cntl, &serialized_request, &serialized_response, NULL);
+ ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
+
+ ASSERT_TRUE(brpc::ParseFromCompressedData(serialized_response.serialized_data(),
+ &res, cntl.response_compress_type()))
+ << serialized_response.serialized_data().size();
+ ASSERT_EQ(EXP_RESPONSE, res.message());
+ ASSERT_EQ(EXP_RESPONSE, cntl.response_attachment().to_string());
+ }
+
+ ASSERT_EQ(0, server.Stop(0));
+ ASSERT_EQ(0, server.Join());
+}
+
} //namespace
diff --git a/test/endpoint_unittest.cpp b/test/endpoint_unittest.cpp
index e0da1af1..14d150a7 100644
--- a/test/endpoint_unittest.cpp
+++ b/test/endpoint_unittest.cpp
@@ -494,7 +494,7 @@ TEST(EndPointTest, tcp_connect) {
}
{
butil::fd_guard sockfd(butil::tcp_connect(ep, NULL, 1));
- ASSERT_EQ(-1, sockfd);
+ ASSERT_EQ(-1, sockfd) << "errno=" << errno;
ASSERT_EQ(ETIMEDOUT, errno);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]