This is an automated email from the ASF dual-hosted git repository.
jiashunzhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new bf88565c support dynamic update method concurrency (#2923)
bf88565c is described below
commit bf88565ca987379a72d45c771b1ca1d7af746d66
Author: Yang,Liming <[email protected]>
AuthorDate: Sun Mar 30 23:12:49 2025 +0800
support dynamic update method concurrency (#2923)
---
src/brpc/adaptive_max_concurrency.cpp | 10 ++++++++++
src/brpc/adaptive_max_concurrency.h | 4 ++++
src/brpc/concurrency_limiter.h | 3 +++
src/brpc/policy/auto_concurrency_limiter.cpp | 4 ++++
src/brpc/policy/auto_concurrency_limiter.h | 2 ++
src/brpc/policy/constant_concurrency_limiter.cpp | 6 ++++++
src/brpc/policy/constant_concurrency_limiter.h | 2 ++
src/brpc/policy/timeout_concurrency_limiter.cpp | 5 +++++
src/brpc/policy/timeout_concurrency_limiter.h | 2 ++
src/brpc/server.cpp | 9 +--------
10 files changed, 39 insertions(+), 8 deletions(-)
diff --git a/src/brpc/adaptive_max_concurrency.cpp b/src/brpc/adaptive_max_concurrency.cpp
index ae11ceff..3b6443b6 100644
--- a/src/brpc/adaptive_max_concurrency.cpp
+++ b/src/brpc/adaptive_max_concurrency.cpp
@@ -21,6 +21,7 @@
#include "butil/logging.h"
#include "butil/strings/string_number_conversions.h"
#include "brpc/adaptive_max_concurrency.h"
+#include "brpc/concurrency_limiter.h"
namespace brpc {
@@ -72,6 +73,9 @@ void AdaptiveMaxConcurrency::operator=(const butil::StringPiece& value) {
value.CopyToString(&_value);
_max_concurrency = -1;
}
+ if (_cl) {
+ _cl->ResetMaxConcurrency(*this);
+ }
}
void AdaptiveMaxConcurrency::operator=(int max_concurrency) {
@@ -82,12 +86,18 @@ void AdaptiveMaxConcurrency::operator=(int max_concurrency) {
_value = butil::string_printf("%d", max_concurrency);
_max_concurrency = max_concurrency;
}
+ if (_cl) {
+ _cl->ResetMaxConcurrency(*this);
+ }
}
void AdaptiveMaxConcurrency::operator=(const TimeoutConcurrencyConf& value) {
_value = "timeout";
_max_concurrency = -1;
_timeout_conf = value;
+ if (_cl) {
+ _cl->ResetMaxConcurrency(*this);
+ }
}
const std::string& AdaptiveMaxConcurrency::type() const {
diff --git a/src/brpc/adaptive_max_concurrency.h b/src/brpc/adaptive_max_concurrency.h
index 6bdad1ef..5bea0ec4 100644
--- a/src/brpc/adaptive_max_concurrency.h
+++ b/src/brpc/adaptive_max_concurrency.h
@@ -32,6 +32,7 @@ struct TimeoutConcurrencyConf {
int max_concurrency;
};
+class ConcurrencyLimiter;
class AdaptiveMaxConcurrency{
public:
explicit AdaptiveMaxConcurrency();
@@ -68,11 +69,14 @@ public:
static const std::string& UNLIMITED();
static const std::string& CONSTANT();
+ void SetConcurrencyLimiter(ConcurrencyLimiter* cl) { _cl = cl; }
+
private:
std::string _value;
int _max_concurrency;
TimeoutConcurrencyConf _timeout_conf; // TODO std::varient for different type
+ ConcurrencyLimiter* _cl{nullptr};
};
inline std::ostream& operator<<(std::ostream& os, const AdaptiveMaxConcurrency& amc) {
diff --git a/src/brpc/concurrency_limiter.h b/src/brpc/concurrency_limiter.h
index 083e2cf9..351dd0de 100644
--- a/src/brpc/concurrency_limiter.h
+++ b/src/brpc/concurrency_limiter.h
@@ -47,6 +47,9 @@ public:
// The return value is only for logging.
virtual int MaxConcurrency() = 0;
+ // Reset max_concurrency
+ virtual int ResetMaxConcurrency(const AdaptiveMaxConcurrency& amc) = 0;
+
// Create an instance from the amc
// Caller is responsible for delete the instance after usage.
virtual ConcurrencyLimiter* New(const AdaptiveMaxConcurrency& amc) const = 0;
diff --git a/src/brpc/policy/auto_concurrency_limiter.cpp b/src/brpc/policy/auto_concurrency_limiter.cpp
index d1d52d6d..dd5a02ec 100644
--- a/src/brpc/policy/auto_concurrency_limiter.cpp
+++ b/src/brpc/policy/auto_concurrency_limiter.cpp
@@ -134,6 +134,10 @@ int AutoConcurrencyLimiter::MaxConcurrency() {
return _max_concurrency;
}
+int AutoConcurrencyLimiter::ResetMaxConcurrency(const AdaptiveMaxConcurrency&) {
+ return -1;
+}
+
int64_t AutoConcurrencyLimiter::NextResetTime(int64_t sampling_time_us) {
int64_t reset_start_us = sampling_time_us +
(FLAGS_auto_cl_noload_latency_remeasure_interval_ms / 2 +
diff --git a/src/brpc/policy/auto_concurrency_limiter.h b/src/brpc/policy/auto_concurrency_limiter.h
index 6cf5e10c..d221f73b 100644
--- a/src/brpc/policy/auto_concurrency_limiter.h
+++ b/src/brpc/policy/auto_concurrency_limiter.h
@@ -35,6 +35,8 @@ public:
int MaxConcurrency() override;
+ int ResetMaxConcurrency(const AdaptiveMaxConcurrency&) override;
+
AutoConcurrencyLimiter* New(const AdaptiveMaxConcurrency&) const override;
private:
diff --git a/src/brpc/policy/constant_concurrency_limiter.cpp b/src/brpc/policy/constant_concurrency_limiter.cpp
index be5f071c..7d73e2ec 100644
--- a/src/brpc/policy/constant_concurrency_limiter.cpp
+++ b/src/brpc/policy/constant_concurrency_limiter.cpp
@@ -35,6 +35,12 @@ int ConstantConcurrencyLimiter::MaxConcurrency() {
return _max_concurrency.load(butil::memory_order_relaxed);
}
+int ConstantConcurrencyLimiter::ResetMaxConcurrency(
+ const AdaptiveMaxConcurrency& amc) {
+ _max_concurrency.store(static_cast<int>(amc), butil::memory_order_relaxed);
+ return 0;
+}
+
ConstantConcurrencyLimiter*
ConstantConcurrencyLimiter::New(const AdaptiveMaxConcurrency& amc) const {
CHECK_EQ(amc.type(), AdaptiveMaxConcurrency::CONSTANT());
diff --git a/src/brpc/policy/constant_concurrency_limiter.h b/src/brpc/policy/constant_concurrency_limiter.h
index f58a6286..9bae9393 100644
--- a/src/brpc/policy/constant_concurrency_limiter.h
+++ b/src/brpc/policy/constant_concurrency_limiter.h
@@ -33,6 +33,8 @@ public:
int MaxConcurrency() override;
+ int ResetMaxConcurrency(const AdaptiveMaxConcurrency&) override;
+
ConstantConcurrencyLimiter* New(const AdaptiveMaxConcurrency&) const override;
private:
diff --git a/src/brpc/policy/timeout_concurrency_limiter.cpp b/src/brpc/policy/timeout_concurrency_limiter.cpp
index 98c1a200..b2582eb1 100644
--- a/src/brpc/policy/timeout_concurrency_limiter.cpp
+++ b/src/brpc/policy/timeout_concurrency_limiter.cpp
@@ -117,6 +117,11 @@ int TimeoutConcurrencyLimiter::MaxConcurrency() {
return FLAGS_timeout_cl_max_concurrency;
}
+int TimeoutConcurrencyLimiter::ResetMaxConcurrency(
+ const AdaptiveMaxConcurrency &) {
+ return -1;
+}
+
bool TimeoutConcurrencyLimiter::AddSample(int error_code, int64_t latency_us,
int64_t sampling_time_us) {
std::unique_lock<butil::Mutex> lock_guard(_sw_mutex);
diff --git a/src/brpc/policy/timeout_concurrency_limiter.h b/src/brpc/policy/timeout_concurrency_limiter.h
index 3f0485ee..f7e4dde6 100644
--- a/src/brpc/policy/timeout_concurrency_limiter.h
+++ b/src/brpc/policy/timeout_concurrency_limiter.h
@@ -34,6 +34,8 @@ class TimeoutConcurrencyLimiter : public ConcurrencyLimiter {
int MaxConcurrency() override;
+ int ResetMaxConcurrency(const AdaptiveMaxConcurrency&) override;
+
TimeoutConcurrencyLimiter* New(
const AdaptiveMaxConcurrency&) const override;
diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp
index 2da703ef..d27c73ec 100644
--- a/src/brpc/server.cpp
+++ b/src/brpc/server.cpp
@@ -1095,6 +1095,7 @@ int Server::StartInternal(const butil::EndPoint& endpoint,
return -1;
}
it->second.status->SetConcurrencyLimiter(cl);
+ it->second.max_concurrency.SetConcurrencyLimiter(cl);
}
}
if (0 != SetServiceMaxConcurrency(_options.nshead_service)) {
@@ -2221,10 +2222,6 @@ int Server::ResetMaxConcurrency(int max_concurrency) {
}
AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(MethodProperty* mp) {
- if (IsRunning()) {
- LOG(WARNING) << "MaxConcurrencyOf is only allowed before Server started";
- return g_default_max_concurrency_of_method;
- }
if (mp->status == NULL) {
LOG(ERROR) << "method=" << mp->method->full_name()
<< " does not support max_concurrency";
@@ -2235,10 +2232,6 @@ AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(MethodProperty* mp) {
}
int Server::MaxConcurrencyOf(const MethodProperty* mp) const {
- if (IsRunning()) {
- LOG(WARNING) << "MaxConcurrencyOf is only allowed before Server started";
- return g_default_max_concurrency_of_method;
- }
if (mp == NULL || mp->status == NULL) {
return 0;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]