This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c33f886 Support success limit of ParallelChannel (#2842)
4c33f886 is described below

commit 4c33f886756689d6ae90905c5f0cc6fef97e90cf
Author: Bright Chen <[email protected]>
AuthorDate: Mon Jan 6 14:44:41 2025 +0800

    Support success limit of ParallelChannel (#2842)
    
    * Support success limit of ParallelChannel
    
    * Update document of ParallelChannel
---
 docs/cn/combo_channel.md       |  6 ++-
 docs/en/combo_channel.md       |  6 ++-
 src/brpc/parallel_channel.cpp  | 68 +++++++++++++++++++++------------
 src/brpc/parallel_channel.h    | 28 +++++++-------
 test/brpc_channel_unittest.cpp | 85 +++++++++++++++++++++++++++++++++++++++++-
 5 files changed, 153 insertions(+), 40 deletions(-)

diff --git a/docs/cn/combo_channel.md b/docs/cn/combo_channel.md
index 76c0b23c..e11c79b4 100644
--- a/docs/cn/combo_channel.md
+++ b/docs/cn/combo_channel.md
@@ -19,7 +19,11 @@ ParallelChannel (有时被称为“pchan”)同时访问其包含的sub channel
 
 
示例代码见[example/parallel_echo_c++](https://github.com/apache/brpc/tree/master/example/parallel_echo_c++/)。
 
-任何brpc::ChannelBase的子类都可以加入ParallelChannel,包括ParallelChannel和其他组合Channel。用户可以设置ParallelChannelOptions.fail_limit来控制访问的最大失败次数,当失败的访问达到这个数目时,RPC会立刻结束而不等待超时。
+任何brpc::ChannelBase的子类都可以加入ParallelChannel,包括ParallelChannel和其他组合Channel。
+
+用户可以设置ParallelChannelOptions.fail_limit来控制访问的最大失败次数,当失败的访问达到这个数目时,RPC会立刻结束而不等待超时。
+
+用户可以设置ParallelChannelOptions.success_limit来控制访问的最大成功次数,当成功的访问达到这个数目时,RPC会立刻结束。ParallelChannelOptions.fail_limit的优先级高于ParallelChannelOptions.success_limit,只有未设置fail_limit时,success_limit才会生效。
 
 一个sub channel可多次加入同一个ParallelChannel。当你需要对同一个服务发起多次异步访问并等待它们完成的话,这很有用。
 
diff --git a/docs/en/combo_channel.md b/docs/en/combo_channel.md
index c87dc5ca..686fad59 100644
--- a/docs/en/combo_channel.md
+++ b/docs/en/combo_channel.md
@@ -19,7 +19,11 @@ We need a better abstraction. If several channels are 
combined into a larger one
 
 Check 
[example/parallel_echo_c++](https://github.com/apache/brpc/tree/master/example/parallel_echo_c++/)
 for an example.
 
-Any subclasses of `brpc::ChannelBase` can be added into `ParallelChannel`, 
including `ParallelChannel` and other combo channels. Set 
`ParallelChannelOptions.fail_limit` to control maximum number of failures. When 
number of failed responses reaches the limit, the RPC is ended immediately 
rather than waiting for timeout.
+Any subclasses of `brpc::ChannelBase` can be added into `ParallelChannel`, 
including `ParallelChannel` and other combo channels. 
+
+Set `ParallelChannelOptions.fail_limit` to control maximum number of failures. 
When number of failed responses reaches the limit, the RPC is ended immediately 
rather than waiting for timeout.
+
+Set `ParallelChannelOptions.success_limit` to control maximum number of 
successful responses. When number of successful responses reaches the limit, 
the RPC is ended immediately. `ParallelChannelOptions.fail_limit` has a higher 
priority than `ParallelChannelOptions.success_limit`. Success_limit will take 
effect only when fail_limit is not set.
 
 A sub channel can be added to the same `ParallelChannel` more than once, which 
is useful when you need to initiate multiple asynchronous RPC to the same 
service and wait for their completions.
 
diff --git a/src/brpc/parallel_channel.cpp b/src/brpc/parallel_channel.cpp
index c6972211..130712bf 100644
--- a/src/brpc/parallel_channel.cpp
+++ b/src/brpc/parallel_channel.cpp
@@ -24,14 +24,8 @@
 #include "brpc/details/controller_private_accessor.h"
 #include "brpc/parallel_channel.h"
 
-
 namespace brpc {
 
-ParallelChannelOptions::ParallelChannelOptions()
-    : timeout_ms(500)
-    , fail_limit(-1) {
-}
-
 DECLARE_bool(usercode_in_pthread);
 
 // Not see difference when memory is cached.
@@ -45,12 +39,15 @@ static __thread Memory tls_cached_pchan_mem = { 0, NULL };
 
 class ParallelChannelDone : public google::protobuf::Closure {
 private:
-    ParallelChannelDone(int fail_limit, int ndone, int nchan, int memsize,
+    ParallelChannelDone(int fail_limit, int success_limit,
+                        int ndone, int nchan, int memsize,
                         Controller* cntl, google::protobuf::Closure* user_done)
         : _fail_limit(fail_limit)
+        , _success_limit(success_limit)
         , _ndone(ndone)
         , _nchan(nchan)
         , _memsize(memsize)
+        , _current_success(0)
         , _current_fail(0)
         , _current_done(0)
         , _cntl(cntl)
@@ -59,15 +56,13 @@ private:
         , _callmethod_pthread(0) {
     }
 
-    ~ParallelChannelDone() { }
-
 public:
     class SubDone : public google::protobuf::Closure {
     public:
         SubDone() : shared_data(NULL) {
         }
 
-        ~SubDone() {
+        ~SubDone() override {
             // Can't delete request/response in ~SubCall because the
             // object is copyable.
             if (ap.flags & DELETE_REQUEST) {
@@ -78,7 +73,7 @@ public:
             }
         }
  
-        void Run() {
+        void Run() override {
             shared_data->OnSubDoneRun(this);
         }
 
@@ -89,7 +84,8 @@ public:
     };
     
     static ParallelChannelDone* Create(
-        int fail_limit, int ndone, const SubCall* aps, int nchan,
+        int fail_limit, int success_limit,
+        int ndone, const SubCall* aps, int nchan,
         Controller* cntl, google::protobuf::Closure* user_done) {
         // We need to create the object in this way because _sub_done is
         // dynamically allocated.
@@ -130,8 +126,8 @@ public:
             return NULL;
         }
 #endif
-        ParallelChannelDone* d = new (mem) ParallelChannelDone(
-            fail_limit, ndone, nchan, memsize, cntl, user_done);
+        auto d = new (mem) ParallelChannelDone(
+            fail_limit, success_limit, ndone, nchan, memsize, cntl, user_done);
 
         // Apply client settings of _cntl to controllers of sub calls, except
         // timeout. If we let sub channel do their timeout separately, when
@@ -183,7 +179,7 @@ public:
         }
     }
 
-    void Run() {
+    void Run() override {
         const int ec = _cntl->ErrorCode();
         if (ec == EPCHANFINISH) {
             // all sub calls finished. Clear the error and we'll set
@@ -220,14 +216,25 @@ public:
         if (fin != NULL) {
             // [ called from SubDone::Run() ]
 
-            // Count failed sub calls, if fail_limit is reached, cancel others.
-            if (fin->cntl.FailedInline() &&
-                _current_fail.fetch_add(1, butil::memory_order_relaxed) + 1
-                == _fail_limit) {
+            int error_code = fin->cntl.ErrorCode();
+            // EPCHANFINISH is not an error of sub calls.
+            bool fail = 0 != error_code && EPCHANFINISH != error_code;
+            bool cancel =
+                // Count failed sub calls, if `fail_limit' is reached, cancel 
others.
+                (fail && _current_fail.fetch_add(1, 
butil::memory_order_relaxed) + 1
+                         == _fail_limit) ||
+                // Count successful sub calls, if `success_limit' is reached, 
cancel others.
+                (0 == error_code &&
+                 _current_success.fetch_add(1, butil::memory_order_relaxed) + 1
+                 == _success_limit);
+
+            if (cancel) {
+                // Only cancel once by `fail_limit' or `success_limit'.
                 for (int i = 0; i < _ndone; ++i) {
                     SubDone* sd = sub_done(i);
                     if (fin != sd) {
-                        bthread_id_error(sd->cntl.call_id(), ECANCELED);
+                        bthread_id_error(
+                            sd->cntl.call_id(), fail ? ECANCELED : 
EPCHANFINISH);
                     }
                 }
             }
@@ -423,6 +430,7 @@ public:
 
 private:
     int _fail_limit;
+    int _success_limit;
     int _ndone;
     int _nchan;
 #if defined(__clang__)
@@ -430,6 +438,7 @@ private:
 #else
     int _memsize;
 #endif
+    butil::atomic<int> _current_success;
     butil::atomic<int> _current_fail;
     butil::atomic<uint32_t> _current_done;
     Controller* _cntl;
@@ -602,6 +611,7 @@ void ParallelChannel::CallMethod(
     ParallelChannelDone* d = NULL;
     int ndone = nchan;
     int fail_limit = 1;
+    int success_limit = 1;
     DEFINE_SMALL_ARRAY(SubCall, aps, nchan, 64);
 
     if (cntl->FailedInline()) {
@@ -655,9 +665,21 @@ void ParallelChannel::CallMethod(
             fail_limit = ndone;
         }
     }
-    
-    d = ParallelChannelDone::Create(fail_limit, ndone, aps, nchan,
-                                    cntl, done);
+
+    // `success_limit' is only valid when `fail_limit' is not set.
+    if (_options.fail_limit >= 0 || _options.success_limit < 0) {
+        success_limit = ndone;
+    } else {
+        success_limit = _options.success_limit;
+        if (success_limit < 1) {
+            success_limit = 1;
+        } else if (success_limit > ndone) {
+            success_limit = ndone;
+        }
+    }
+
+    d = ParallelChannelDone::Create(
+        fail_limit, success_limit, ndone, aps, nchan, cntl, done);
     if (NULL == d) {
         cntl->SetFailed(ENOMEM, "Fail to new ParallelChannelDone");
         goto FAIL;
diff --git a/src/brpc/parallel_channel.h b/src/brpc/parallel_channel.h
index df85c9ac..84e5f342 100644
--- a/src/brpc/parallel_channel.h
+++ b/src/brpc/parallel_channel.h
@@ -112,7 +112,7 @@ protected:
     }
     
     // Only callable by subclasses and butil::intrusive_ptr
-    virtual ~CallMapper() {}
+    ~CallMapper() override = default;
 };
 
 // Clone req_base typed `Req'.
@@ -140,12 +140,11 @@ public:
         FAIL_ALL
     };
 
-    ResponseMerger() { }
     virtual Result Merge(google::protobuf::Message* response,
                          const google::protobuf::Message* sub_response) = 0;
 protected:
     // Only callable by subclasses and butil::intrusive_ptr
-    virtual ~ResponseMerger() { }
+    ~ResponseMerger() override = default;
 };
 
 struct ParallelChannelOptions {
@@ -156,7 +155,7 @@ struct ParallelChannelOptions {
     // Overridable by Controller.set_timeout_ms().
     // Default: 500 (milliseconds)
     // Maximum: 0x7fffffff (roughly 30 days)
-    int32_t timeout_ms;
+    int32_t timeout_ms{500};
 
     // The RPC is considered to be successful if number of failed sub RPC
     // does not reach this limit. Even if the RPC is timedout or canceled,
@@ -165,10 +164,14 @@ struct ParallelChannelOptions {
     // the timeout) when the limit is reached.
     // Default: number of sub channels, meaning that the RPC to ParallChannel
     // does not fail unless all sub RPC failed.
-    int fail_limit;
+    int fail_limit{-1};
 
-    // Construct with default options.
-    ParallelChannelOptions();
+    // The RPC is considered to be successful when the number of successful
+    // sub RPCs reaches this limit.
+    // Default: number of sub channels, meaning that the RPC to ParallelChannel
+    // does not return unless all sub RPCs succeed.
+    // Note: `success_limit' is only valid when `fail_limit' is not set.
+    int success_limit{ -1};
 };
 
 // ParallelChannel(aka "pchan") accesses all sub channels simultaneously with
@@ -185,8 +188,7 @@ struct ParallelChannelOptions {
 class ParallelChannel : public ChannelBase {
 friend class Controller;
 public:
-    ParallelChannel() { }
-    ~ParallelChannel();
+    ~ParallelChannel() override;
 
     // Initialize ParallelChannel with `options'.
     // NOTE: Currently this function always returns 0.
@@ -234,7 +236,7 @@ public:
                     google::protobuf::RpcController* controller,
                     const google::protobuf::Message* request,
                     google::protobuf::Message* response,
-                    google::protobuf::Closure* done);
+                    google::protobuf::Closure* done) override;
 
     // Number of sub channels.
     size_t channel_count() const { return _chans.size(); }
@@ -245,10 +247,10 @@ public:
 
     // Minimum weight of sub channels.
     // FIXME(gejun): be minimum of top(nchan-fail_limit)
-    int Weight();
+    int Weight() override;
 
     // Put description into `os'.
-    void Describe(std::ostream& os, const DescribeOptions&) const;
+    void Describe(std::ostream& os, const DescribeOptions&) const override;
 
 public:
     struct SubChan {
@@ -263,7 +265,7 @@ public:
 
 protected:
     static void* RunDoneAndDestroy(void* arg);
-    int CheckHealth();
+    int CheckHealth() override;
 
     ParallelChannelOptions _options;
     ChannelList _chans;
diff --git a/test/brpc_channel_unittest.cpp b/test/brpc_channel_unittest.cpp
index 8814b0bc..7b98896b 100644
--- a/test/brpc_channel_unittest.cpp
+++ b/test/brpc_channel_unittest.cpp
@@ -569,6 +569,24 @@ protected:
         }
     };
 
+    class SuccessLimitCallMapper : public brpc::CallMapper {
+    public:
+        brpc::SubCall Map(int channel_index,
+                          const google::protobuf::MethodDescriptor* method,
+                          const google::protobuf::Message* req_base,
+                          google::protobuf::Message* response) override {
+            auto req = brpc::Clone<test::EchoRequest>(req_base);
+            req->set_code(channel_index + 1/*non-zero*/);
+            if (_index++ > 0) {
+                req->set_sleep_us(5 * 1000);
+            }
+            return brpc::SubCall(method, req, response->New(),
+                                 brpc::DELETE_REQUEST | brpc::DELETE_RESPONSE);
+        }
+    private:
+        size_t _index{0};
+    };
+
     class MergeNothing : public brpc::ResponseMerger {
         Result Merge(google::protobuf::Message* /*response*/,
                      const google::protobuf::Message* /*sub_response*/) {
@@ -826,7 +844,60 @@ protected:
         }
         StopAndJoin();
     }
-    
+
+    void TestSuccessLimitParallel(bool single_server, bool async, bool 
short_connection) {
+        std::cout << " *** single=" << single_server
+                  << " async=" << async
+                  << " short=" << short_connection << std::endl;
+
+        ASSERT_EQ(0, StartAccept(_ep));
+        const size_t NCHANS = 8;
+        brpc::Channel subchans[NCHANS];
+        brpc::ParallelChannel channel;
+        brpc::ParallelChannelOptions options;
+        // Only care about the first successful response.
+        options.success_limit = 1;
+        channel.Init(&options);
+        butil::intrusive_ptr<brpc::CallMapper> fast_call_mapper(new 
SuccessLimitCallMapper);
+        for (size_t i = 0; i < NCHANS; ++i) {
+            SetUpChannel(&subchans[i], single_server, short_connection);
+            ASSERT_EQ(0, channel.AddChannel(
+                &subchans[i], brpc::DOESNT_OWN_CHANNEL, fast_call_mapper, 
NULL));
+        }
+        brpc::Controller cntl;
+        test::EchoRequest req;
+        test::EchoResponse res;
+        req.set_message(__FUNCTION__);
+        req.set_code(23);
+        CallMethod(&channel, &cntl, &req, &res, async);
+
+        EXPECT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
+        EXPECT_EQ(NCHANS, (size_t)cntl.sub_count());
+        for (int i = 0; i < cntl.sub_count(); ++i) {
+            EXPECT_TRUE(cntl.sub(i)) << "i=" << i;
+            if (0 == i) {
+                EXPECT_TRUE(!cntl.sub(i)->Failed()) << "i=" << i;
+            } else {
+                EXPECT_TRUE(cntl.sub(i)->Failed()) << "i=" << i;
+                EXPECT_EQ(brpc::EPCHANFINISH, cntl.sub(i)->ErrorCode()) << 
"i=" << i;
+            }
+        }
+        EXPECT_EQ("received " + std::string(__FUNCTION__), res.message());
+        ASSERT_EQ(1, res.code_list_size());
+        ASSERT_EQ((int)1, res.code_list(0));
+        if (short_connection) {
+            // Sleep to let `_messenger' detect `Socket' being `SetFailed'
+            const int64_t start_time = butil::gettimeofday_us();
+            while (_messenger.ConnectionCount() != 0) {
+                EXPECT_LT(butil::gettimeofday_us(), start_time + 
100000L/*100ms*/);
+                bthread_usleep(1000);
+            }
+        } else {
+            EXPECT_GE(1ul, _messenger.ConnectionCount());
+        }
+        StopAndJoin();
+    }
+
     struct CancelerArg {
         int64_t sleep_before_cancel_us;
         brpc::CallId cid;
@@ -2382,7 +2453,7 @@ TEST_F(ChannelTest, success_parallel) {
 }
 
 TEST_F(ChannelTest, success_duplicated_parallel) {
-    for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
+    for (int i = 0; i <= 1; ++i) { // Flag SingleServer
         for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
             for (int k = 0; k <=1; ++k) { // Flag ShortConnection
                 TestSuccessDuplicatedParallel(i, j, k);
@@ -2421,6 +2492,16 @@ TEST_F(ChannelTest, success_parallel2) {
     }
 }
 
+TEST_F(ChannelTest, success_limit_parallel) {
+    for (int i = 0; i <= 1; ++i) { // Flag SingleServer
+        for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
+            for (int k = 0; k <=1; ++k) { // Flag ShortConnection
+                TestSuccessLimitParallel(i, j, k);
+            }
+        }
+    }
+}
+
 TEST_F(ChannelTest, cancel_before_callmethod) {
     for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
         for (int j = 0; j <= 1; ++j) { // Flag Asynchronous


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to