This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new bb284cf1 Support backup request policy (#2734)
bb284cf1 is described below
commit bb284cf1a2b1e1de7517a748be7240b272b131bc
Author: Bright Chen <[email protected]>
AuthorDate: Thu Sep 26 10:46:01 2024 +0800
Support backup request policy (#2734)
* Support backup request policy
* Support Controller::set_backup_request_policy
* Pass Controller to GetBackupRequestMs and update cn/client.md
* Feedback call info
* Avoid to block the timer thread in HandleSocketFailed
---
docs/cn/client.md | 33 ++++++++--
src/brpc/backup_request_policy.h | 43 +++++++++++++
src/brpc/channel.cpp | 6 +-
src/brpc/channel.h | 14 ++++-
src/brpc/controller.cpp | 28 +++++++++
src/brpc/controller.h | 15 +++--
test/brpc_channel_unittest.cpp | 128 ++++++++++++++++++++++++++++++++++++++-
7 files changed, 254 insertions(+), 13 deletions(-)
diff --git a/docs/cn/client.md b/docs/cn/client.md
index 27f1fa70..0cf3dc75 100755
--- a/docs/cn/client.md
+++ b/docs/cn/client.md
@@ -584,10 +584,6 @@
r34717后Controller.has_backup_request()获知是否发送过backup_request。
如果server一直没有返回,但连接没有问题,这种情况下不会重试。如果你需要在一定时间后发送另一个请求,使用backup request。
-工作机制如下:如果response没有在backup_request_ms内返回,则发送另外一个请求,哪个先回来就取哪个。新请求会被尽量送到不同的server。注意如果backup_request_ms大于超时,则backup
request总不会被发送。backup request会消耗一次重试次数。backup request不意味着server端cancel。
-
-ChannelOptions.backup_request_ms影响该Channel上所有RPC,单位毫秒,默认值-1(表示不开启),Controller.set_backup_request_ms()可修改某次RPC的值。
-
### 没到超时
超时后RPC会尽快结束。
@@ -708,6 +704,35 @@ options.retry_policy = &g_my_retry_policy;
-
[brpc::RpcRetryPolicyWithFixedBackoff](https://github.com/apache/brpc/blob/master/src/brpc/retry_policy.h)(固定时间间隔退策略)和[brpc::RpcRetryPolicyWithJitteredBackoff](https://github.com/apache/brpc/blob/master/src/brpc/retry_policy.h)(随机时间间隔退策略)继承了[brpc::RpcRetryPolicy](https://github.com/apache/brpc/blob/master/src/brpc/retry_policy.h),使用框架默认的DoRetry。
- 在pthread中进行重试退避(实际上通过bthread_usleep实现)会阻塞pthread,所以默认不会在pthread上进行重试退避。
+### backup request
+
+工作机制如下:如果response没有在backup_request_ms内返回,则发送另外一个请求,哪个先回来就取哪个。新请求会被尽量送到不同的server。注意如果backup_request_ms大于超时,则backup
request总不会被发送。backup request会消耗一次重试次数。backup request不意味着server端cancel。
+
+ChannelOptions.backup_request_ms影响该Channel上所有RPC,单位毫秒,默认值-1(表示不开启)。Controller.set_backup_request_ms()可修改某次RPC的值。
+
+用户可以通过继承[brpc::BackupRequestPolicy](https://github.com/apache/brpc/blob/master/src/brpc/backup_request_policy.h)自定义策略(backup_request_ms和熔断backup
request)。 比如根据延时调节backup_request_ms或者根据错误率熔断部分backup request。
+
+ChannelOptions.backup_request_policy同样影响该Channel上所有RPC。Controller.set_backup_request_policy()可修改某次RPC的策略。backup_request_policy优先级高于backup_request_ms。
+
+brpc::BackupRequestPolicy接口如下:
+
+```c++
+class BackupRequestPolicy {
+public:
+ virtual ~BackupRequestPolicy() = default;
+
+ // Return the time in milliseconds in which another request
+ // will be sent if RPC does not finish.
+ virtual int32_t GetBackupRequestMs(const Controller* controller) const = 0;
+
+ // Return true if the backup request should be sent.
+ virtual bool DoBackup(const Controller* controller) const = 0;
+
+ // Called when an RPC ends; users can collect call information to adjust the policy.
+ virtual void OnRPCEnd(const Controller* controller) = 0;
+};
+```
+
### 重试应当保守
由于成本的限制,大部分线上server的冗余度是有限的,主要是满足多机房互备的需求。而激进的重试逻辑很容易导致众多client对server集群造成2-3倍的压力,最终使集群雪崩:由于server来不及处理导致队列越积越长,使所有的请求得经过很长的排队才被处理而最终超时,相当于服务停摆。默认的重试是比较安全的:
只要连接不断RPC就不会重试,一般不会产生大量的重试请求。用户可以通过RetryPolicy定制重试策略,但也可能使重试变成一场“风暴”。当你定制RetryPolicy时,你需要仔细考虑client和server的协作关系,并设计对应的异常测试,以确保行为符合预期。
diff --git a/src/brpc/backup_request_policy.h b/src/brpc/backup_request_policy.h
new file mode 100644
index 00000000..ea254f1d
--- /dev/null
+++ b/src/brpc/backup_request_policy.h
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#ifndef BRPC_BACKUP_REQUEST_POLICY_H
+#define BRPC_BACKUP_REQUEST_POLICY_H
+
+#include "brpc/controller.h"
+
+namespace brpc {
+
+// Interface for customizing backup requests: how long to wait before a
+// backup request is sent, and whether it is sent at all.
+// When a policy is set it has priority over `backup_request_ms'.
+class BackupRequestPolicy {
+public:
+ virtual ~BackupRequestPolicy() = default;
+
+ // Return the time in milliseconds in which another request
+ // will be sent if RPC does not finish.
+ // Controller::backup_request_ms() returns this value when a policy is set.
+ virtual int32_t GetBackupRequestMs(const Controller* controller) const = 0;
+
+ // Return true if the backup request should be sent.
+ // Returning false makes the controller skip the backup request for the
+ // call and keep waiting on the request already in flight.
+ virtual bool DoBackup(const Controller* controller) const = 0;
+
+ // Called when an RPC ends; users can collect call information to adjust
policy.
+ virtual void OnRPCEnd(const Controller* controller) = 0;
+};
+
+}
+
+#endif // BRPC_BACKUP_REQUEST_POLICY_H
diff --git a/src/brpc/channel.cpp b/src/brpc/channel.cpp
index 5fc66096..c15611ea 100644
--- a/src/brpc/channel.cpp
+++ b/src/brpc/channel.cpp
@@ -55,6 +55,7 @@ ChannelOptions::ChannelOptions()
, log_succeed_without_server(true)
, use_rdma(false)
, auth(NULL)
+ , backup_request_policy(NULL)
, retry_policy(NULL)
, ns_filter(NULL)
{}
@@ -495,8 +496,10 @@ void Channel::CallMethod(const
google::protobuf::MethodDescriptor* method,
// overriding connect_timeout_ms does not make sense, just use the
// one in ChannelOptions
cntl->_connect_timeout_ms = _options.connect_timeout_ms;
- if (cntl->backup_request_ms() == UNSET_MAGIC_NUM) {
+ if (cntl->backup_request_ms() == UNSET_MAGIC_NUM &&
+ NULL == cntl->_backup_request_policy) {
cntl->set_backup_request_ms(_options.backup_request_ms);
+ cntl->_backup_request_policy = _options.backup_request_policy;
}
if (cntl->connection_type() == CONNECTION_TYPE_UNKNOWN) {
cntl->set_connection_type(_options.connection_type);
@@ -536,6 +539,7 @@ void Channel::CallMethod(const
google::protobuf::MethodDescriptor* method,
// Currently we cannot handle retry and backup request correctly
cntl->set_max_retry(0);
cntl->set_backup_request_ms(-1);
+ cntl->_backup_request_policy = NULL;
}
if (cntl->backup_request_ms() >= 0 &&
diff --git a/src/brpc/channel.h b/src/brpc/channel.h
index 2ed91a64..ef3c15e7 100644
--- a/src/brpc/channel.h
+++ b/src/brpc/channel.h
@@ -34,6 +34,7 @@
#include "brpc/controller.h" // brpc::Controller
#include "brpc/details/profiler_linker.h"
#include "brpc/retry_policy.h"
+#include "brpc/backup_request_policy.h"
#include "brpc/naming_service_filter.h"
namespace brpc {
@@ -55,11 +56,12 @@ struct ChannelOptions {
int32_t timeout_ms;
// Send another request if RPC does not finish after so many milliseconds.
- // Overridable by Controller.set_backup_request_ms().
+ // Overridable by Controller.set_backup_request_ms() or
+ // Controller.set_backup_request_policy().
// The request will be sent to a different server by best effort.
// If timeout_ms is set and backup_request_ms >= timeout_ms, backup request
// will never be sent.
- // backup request does NOT imply server-side cancelation.
+ // backup request does NOT imply server-side cancellation.
// Default: -1 (disabled)
// Maximum: 0x7fffffff (roughly 24 days)
int32_t backup_request_ms;
@@ -112,6 +114,14 @@ struct ChannelOptions {
// Default: NULL
const Authenticator* auth;
+ // Customize the backup request time and whether to send backup request.
+ // Priority: `backup_request_policy' > `backup_request_ms'.
+ // Overridable by Controller.set_backup_request_ms() or
+ // Controller.set_backup_request_policy().
+ // This object is NOT owned by channel and should remain valid when channel is used.
+ // Default: NULL
+ BackupRequestPolicy* backup_request_policy;
+
// Customize the error code that should be retried. The interface is
// defined in src/brpc/retry_policy.h
// This object is NOT owned by channel and should remain valid when
diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp
index 98e25ae2..afebb3c2 100644
--- a/src/brpc/controller.cpp
+++ b/src/brpc/controller.cpp
@@ -258,6 +258,7 @@ void Controller::ResetPods() {
_connection_type = CONNECTION_TYPE_UNKNOWN;
_timeout_ms = UNSET_MAGIC_NUM;
_backup_request_ms = UNSET_MAGIC_NUM;
+ _backup_request_policy = NULL;
_connect_timeout_ms = UNSET_MAGIC_NUM;
_real_timeout_ms = UNSET_MAGIC_NUM;
_deadline_us = -1;
@@ -344,6 +345,16 @@ void Controller::set_backup_request_ms(int64_t timeout_ms)
{
}
}
+// Return the delay in milliseconds after which a backup request is sent.
+// A configured `_backup_request_policy' has priority over the plain
+// `_backup_request_ms' value.
+int64_t Controller::backup_request_ms() const {
+ // Both sources are int32_t, so the result can never exceed 0x7fffffff;
+ // no clamping (and no warning) is needed on the read path.
+ return NULL != _backup_request_policy ?
+ _backup_request_policy->GetBackupRequestMs(this) : _backup_request_ms;
+}
+
void Controller::set_max_retry(int max_retry) {
if (max_retry > MAX_RETRY_COUNT) {
LOG(WARNING) << "Retry count can't be larger than "
@@ -606,6 +617,13 @@ void Controller::OnVersionedRPCReturned(const
CompletionInfo& info,
goto END_OF_RPC;
}
if (_error_code == EBACKUPREQUEST) {
+ if (NULL != _backup_request_policy &&
!_backup_request_policy->DoBackup(this)) {
+ // No need to do backup request.
+ _error_code = saved_error;
+ CHECK_EQ(0, bthread_id_unlock(info.id));
+ return;
+ }
+
// Reset timeout if needed
int rc = 0;
if (timeout_ms() >= 0) {
@@ -969,6 +987,14 @@ void Controller::EndRPC(const CompletionInfo& info) {
CHECK_EQ(0, bthread_id_unlock_and_destroy(saved_cid));
}
}
+
+// Record the end time of the RPC, then notify the backup request policy
+// (if one is set) that the call has finished. The end time is stored before
+// the callback runs, so it is already in place when the policy is invoked.
+void Controller::OnRPCEnd(int64_t end_time_us) {
+ _end_time_us = end_time_us;
+ if (NULL != _backup_request_policy) {
+ _backup_request_policy->OnRPCEnd(this);
+ }
+}
+
void Controller::RunDoneInBackupThread(void* arg) {
static_cast<Controller*>(arg)->DoneInBackupThread();
}
@@ -1313,6 +1339,7 @@ CallId Controller::call_id() {
void Controller::SaveClientSettings(ClientSettings* s) const {
s->timeout_ms = _timeout_ms;
s->backup_request_ms = _backup_request_ms;
+ s->backup_request_policy = _backup_request_policy;
s->max_retry = _max_retry;
s->tos = _tos;
s->connection_type = _connection_type;
@@ -1325,6 +1352,7 @@ void Controller::SaveClientSettings(ClientSettings* s)
const {
void Controller::ApplyClientSettings(const ClientSettings& s) {
set_timeout_ms(s.timeout_ms);
set_backup_request_ms(s.backup_request_ms);
+ set_backup_request_policy(s.backup_request_policy);
set_max_retry(s.max_retry);
set_type_of_service(s.tos);
set_connection_type(s.connection_type);
diff --git a/src/brpc/controller.h b/src/brpc/controller.h
index 5b2132b4..9b3c0201 100644
--- a/src/brpc/controller.h
+++ b/src/brpc/controller.h
@@ -71,6 +71,7 @@ class RPCSender;
class StreamSettings;
class MongoContext;
class RetryPolicy;
+class BackupRequestPolicy;
class InputMessageBase;
class ThriftStub;
namespace policy {
@@ -180,7 +181,10 @@ public:
// Set/get the delay to send backup request in milliseconds. Use
// ChannelOptions.backup_request_ms on unset.
void set_backup_request_ms(int64_t timeout_ms);
- int64_t backup_request_ms() const { return _backup_request_ms; }
+ // Set the backup request policy used by this RPC. Only the pointer is
+ // stored; when non-NULL it has priority over the backup_request_ms value.
+ void set_backup_request_policy(BackupRequestPolicy* policy) {
+ _backup_request_policy = policy;
+ }
+ int64_t backup_request_ms() const;
// Set/get maximum times of retrying. Use ChannelOptions.max_retry on
unset.
// <=0 means no retry.
@@ -670,7 +674,8 @@ private:
struct ClientSettings {
int32_t timeout_ms;
int32_t backup_request_ms;
- int max_retry;
+ BackupRequestPolicy* backup_request_policy;
+ int max_retry;
int32_t tos;
ConnectionType connection_type;
CompressType request_compress_type;
@@ -737,9 +742,7 @@ private:
_end_time_us = begin_time_us;
}
- void OnRPCEnd(int64_t end_time_us) {
- _end_time_us = end_time_us;
- }
+ void OnRPCEnd(int64_t end_time_us);
static void RunDoneInBackupThread(void*);
void DoneInBackupThread();
@@ -800,6 +803,8 @@ private:
int32_t _timeout_ms;
int32_t _connect_timeout_ms;
int32_t _backup_request_ms;
+ // Priority: `_backup_request_policy' > `_backup_request_ms'.
+ BackupRequestPolicy* _backup_request_policy;
// If this rpc call has retry/backup request,this var save the real
timeout for current call
int64_t _real_timeout_ms;
// Deadline of this RPC (since the Epoch in microseconds).
diff --git a/test/brpc_channel_unittest.cpp b/test/brpc_channel_unittest.cpp
index 6c189f18..8814b0bc 100644
--- a/test/brpc_channel_unittest.cpp
+++ b/test/brpc_channel_unittest.cpp
@@ -309,7 +309,8 @@ protected:
bool single_server,
bool short_connection,
const brpc::Authenticator* auth = NULL,
- std::string connection_group = std::string()) {
+ std::string connection_group = std::string(),
+ bool use_backup_request_policy = false) {
brpc::ChannelOptions opt;
if (short_connection) {
opt.connection_type = brpc::CONNECTION_TYPE_SHORT;
@@ -317,6 +318,9 @@ protected:
opt.auth = auth;
opt.max_retry = 0;
opt.connection_group = connection_group;
+ if (use_backup_request_policy) {
+ opt.backup_request_policy = &_backup_request_policy;
+ }
if (single_server) {
EXPECT_EQ(0, channel->Init(_ep, &opt));
} else {
@@ -1917,6 +1921,107 @@ protected:
StopAndJoin();
}
+ // Issue one RPC with backup_request_ms=10 and timeout=100ms while the
+ // request asks for a 50ms sleep, then expect the call to succeed with a
+ // backup request sent and exactly RETRY_NUM retries counted.
+ void TestBackupRequest(bool single_server, bool async,
+ bool short_connection) {
+ std::cout << " *** single=" << single_server
+ << " async=" << async
+ << " short=" << short_connection << std::endl;
+
+ ASSERT_EQ(0, StartAccept(_ep));
+ brpc::Channel channel;
+ SetUpChannel(&channel, single_server, short_connection);
+
+ const int RETRY_NUM = 1;
+ test::EchoRequest req;
+ test::EchoResponse res;
+ brpc::Controller cntl;
+ req.set_message(__FUNCTION__);
+
+ cntl.set_max_retry(RETRY_NUM);
+ cntl.set_backup_request_ms(10); // 10ms
+ cntl.set_timeout_ms(100); // 100ms
+ req.set_sleep_us(50000); // 50ms
+ CallMethod(&channel, &cntl, &req, &res, async);
+ ASSERT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
+ ASSERT_TRUE(cntl.has_backup_request());
+ ASSERT_EQ(RETRY_NUM, cntl.retried_count());
+ bthread_usleep(70000); // wait for the sleep task to finish
+
+ if (short_connection) {
+ // Sleep to let `_messenger' detect `Socket' being `SetFailed'
+ const int64_t start_time = butil::gettimeofday_us();
+ while (_messenger.ConnectionCount() != 0) {
+ EXPECT_LT(butil::gettimeofday_us(), start_time +
100000L/*100ms*/);
+ bthread_usleep(1000);
+ }
+ } else {
+ EXPECT_GE(1ul, _messenger.ConnectionCount());
+ }
+ StopAndJoin();
+ }
+
+ // Test policy: fixed 10ms backup delay, with the `backup' flag deciding
+ // whether the backup request is actually sent.
+ class BackupRequestPolicyImpl : public brpc::BackupRequestPolicy {
+ public:
+ // Always ask for a backup request after 10 milliseconds.
+ int32_t GetBackupRequestMs(const brpc::Controller*) const override {
+ return 10;
+ }
+
+ // Return true if the backup request should be sent.
+ bool DoBackup(const brpc::Controller*) const override {
+ return backup;
+ }
+
+ // No call information is collected by this test policy.
+ void OnRPCEnd(const brpc::Controller*) override {}
+
+ // Toggled by the tests to allow or suppress the backup request.
+ bool backup{true};
+
+ };
+
+ // Run the RPC twice through a channel configured with
+ // `_backup_request_policy': first with backup allowed, then suppressed.
+ // Both calls must succeed; has_backup_request() and retried_count() must
+ // reflect whether the policy permitted the backup request.
+ void TestBackupRequestPolicy(bool single_server, bool async,
+ bool short_connection) {
+ ASSERT_EQ(0, StartAccept(_ep));
+ for (int i = 0; i < 2; ++i) {
+ // First iteration allows the backup request, second one forbids it.
+ bool backup = i == 0;
+ std::cout << " *** single=" << single_server
+ << " async=" << async
+ << " short=" << short_connection
+ << " backup=" << backup
+ << std::endl;
+
+ brpc::Channel channel;
+ SetUpChannel(&channel, single_server, short_connection, NULL, "",
true);
+
+ const int RETRY_NUM = 1;
+ test::EchoRequest req;
+ test::EchoResponse res;
+ brpc::Controller cntl;
+ req.set_message(__FUNCTION__);
+
+ _backup_request_policy.backup = backup;
+ cntl.set_max_retry(RETRY_NUM);
+ cntl.set_timeout_ms(100); // 100ms
+ req.set_sleep_us(50000); // 50ms
+ CallMethod(&channel, &cntl, &req, &res, async);
+ ASSERT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
+ ASSERT_EQ(backup, cntl.has_backup_request());
+ ASSERT_EQ(backup ? RETRY_NUM : 0, cntl.retried_count());
+ bthread_usleep(70000); // wait for the sleep task to finish
+
+ if (short_connection) {
+ // Sleep to let `_messenger' detect `Socket' being `SetFailed'
+ const int64_t start_time = butil::gettimeofday_us();
+ while (_messenger.ConnectionCount() != 0) {
+ ASSERT_LT(butil::gettimeofday_us(), start_time +
100000L/*100ms*/);
+ bthread_usleep(1000);
+ }
+ } else {
+ ASSERT_GE(1ul, _messenger.ConnectionCount());
+ }
+ }
+
+ StopAndJoin();
+ }
+
butil::EndPoint _ep;
butil::TempFile _server_list;
std::string _naming_url;
@@ -1929,6 +2034,7 @@ protected:
bool _close_fd_once;
MyEchoService _svc;
+ BackupRequestPolicyImpl _backup_request_policy;
};
class MyShared : public brpc::SharedObject {
@@ -2596,6 +2702,26 @@ TEST_F(ChannelTest, retry_backoff) {
}
}
+// Run TestBackupRequest over every combination of the three boolean flags.
+TEST_F(ChannelTest, backup_request) {
+ for (int i = 0; i <= 1; ++i) { // Flag SingleServer
+ for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
+ for (int k = 0; k <= 1; ++k) { // Flag ShortConnection
+ TestBackupRequest(i, j, k);
+ }
+ }
+ }
+}
+
+// Run TestBackupRequestPolicy over every combination of the three boolean
+// flags.
+TEST_F(ChannelTest, backup_request_policy) {
+ for (int i = 0; i <= 1; ++i) { // Flag SingleServer
+ for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
+ for (int k = 0; k <= 1; ++k) { // Flag ShortConnection
+ TestBackupRequestPolicy(i, j, k);
+ }
+ }
+ }
+}
+
TEST_F(ChannelTest, multiple_threads_single_channel) {
srand(time(NULL));
ASSERT_EQ(0, StartAccept(_ep));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]