This is an automated email from the ASF dual-hosted git repository.
guangmingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new c32ddee0 Support custom modification of sub controllers (#3213)
c32ddee0 is described below
commit c32ddee06ef742179b6f93f3b2adf1a8cb160b3e
Author: Bright Chen <[email protected]>
AuthorDate: Sun Mar 1 16:04:06 2026 +0800
Support custom modification of sub controllers (#3213)
* Copy http headers from main controller to sub controller
* Support custom modification of sub controllers
---
docs/cn/combo_channel.md | 16 ++++++++
docs/en/combo_channel.md | 14 +++++++
src/brpc/parallel_channel.cpp | 23 ++++++++++--
src/brpc/parallel_channel.h | 16 +++++++-
src/brpc/policy/redis_protocol.cpp | 2 +-
src/brpc/selective_channel.cpp | 12 +++---
src/brpc/socket.cpp | 3 +-
test/brpc_channel_unittest.cpp | 75 ++++++++++++++++++++++++++++++++++----
test/echo.proto | 1 +
9 files changed, 142 insertions(+), 20 deletions(-)
diff --git a/docs/cn/combo_channel.md b/docs/cn/combo_channel.md
index e11c79b4..fba4f6be 100644
--- a/docs/cn/combo_channel.md
+++ b/docs/cn/combo_channel.md
@@ -60,8 +60,12 @@ public:
const google::protobuf::MethodDescriptor* method,
const google::protobuf::Message* request,
google::protobuf::Message* response) = 0;
+
+ virtual void MapController(int channel_index/*starting from 0*/, int
channel_count,
+ const Controller* main_cntl, Controller*
sub_cntl);
};
```
+### Map
channel_index:该sub channel在ParallelChannel中的位置,从0开始计数。
@@ -124,6 +128,18 @@ method/request/response:ParallelChannel.CallMethod()的参数。
};
```
+### MapController
+
+channel_index:该sub channel在ParallelChannel中的位置,从0开始计数。
+
+channel_count:ParallelChannel中sub channel的数量。
+
+main_cntl:ParallelChannel.CallMethod()的参数。
+
+sub_cntl:sub channel的请求对应的controller。默认实现:将main_cntl的request_attachment追加到sub_cntl中。
+
+注意:修改ClientSettings相关配置(如超时、重试等)是无效的,因为所有sub_cntl都是使用main_cntl的ClientSettings配置。
+
## ResponseMerger
response_merger把sub
channel的response合并入总的response,其为NULL时,则使用response->MergeFrom(*sub_response),MergeFrom的行为可概括为“除了合并repeated字段,其余都是覆盖”。如果你需要更复杂的行为,则需实现ResponseMerger。response_merger是一个个执行的,所以你并不需要考虑多个Merge同时运行的情况。response_merger在ParallelChannel析构时被删除。response_merger内含引用计数,一个response_merger可与多个sub
channel关联。
diff --git a/docs/en/combo_channel.md b/docs/en/combo_channel.md
index 686fad59..ab68188f 100644
--- a/docs/en/combo_channel.md
+++ b/docs/en/combo_channel.md
@@ -63,6 +63,8 @@ public:
};
```
+### Map
+
`channel_index`: The position of the sub channel inside `ParallelChannel`,
starting from zero.
`channel_count`: The sub channel count inside `ParallelChannel`.
@@ -131,6 +133,18 @@ Common implementations of `Map()` are listed below:
};
```
+### MapController
+
+`channel_index`: The position of the sub channel inside `ParallelChannel`,
starting from zero.
+
+`channel_count`: The sub channel count inside `ParallelChannel`.
+
+`main_cntl`: The controller passed to `ParallelChannel::CallMethod()`.
+
+`sub_cntl`: The controller used for the request sent to the sub channel. Default implementation: appends the `request_attachment` of `main_cntl` to `sub_cntl`.
+
+Note: Modifying `ClientSettings` configurations (such as timeout and retries)
is ineffective because all sub controllers use the `ClientSettings`
configuration of `main_cntl`.
+
## ResponseMerger
`response_merger` merges responses from all sub channels into one for the
`ParallelChannel`. When it's NULL, `response->MergeFrom(*sub_response)` is used
instead, whose behavior can be summarized as "merge repeated fields and
overwrite the rest". If you need more complex behavior, implement
`ResponseMerger`. Multiple `response_merger` are called one by one to merge sub
responses so that you do not need to consider the race conditions between
merging multiple responses simultaneously. The [...]
diff --git a/src/brpc/parallel_channel.cpp b/src/brpc/parallel_channel.cpp
index 130712bf..de2b86f1 100644
--- a/src/brpc/parallel_channel.cpp
+++ b/src/brpc/parallel_channel.cpp
@@ -612,6 +612,7 @@ void ParallelChannel::CallMethod(
int ndone = nchan;
int fail_limit = 1;
int success_limit = 1;
+ Controller::ClientSettings settings{};
DEFINE_SMALL_ARRAY(SubCall, aps, nchan, 64);
if (cntl->FailedInline()) {
@@ -718,12 +719,28 @@ void ParallelChannel::CallMethod(
d->SaveThreadInfoOfCallsite();
CHECK_EQ(0, bthread_id_unlock(cid));
// Don't touch `cntl' and `d' again (for async RPC)
-
+
+ // Apply client settings of _cntl to controllers of sub calls, except
+ // timeout. If we let sub channel do their timeout separately, when
+ // timeout happens, we get ETOOMANYFAILS rather than ERPCTIMEDOUT.
+ cntl->SaveClientSettings(&settings);
+ settings.timeout_ms = -1;
+ for (int i = 0, j = 0; i < nchan; ++i) {
+ if (!aps[i].is_skip()) {
+ ParallelChannelDone::SubDone* sd = d->sub_done(j++);
+ if (NULL != _chans[i].call_mapper) {
+ _chans[i].call_mapper->MapController(i, nchan, cntl,
&sd->cntl);
+ } else {
+ // Forward the attachment to each sub call.
+
sd->cntl.request_attachment().append(cntl->request_attachment());
+ }
+ sd->cntl.ApplyClientSettings(settings);
+ sd->cntl.allow_done_to_run_in_place();
+ }
+ }
for (int i = 0, j = 0; i < nchan; ++i) {
if (!aps[i].is_skip()) {
ParallelChannelDone::SubDone* sd = d->sub_done(j++);
- // Forward the attachment to each sub call
- sd->cntl.request_attachment().append(cntl->request_attachment());
_chans[i].chan->CallMethod(sd->ap.method, &sd->cntl,
sd->ap.request, sd->ap.response, sd);
}
diff --git a/src/brpc/parallel_channel.h b/src/brpc/parallel_channel.h
index 84e5f342..292213c1 100644
--- a/src/brpc/parallel_channel.h
+++ b/src/brpc/parallel_channel.h
@@ -91,6 +91,14 @@ struct SubCall {
// }
// return SubCall(sub_method, request->sub_request(channel_index),
// response->add_sub_response(), 0);
+// MapController customizes the controller of each sub call issued by
+// ParallelChannel, so that sub channels can use different controllers.
+// Note:
+// Modifying ClientSettings configurations (such as timeout, retries, etc.)
+// is ineffective because all sub-controllers use the main controller's
+// ClientSettings configuration.
+// Examples:
+// sub_cntl->http_request().SetHeader(...);
class CallMapper : public SharedObject {
public:
virtual SubCall Map(int channel_index/*starting from 0*/,
@@ -98,7 +106,13 @@ public:
const google::protobuf::MethodDescriptor* method,
const google::protobuf::Message* request,
google::protobuf::Message* response) {
- return Map(channel_index, method, request, response);
+ return Map(channel_index, method, request, response);
+ }
+
+ virtual void MapController(int channel_index/*starting from 0*/, int
channel_count,
+ const Controller* main_cntl, Controller*
sub_cntl) {
+ // Forward the attachment to each sub call by default.
+ sub_cntl->request_attachment().append(main_cntl->request_attachment());
}
protected:
diff --git a/src/brpc/policy/redis_protocol.cpp
b/src/brpc/policy/redis_protocol.cpp
index f8acf49d..9e8e148e 100644
--- a/src/brpc/policy/redis_protocol.cpp
+++ b/src/brpc/policy/redis_protocol.cpp
@@ -283,7 +283,7 @@ void SerializeRedisRequest(butil::IOBuf* buf,
const RedisRequest* rr = (const RedisRequest*)request;
// If redis byte size is zero, brpc call will fail with E22. Continuous
E22 may cause E112 in the end.
// So set failed and return useful error message
- if (rr->ByteSize() == 0) {
+ if (GetProtobufByteSize(*rr) == 0) {
return cntl->SetFailed(EREQUEST, "request byte size is empty");
}
// We work around SerializeTo of pb which is just a placeholder.
diff --git a/src/brpc/selective_channel.cpp b/src/brpc/selective_channel.cpp
index ec933541..567ffa51 100644
--- a/src/brpc/selective_channel.cpp
+++ b/src/brpc/selective_channel.cpp
@@ -344,13 +344,13 @@ int Sender::IssueRPC(int64_t start_realtime_us) {
sub_cntl->set_request_code(_main_cntl->request_code());
// Forward request attachment to the subcall
sub_cntl->request_attachment().append(_main_cntl->request_attachment());
- sub_cntl->http_request() = _main_cntl->http_request();
+ ProtocolType protocol = _main_cntl->request_protocol();
+ if (PROTOCOL_HTTP == protocol || PROTOCOL_H2 == protocol) {
+ sub_cntl->http_request() = _main_cntl->http_request();
+ }
- sel_out.channel()->CallMethod(_main_cntl->_method,
- &r.sub_done->_cntl,
- _request,
- r.response,
- r.sub_done);
+ sel_out.channel()->CallMethod(_main_cntl->_method, &r.sub_done->_cntl,
+ _request, r.response, r.sub_done);
return 0;
}
diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp
index b132f2ac..c123fb6b 100644
--- a/src/brpc/socket.cpp
+++ b/src/brpc/socket.cpp
@@ -896,8 +896,7 @@ void Socket::OnFailed(int error_code, const std::string&
error_text) {
// comes online.
if (HCEnabled()) {
GetOrNewSharedPart()->circuit_breaker.MarkAsBroken();
- StartHealthCheck(id(),
- GetOrNewSharedPart()->circuit_breaker.isolation_duration_ms());
+ StartHealthCheck(id(),
GetOrNewSharedPart()->circuit_breaker.isolation_duration_ms());
}
// Wake up all threads waiting on EPOLLOUT when closing fd
_epollout_butex->fetch_add(1, butil::memory_order_relaxed);
diff --git a/test/brpc_channel_unittest.cpp b/test/brpc_channel_unittest.cpp
index 66d1fbad..86bee891 100644
--- a/test/brpc_channel_unittest.cpp
+++ b/test/brpc_channel_unittest.cpp
@@ -176,6 +176,16 @@ class MyEchoService : public ::test::EchoService {
res->add_code_list(req->code());
}
res->set_receiving_socket_id(cntl->_current_call.sending_sock->id());
+
+ brpc::ProtocolType protocol = cntl->request_protocol();
+ if ((brpc::PROTOCOL_HTTP == protocol || brpc::PROTOCOL_H2 == protocol)
&&
+ !req->http_header().empty()) {
+ ASSERT_FALSE(req->http_header().empty());
+ const std::string* val =
cntl->http_request().GetHeader(req->http_header());
+ ASSERT_TRUE(val);
+ ASSERT_FALSE(val->empty());
+ cntl->http_response().SetHeader(req->http_header(), *val);
+ }
}
static void CallAfterRpc(std::shared_ptr<CallAfterRpcObject> str,
brpc::Controller* cntl,
@@ -310,8 +320,10 @@ protected:
bool short_connection,
const brpc::Authenticator* auth = NULL,
std::string connection_group = std::string(),
- bool use_backup_request_policy = false) {
+ bool use_backup_request_policy = false,
+ brpc::ProtocolType protocol = brpc::PROTOCOL_BAIDU_STD) {
brpc::ChannelOptions opt;
+ opt.protocol = protocol;
if (short_connection) {
opt.connection_type = brpc::CONNECTION_TYPE_SHORT;
}
@@ -526,7 +538,7 @@ protected:
int channel_index,
const google::protobuf::MethodDescriptor* method,
const google::protobuf::Message* req_base,
- google::protobuf::Message* response) {
+ google::protobuf::Message* response) override {
test::EchoRequest* req = brpc::Clone<test::EchoRequest>(req_base);
req->set_code(channel_index + 1/*non-zero*/);
return brpc::SubCall(method, req, response->New(),
@@ -540,7 +552,7 @@ protected:
int channel_index,
const google::protobuf::MethodDescriptor* method,
const google::protobuf::Message* req_base,
- google::protobuf::Message* response) {
+ google::protobuf::Message* response) override {
if (channel_index % 2) {
return brpc::SubCall::Skip();
}
@@ -554,7 +566,7 @@ protected:
int channel_index,
const google::protobuf::MethodDescriptor* method,
const google::protobuf::Message* req_base,
- google::protobuf::Message* res_base) {
+ google::protobuf::Message* res_base) override {
const test::ComboRequest* req =
dynamic_cast<const test::ComboRequest*>(req_base);
test::ComboResponse* res =
dynamic_cast<test::ComboResponse*>(res_base);
@@ -1334,7 +1346,7 @@ protected:
int /*channel_index*/,
const google::protobuf::MethodDescriptor* method,
const google::protobuf::Message* req_base,
- google::protobuf::Message* response) {
+ google::protobuf::Message* response) override {
test::EchoRequest* req = brpc::Clone<test::EchoRequest>(req_base);
req->set_sleep_us(70000); // 70ms
return brpc::SubCall(method, req, response->New(),
@@ -2357,7 +2369,7 @@ class BadCall : public brpc::CallMapper {
brpc::SubCall Map(int,
const google::protobuf::MethodDescriptor*,
const google::protobuf::Message*,
- google::protobuf::Message*) {
+ google::protobuf::Message*) override {
return brpc::SubCall::Bad();
}
};
@@ -2384,7 +2396,7 @@ class SkipCall : public brpc::CallMapper {
brpc::SubCall Map(int,
const google::protobuf::MethodDescriptor*,
const google::protobuf::Message*,
- google::protobuf::Message*) {
+ google::protobuf::Message*) override {
return brpc::SubCall::Skip();
}
};
@@ -2412,6 +2424,55 @@ TEST_F(ChannelTest, skip_all_channels) {
}
}
+static const std::string ECHO_HTTP_HEADER = "echo-http-header";
+
+class EchoHttpHeader : public brpc::CallMapper {
+public:
+ brpc::SubCall Map(int channel_index, int channel_count,
+ const google::protobuf::MethodDescriptor* method,
+ const google::protobuf::Message* request,
+ google::protobuf::Message* response) override {
+ return brpc::SubCall(method, request, response->New(),
brpc::DELETE_RESPONSE);
+ }
+
+ void MapController(int channel_index, int,
+ const brpc::Controller* main_cntl,
+ brpc::Controller* sub_cntl) override {
+ sub_cntl->http_request().SetHeader(ECHO_HTTP_HEADER,
std::to_string(channel_index));
+ }
+};
+
+TEST_F(ChannelTest, http_header_parallel_channels) {
+ brpc::Server server;
+ MyEchoService service;
+ ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
+ brpc::ServerOptions opt;
+ ASSERT_EQ(0, server.Start(_ep, &opt));
+
+ const size_t NCHANS = 5;
+ brpc::ParallelChannel channel;
+ for (size_t i = 0; i < NCHANS; ++i) {
+ brpc::Channel* sub_chan = new brpc::Channel();
+ SetUpChannel(sub_chan, true, false, NULL, "", false,
brpc::PROTOCOL_HTTP);
+ ASSERT_EQ(0, channel.AddChannel(sub_chan, brpc::OWNS_CHANNEL, new
EchoHttpHeader, NULL));
+ }
+
+ brpc::Controller cntl;
+ test::EchoRequest req;
+ test::EchoResponse res;
+ req.set_message(__FUNCTION__);
+ *req.mutable_http_header() = ECHO_HTTP_HEADER;
+ CallMethod(&channel, &cntl, &req, &res, false);
+
+ ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
+ ASSERT_EQ((int)NCHANS, cntl.sub_count());
+ for (int i = 0; i < cntl.sub_count(); ++i) {
+ const brpc::Controller* sub_cntl = cntl.sub(i);
+ ASSERT_TRUE(NULL != sub_cntl) << "i=" << i;
+ ASSERT_EQ(std::to_string(i),
*sub_cntl->http_response().GetHeader(ECHO_HTTP_HEADER));
+ }
+}
+
TEST_F(ChannelTest, connection_failed_parallel) {
for (int i = 0; i <= 1; ++i) { // Flag SingleServer
for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
diff --git a/test/echo.proto b/test/echo.proto
index 970ef1db..c9fa8ace 100644
--- a/test/echo.proto
+++ b/test/echo.proto
@@ -27,6 +27,7 @@ message EchoRequest {
optional bool close_fd = 3;
optional int32 sleep_us = 4;
optional int32 server_fail = 5;
+ optional string http_header = 6;
};
message EchoResponse {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]