This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 4c33f886 Support success limit of ParallelChannel (#2842)
4c33f886 is described below
commit 4c33f886756689d6ae90905c5f0cc6fef97e90cf
Author: Bright Chen <[email protected]>
AuthorDate: Mon Jan 6 14:44:41 2025 +0800
Support success limit of ParallelChannel (#2842)
* Support success limit of ParallelChannel
* Update document of ParallelChannel
---
docs/cn/combo_channel.md | 6 ++-
docs/en/combo_channel.md | 6 ++-
src/brpc/parallel_channel.cpp | 68 +++++++++++++++++++++------------
src/brpc/parallel_channel.h | 28 +++++++-------
test/brpc_channel_unittest.cpp | 85 +++++++++++++++++++++++++++++++++++++++++-
5 files changed, 153 insertions(+), 40 deletions(-)
diff --git a/docs/cn/combo_channel.md b/docs/cn/combo_channel.md
index 76c0b23c..e11c79b4 100644
--- a/docs/cn/combo_channel.md
+++ b/docs/cn/combo_channel.md
@@ -19,7 +19,11 @@ ParallelChannel (有时被称为“pchan”)同时访问其包含的sub channel
示例代码见[example/parallel_echo_c++](https://github.com/apache/brpc/tree/master/example/parallel_echo_c++/)。
-任何brpc::ChannelBase的子类都可以加入ParallelChannel,包括ParallelChannel和其他组合Channel。用户可以设置ParallelChannelOptions.fail_limit来控制访问的最大失败次数,当失败的访问达到这个数目时,RPC会立刻结束而不等待超时。
+任何brpc::ChannelBase的子类都可以加入ParallelChannel,包括ParallelChannel和其他组合Channel。
+
+用户可以设置ParallelChannelOptions.fail_limit来控制访问的最大失败次数,当失败的访问达到这个数目时,RPC会立刻结束而不等待超时。
+
+用户可以设置ParallelChannelOptions.success_limit来控制访问的最大成功次数,当成功的访问达到这个数目时,RPC会立刻结束。ParallelChannelOptions.fail_limit的优先级高于ParallelChannelOptions.success_limit,只有未设置fail_limit时,success_limit才会生效。
一个sub channel可多次加入同一个ParallelChannel。当你需要对同一个服务发起多次异步访问并等待它们完成的话,这很有用。
diff --git a/docs/en/combo_channel.md b/docs/en/combo_channel.md
index c87dc5ca..686fad59 100644
--- a/docs/en/combo_channel.md
+++ b/docs/en/combo_channel.md
@@ -19,7 +19,11 @@ We need a better abstraction. If several channels are
combined into a larger one
Check
[example/parallel_echo_c++](https://github.com/apache/brpc/tree/master/example/parallel_echo_c++/)
for an example.
-Any subclasses of `brpc::ChannelBase` can be added into `ParallelChannel`,
including `ParallelChannel` and other combo channels. Set
`ParallelChannelOptions.fail_limit` to control maximum number of failures. When
number of failed responses reaches the limit, the RPC is ended immediately
rather than waiting for timeout.
+Any subclasses of `brpc::ChannelBase` can be added into `ParallelChannel`,
including `ParallelChannel` and other combo channels.
+
+Set `ParallelChannelOptions.fail_limit` to control maximum number of failures.
When number of failed responses reaches the limit, the RPC is ended immediately
rather than waiting for timeout.
+
+Set `ParallelChannelOptions.success_limit` to control maximum number of
successful responses. When number of successful responses reaches the limit,
the RPC is ended immediately. `ParallelChannelOptions.fail_limit` has a higher
priority than `ParallelChannelOptions.success_limit`: `success_limit` takes
effect only when `fail_limit` is not set.
A sub channel can be added to the same `ParallelChannel` more than once, which
is useful when you need to initiate multiple asynchronous RPC to the same
service and wait for their completions.
diff --git a/src/brpc/parallel_channel.cpp b/src/brpc/parallel_channel.cpp
index c6972211..130712bf 100644
--- a/src/brpc/parallel_channel.cpp
+++ b/src/brpc/parallel_channel.cpp
@@ -24,14 +24,8 @@
#include "brpc/details/controller_private_accessor.h"
#include "brpc/parallel_channel.h"
-
namespace brpc {
-ParallelChannelOptions::ParallelChannelOptions()
- : timeout_ms(500)
- , fail_limit(-1) {
-}
-
DECLARE_bool(usercode_in_pthread);
// Not see difference when memory is cached.
@@ -45,12 +39,15 @@ static __thread Memory tls_cached_pchan_mem = { 0, NULL };
class ParallelChannelDone : public google::protobuf::Closure {
private:
- ParallelChannelDone(int fail_limit, int ndone, int nchan, int memsize,
+ ParallelChannelDone(int fail_limit, int success_limit,
+ int ndone, int nchan, int memsize,
Controller* cntl, google::protobuf::Closure* user_done)
: _fail_limit(fail_limit)
+ , _success_limit(success_limit)
, _ndone(ndone)
, _nchan(nchan)
, _memsize(memsize)
+ , _current_success(0)
, _current_fail(0)
, _current_done(0)
, _cntl(cntl)
@@ -59,15 +56,13 @@ private:
, _callmethod_pthread(0) {
}
- ~ParallelChannelDone() { }
-
public:
class SubDone : public google::protobuf::Closure {
public:
SubDone() : shared_data(NULL) {
}
- ~SubDone() {
+ ~SubDone() override {
// Can't delete request/response in ~SubCall because the
// object is copyable.
if (ap.flags & DELETE_REQUEST) {
@@ -78,7 +73,7 @@ public:
}
}
- void Run() {
+ void Run() override {
shared_data->OnSubDoneRun(this);
}
@@ -89,7 +84,8 @@ public:
};
static ParallelChannelDone* Create(
- int fail_limit, int ndone, const SubCall* aps, int nchan,
+ int fail_limit, int success_limit,
+ int ndone, const SubCall* aps, int nchan,
Controller* cntl, google::protobuf::Closure* user_done) {
// We need to create the object in this way because _sub_done is
// dynamically allocated.
@@ -130,8 +126,8 @@ public:
return NULL;
}
#endif
- ParallelChannelDone* d = new (mem) ParallelChannelDone(
- fail_limit, ndone, nchan, memsize, cntl, user_done);
+ auto d = new (mem) ParallelChannelDone(
+ fail_limit, success_limit, ndone, nchan, memsize, cntl, user_done);
// Apply client settings of _cntl to controllers of sub calls, except
// timeout. If we let sub channel do their timeout separately, when
@@ -183,7 +179,7 @@ public:
}
}
- void Run() {
+ void Run() override {
const int ec = _cntl->ErrorCode();
if (ec == EPCHANFINISH) {
// all sub calls finished. Clear the error and we'll set
@@ -220,14 +216,25 @@ public:
if (fin != NULL) {
// [ called from SubDone::Run() ]
- // Count failed sub calls, if fail_limit is reached, cancel others.
- if (fin->cntl.FailedInline() &&
- _current_fail.fetch_add(1, butil::memory_order_relaxed) + 1
- == _fail_limit) {
+ int error_code = fin->cntl.ErrorCode();
+ // EPCHANFINISH is not an error of sub calls.
+ bool fail = 0 != error_code && EPCHANFINISH != error_code;
+ bool cancel =
+ // Count failed sub calls, if `fail_limit' is reached, cancel
others.
+ (fail && _current_fail.fetch_add(1,
butil::memory_order_relaxed) + 1
+ == _fail_limit) ||
+ // Count successful sub calls, if `success_limit' is reached,
cancel others.
+ (0 == error_code &&
+ _current_success.fetch_add(1, butil::memory_order_relaxed) + 1
+ == _success_limit);
+
+ if (cancel) {
+ // Only cancel once by `fail_limit' or `success_limit'.
for (int i = 0; i < _ndone; ++i) {
SubDone* sd = sub_done(i);
if (fin != sd) {
- bthread_id_error(sd->cntl.call_id(), ECANCELED);
+ bthread_id_error(
+ sd->cntl.call_id(), fail ? ECANCELED :
EPCHANFINISH);
}
}
}
@@ -423,6 +430,7 @@ public:
private:
int _fail_limit;
+ int _success_limit;
int _ndone;
int _nchan;
#if defined(__clang__)
@@ -430,6 +438,7 @@ private:
#else
int _memsize;
#endif
+ butil::atomic<int> _current_success;
butil::atomic<int> _current_fail;
butil::atomic<uint32_t> _current_done;
Controller* _cntl;
@@ -602,6 +611,7 @@ void ParallelChannel::CallMethod(
ParallelChannelDone* d = NULL;
int ndone = nchan;
int fail_limit = 1;
+ int success_limit = 1;
DEFINE_SMALL_ARRAY(SubCall, aps, nchan, 64);
if (cntl->FailedInline()) {
@@ -655,9 +665,21 @@ void ParallelChannel::CallMethod(
fail_limit = ndone;
}
}
-
- d = ParallelChannelDone::Create(fail_limit, ndone, aps, nchan,
- cntl, done);
+
+ // `success_limit' is only valid when `fail_limit' is not set.
+ if (_options.fail_limit >= 0 || _options.success_limit < 0) {
+ success_limit = ndone;
+ } else {
+ success_limit = _options.success_limit;
+ if (success_limit < 1) {
+ success_limit = 1;
+ } else if (success_limit > ndone) {
+ success_limit = ndone;
+ }
+ }
+
+ d = ParallelChannelDone::Create(
+ fail_limit, success_limit, ndone, aps, nchan, cntl, done);
if (NULL == d) {
cntl->SetFailed(ENOMEM, "Fail to new ParallelChannelDone");
goto FAIL;
diff --git a/src/brpc/parallel_channel.h b/src/brpc/parallel_channel.h
index df85c9ac..84e5f342 100644
--- a/src/brpc/parallel_channel.h
+++ b/src/brpc/parallel_channel.h
@@ -112,7 +112,7 @@ protected:
}
// Only callable by subclasses and butil::intrusive_ptr
- virtual ~CallMapper() {}
+ ~CallMapper() override = default;
};
// Clone req_base typed `Req'.
@@ -140,12 +140,11 @@ public:
FAIL_ALL
};
- ResponseMerger() { }
virtual Result Merge(google::protobuf::Message* response,
const google::protobuf::Message* sub_response) = 0;
protected:
// Only callable by subclasses and butil::intrusive_ptr
- virtual ~ResponseMerger() { }
+ ~ResponseMerger() override = default;
};
struct ParallelChannelOptions {
@@ -156,7 +155,7 @@ struct ParallelChannelOptions {
// Overridable by Controller.set_timeout_ms().
// Default: 500 (milliseconds)
// Maximum: 0x7fffffff (roughly 30 days)
- int32_t timeout_ms;
+ int32_t timeout_ms{500};
// The RPC is considered to be successful if number of failed sub RPC
// does not reach this limit. Even if the RPC is timedout or canceled,
@@ -165,10 +164,14 @@ struct ParallelChannelOptions {
// the timeout) when the limit is reached.
// Default: number of sub channels, meaning that the RPC to ParallelChannel
// does not fail unless all sub RPC failed.
- int fail_limit;
+ int fail_limit{-1};
- // Construct with default options.
- ParallelChannelOptions();
+ // The RPC is considered to be successful when the number of successful
+ // sub RPCs reaches this limit.
+ // Default: number of sub channels, meaning that the RPC to ParallelChannel
+ // does not return unless all sub RPC succeed.
+ // Note: `success_limit' is only valid when `fail_limit' is not set.
+ int success_limit{ -1};
};
// ParallelChannel(aka "pchan") accesses all sub channels simultaneously with
@@ -185,8 +188,7 @@ struct ParallelChannelOptions {
class ParallelChannel : public ChannelBase {
friend class Controller;
public:
- ParallelChannel() { }
- ~ParallelChannel();
+ ~ParallelChannel() override;
// Initialize ParallelChannel with `options'.
// NOTE: Currently this function always returns 0.
@@ -234,7 +236,7 @@ public:
google::protobuf::RpcController* controller,
const google::protobuf::Message* request,
google::protobuf::Message* response,
- google::protobuf::Closure* done);
+ google::protobuf::Closure* done) override;
// Number of sub channels.
size_t channel_count() const { return _chans.size(); }
@@ -245,10 +247,10 @@ public:
// Minimum weight of sub channels.
// FIXME(gejun): be minimum of top(nchan-fail_limit)
- int Weight();
+ int Weight() override;
// Put description into `os'.
- void Describe(std::ostream& os, const DescribeOptions&) const;
+ void Describe(std::ostream& os, const DescribeOptions&) const override;
public:
struct SubChan {
@@ -263,7 +265,7 @@ public:
protected:
static void* RunDoneAndDestroy(void* arg);
- int CheckHealth();
+ int CheckHealth() override;
ParallelChannelOptions _options;
ChannelList _chans;
diff --git a/test/brpc_channel_unittest.cpp b/test/brpc_channel_unittest.cpp
index 8814b0bc..7b98896b 100644
--- a/test/brpc_channel_unittest.cpp
+++ b/test/brpc_channel_unittest.cpp
@@ -569,6 +569,24 @@ protected:
}
};
+ class SuccessLimitCallMapper : public brpc::CallMapper {
+ public:
+ brpc::SubCall Map(int channel_index,
+ const google::protobuf::MethodDescriptor* method,
+ const google::protobuf::Message* req_base,
+ google::protobuf::Message* response) override {
+ auto req = brpc::Clone<test::EchoRequest>(req_base);
+ req->set_code(channel_index + 1/*non-zero*/);
+ if (_index++ > 0) {
+ req->set_sleep_us(5 * 1000);
+ }
+ return brpc::SubCall(method, req, response->New(),
+ brpc::DELETE_REQUEST | brpc::DELETE_RESPONSE);
+ }
+ private:
+ size_t _index{0};
+ };
+
class MergeNothing : public brpc::ResponseMerger {
Result Merge(google::protobuf::Message* /*response*/,
const google::protobuf::Message* /*sub_response*/) {
@@ -826,7 +844,60 @@ protected:
}
StopAndJoin();
}
-
+
+ void TestSuccessLimitParallel(bool single_server, bool async, bool
short_connection) {
+ std::cout << " *** single=" << single_server
+ << " async=" << async
+ << " short=" << short_connection << std::endl;
+
+ ASSERT_EQ(0, StartAccept(_ep));
+ const size_t NCHANS = 8;
+ brpc::Channel subchans[NCHANS];
+ brpc::ParallelChannel channel;
+ brpc::ParallelChannelOptions options;
+ // Only care about the first successful response.
+ options.success_limit = 1;
+ channel.Init(&options);
+ butil::intrusive_ptr<brpc::CallMapper> fast_call_mapper(new
SuccessLimitCallMapper);
+ for (size_t i = 0; i < NCHANS; ++i) {
+ SetUpChannel(&subchans[i], single_server, short_connection);
+ ASSERT_EQ(0, channel.AddChannel(
+ &subchans[i], brpc::DOESNT_OWN_CHANNEL, fast_call_mapper,
NULL));
+ }
+ brpc::Controller cntl;
+ test::EchoRequest req;
+ test::EchoResponse res;
+ req.set_message(__FUNCTION__);
+ req.set_code(23);
+ CallMethod(&channel, &cntl, &req, &res, async);
+
+ EXPECT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
+ EXPECT_EQ(NCHANS, (size_t)cntl.sub_count());
+ for (int i = 0; i < cntl.sub_count(); ++i) {
+ EXPECT_TRUE(cntl.sub(i)) << "i=" << i;
+ if (0 == i) {
+ EXPECT_TRUE(!cntl.sub(i)->Failed()) << "i=" << i;
+ } else {
+ EXPECT_TRUE(cntl.sub(i)->Failed()) << "i=" << i;
+ EXPECT_EQ(brpc::EPCHANFINISH, cntl.sub(i)->ErrorCode()) <<
"i=" << i;
+ }
+ }
+ EXPECT_EQ("received " + std::string(__FUNCTION__), res.message());
+ ASSERT_EQ(1, res.code_list_size());
+ ASSERT_EQ((int)1, res.code_list(0));
+ if (short_connection) {
+ // Sleep to let `_messenger' detect `Socket' being `SetFailed'
+ const int64_t start_time = butil::gettimeofday_us();
+ while (_messenger.ConnectionCount() != 0) {
+ EXPECT_LT(butil::gettimeofday_us(), start_time +
100000L/*100ms*/);
+ bthread_usleep(1000);
+ }
+ } else {
+ EXPECT_GE(1ul, _messenger.ConnectionCount());
+ }
+ StopAndJoin();
+ }
+
struct CancelerArg {
int64_t sleep_before_cancel_us;
brpc::CallId cid;
@@ -2382,7 +2453,7 @@ TEST_F(ChannelTest, success_parallel) {
}
TEST_F(ChannelTest, success_duplicated_parallel) {
- for (int i = 0; i <= 1; ++i) { // Flag SingleServer
+ for (int i = 0; i <= 1; ++i) { // Flag SingleServer
for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
for (int k = 0; k <=1; ++k) { // Flag ShortConnection
TestSuccessDuplicatedParallel(i, j, k);
@@ -2421,6 +2492,16 @@ TEST_F(ChannelTest, success_parallel2) {
}
}
+TEST_F(ChannelTest, success_limit_parallel) {
+ for (int i = 0; i <= 1; ++i) { // Flag SingleServer
+ for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
+ for (int k = 0; k <=1; ++k) { // Flag ShortConnection
+ TestSuccessLimitParallel(i, j, k);
+ }
+ }
+ }
+}
+
TEST_F(ChannelTest, cancel_before_callmethod) {
for (int i = 0; i <= 1; ++i) { // Flag SingleServer
for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]