This is an automated email from the ASF dual-hosted git repository.
guangmingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 02703638 Fix thread safety of AgentCombiner (#2949)
02703638 is described below
commit 02703638aa9ac68b68350d025afc10e0b20d8371
Author: Bright Chen <[email protected]>
AuthorDate: Thu Apr 17 20:12:57 2025 +0800
Fix thread safety of AgentCombiner (#2949)
---
src/bvar/detail/combiner.h | 58 ++++++++++++++++++++++++------------------
src/bvar/detail/percentile.cpp | 8 +++---
src/bvar/detail/percentile.h | 5 ++--
src/bvar/recorder.h | 21 +++++++--------
src/bvar/reducer.h | 32 ++++++++++-------------
5 files changed, 64 insertions(+), 60 deletions(-)
diff --git a/src/bvar/detail/combiner.h b/src/bvar/detail/combiner.h
index 6a6ab803..07baa891 100644
--- a/src/bvar/detail/combiner.h
+++ b/src/bvar/detail/combiner.h
@@ -22,6 +22,7 @@
#include <string> // std::string
#include <vector> // std::vector
+#include <memory>
#include "butil/atomicops.h" // butil::atomic
#include "butil/scoped_lock.h" // BAIDU_SCOPED_LOCK
#include "butil/type_traits.h" // butil::add_cr_non_integral
@@ -42,7 +43,6 @@ public:
typedef typename Combiner::Agent agent_type;
GlobalValue(agent_type* a, Combiner* c) : _a(a), _c(c) {}
- ~GlobalValue() {}
// Call this method to unlock tls element and lock the combiner.
// Unlocking tls element avoids potential deadlock with
@@ -153,24 +153,26 @@ private:
};
template <typename ResultTp, typename ElementTp, typename BinaryOp>
-class AgentCombiner {
+class AgentCombiner
+ : public std::enable_shared_from_this<AgentCombiner<ResultTp, ElementTp, BinaryOp>> {
+
public:
typedef ResultTp result_type;
typedef ElementTp element_type;
typedef AgentCombiner<ResultTp, ElementTp, BinaryOp> self_type;
+ typedef std::shared_ptr<self_type> self_shared_type;
+ typedef std::weak_ptr<self_type> self_weak_type;
friend class GlobalValue<self_type>;
-
- struct Agent : public butil::LinkNode<Agent> {
- Agent() : combiner(NULL) {}
+ struct Agent : public butil::LinkNode<Agent> {
~Agent() {
- if (combiner) {
- combiner->commit_and_erase(this);
- combiner = NULL;
+ self_shared_type c = combiner.lock();
+ if (NULL != c) {
+ c->commit_and_erase(this);
}
}
- void reset(const ElementTp& val, self_type* c) {
+ void reset(const ElementTp& val, self_shared_type c) {
combiner = c;
element.store(val);
}
@@ -203,13 +205,20 @@ friend class GlobalValue<self_type>;
//
// NOTE: Only available to non-atomic types.
template <typename Op>
- void merge_global(const Op &op) {
- GlobalValue<self_type> g(this, combiner);
- element.merge_global(op, g);
+ void merge_global(const Op &op, self_shared_type c = NULL) {
+ if (NULL == c) {
+ c = combiner.lock();
+ }
+ if (NULL != c) {
+ GlobalValue<self_type> g(this, c.get());
+ element.merge_global(op, g);
+ }
}
- self_type *combiner;
ElementContainer<ElementTp> element;
+ private:
+ friend class AgentCombiner<ResultTp, ElementTp, BinaryOp>;
+ self_weak_type combiner;
};
typedef detail::AgentGroup<Agent> AgentGroup;
@@ -245,10 +254,10 @@ friend class GlobalValue<self_type>;
return ret;
}
- typename butil::add_cr_non_integral<ElementTp>::type element_identity() const
- { return _element_identity; }
- typename butil::add_cr_non_integral<ResultTp>::type result_identity() const
- { return _result_identity; }
+ typename butil::add_cr_non_integral<ElementTp>::type
+ element_identity() const { return _element_identity; }
+ typename butil::add_cr_non_integral<ResultTp>::type
+ result_identity() const { return _result_identity; }
// [Threadsafe] May be called from anywhere.
ResultTp reset_all_agents() {
@@ -265,7 +274,7 @@ friend class GlobalValue<self_type>;
}
// Always called from the thread owning the agent.
- void commit_and_erase(Agent *agent) {
+ void commit_and_erase(Agent* agent) {
if (NULL == agent) {
return;
}
@@ -279,7 +288,7 @@ friend class GlobalValue<self_type>;
}
// Always called from the thread owning the agent
- void commit_and_clear(Agent *agent) {
+ void commit_and_clear(Agent* agent) {
if (NULL == agent) {
return;
}
@@ -290,7 +299,7 @@ friend class GlobalValue<self_type>;
}
// We need this function to be as fast as possible.
- inline Agent* get_or_create_tls_agent() {
+ Agent* get_or_create_tls_agent() {
Agent* agent = AgentGroup::get_tls_agent(_id);
if (!agent) {
// Create the agent
@@ -300,10 +309,10 @@ friend class GlobalValue<self_type>;
return NULL;
}
}
- if (agent->combiner) {
+ if (!agent->combiner.expired()) {
return agent;
}
- agent->reset(_element_identity, this);
+ agent->reset(_element_identity, this->shared_from_this());
// TODO: Is uniqueness-checking necessary here?
{
butil::AutoLock guard(_lock);
@@ -314,11 +323,10 @@ friend class GlobalValue<self_type>;
void clear_all_agents() {
butil::AutoLock guard(_lock);
- // reseting agents is must because the agent object may be reused.
+ // Resting agents is must because the agent object may be reused.
// Set element to be default-constructed so that if it's non-pod,
// internal allocations should be released.
- for (butil::LinkNode<Agent>*
- node = _agents.head(); node != _agents.end();) {
+ for (butil::LinkNode<Agent>* node = _agents.head(); node != _agents.end();) {
node->value()->reset(ElementTp(), NULL);
butil::LinkNode<Agent>* const saved_next = node->next();
node->RemoveFromList();
diff --git a/src/bvar/detail/percentile.cpp b/src/bvar/detail/percentile.cpp
index e0412cbe..37181cc3 100644
--- a/src/bvar/detail/percentile.cpp
+++ b/src/bvar/detail/percentile.cpp
@@ -85,9 +85,8 @@ private:
int64_t _latency;
};
-Percentile::Percentile() : _combiner(NULL), _sampler(NULL) {
- _combiner = new combiner_type;
-}
+Percentile::Percentile()
+ : _combiner(std::make_shared<combiner_type>()), _sampler(NULL) {}
Percentile::~Percentile() {
// Have to destroy sampler first to avoid the race between destruction and
@@ -96,7 +95,6 @@ Percentile::~Percentile() {
_sampler->destroy();
_sampler = NULL;
}
- delete _combiner;
}
Percentile::value_type Percentile::reset() {
@@ -126,7 +124,7 @@ Percentile &Percentile::operator<<(int64_t latency) {
}
return *this;
}
- agent->merge_global(AddLatency(latency));
+ agent->merge_global(AddLatency(latency), _combiner);
return *this;
}
diff --git a/src/bvar/detail/percentile.h b/src/bvar/detail/percentile.h
index 5fcc180a..186103b4 100644
--- a/src/bvar/detail/percentile.h
+++ b/src/bvar/detail/percentile.h
@@ -462,6 +462,7 @@ public:
typedef AgentCombiner <GlobalPercentileSamples,
ThreadLocalPercentileSamples,
AddPercentileSamples> combiner_type;
+ typedef typename combiner_type::self_shared_type shared_combiner_type;
typedef combiner_type::Agent agent_type;
Percentile();
~Percentile();
@@ -494,8 +495,8 @@ public:
private:
DISALLOW_COPY_AND_ASSIGN(Percentile);
- combiner_type* _combiner;
- sampler_type* _sampler;
+ shared_combiner_type _combiner;
+ sampler_type* _sampler;
std::string _debug_name;
};
diff --git a/src/bvar/recorder.h b/src/bvar/recorder.h
index 9b73a19b..c2c18bd1 100644
--- a/src/bvar/recorder.h
+++ b/src/bvar/recorder.h
@@ -113,9 +113,10 @@ public:
};
typedef detail::AgentCombiner<Stat, uint64_t, AddToStat> combiner_type;
+ typedef typename combiner_type::self_shared_type shared_combiner_type;
typedef combiner_type::Agent agent_type;
- IntRecorder() : _sampler(NULL) {}
+ IntRecorder() : _combiner(std::make_shared<combiner_type>()), _sampler(NULL) {}
explicit IntRecorder(const butil::StringPiece& name) : _sampler(NULL) {
expose(name);
@@ -126,7 +127,7 @@ public:
expose_as(prefix, name);
}
- ~IntRecorder() {
+ ~IntRecorder() override {
hide();
if (_sampler) {
_sampler->destroy();
@@ -138,19 +139,19 @@ public:
IntRecorder& operator<<(int64_t/*note*/ sample);
int64_t average() const {
- return _combiner.combine_agents().get_average_int();
+ return _combiner->combine_agents().get_average_int();
}
double average(double) const {
- return _combiner.combine_agents().get_average_double();
+ return _combiner->combine_agents().get_average_double();
}
Stat get_value() const {
- return _combiner.combine_agents();
+ return _combiner->combine_agents();
}
Stat reset() {
- return _combiner.reset_all_agents();
+ return _combiner->reset_all_agents();
}
AddStat op() const { return AddStat(); }
@@ -160,7 +161,7 @@ public:
os << get_value();
}
- bool valid() const { return _combiner.valid(); }
+ bool valid() const { return _combiner->valid(); }
sampler_type* get_sampler() {
if (NULL == _sampler) {
@@ -230,7 +231,7 @@ private:
}
private:
- combiner_type _combiner;
+ shared_combiner_type _combiner;
sampler_type* _sampler;
std::string _debug_name;
};
@@ -258,7 +259,7 @@ inline IntRecorder& IntRecorder::operator<<(int64_t sample) {
<< (void*)this << ") " << reason;
}
}
- agent_type* agent = _combiner.get_or_create_tls_agent();
+ agent_type* agent = _combiner->get_or_create_tls_agent();
if (BAIDU_UNLIKELY(!agent)) {
LOG(FATAL) << "Fail to create agent";
return *this;
@@ -276,7 +277,7 @@ inline IntRecorder& IntRecorder::operator<<(int64_t sample) {
// Although agent->element might have been cleared at this
// point, it is just OK because the very value is 0 in
// this case
- agent->combiner->commit_and_clear(agent);
+ _combiner->commit_and_clear(agent);
sum = 0;
num = 0;
n = 0;
diff --git a/src/bvar/reducer.h b/src/bvar/reducer.h
index fbd4fa78..ccf78054 100644
--- a/src/bvar/reducer.h
+++ b/src/bvar/reducer.h
@@ -69,13 +69,13 @@ template <typename T, typename Op, typename InvOp = detail::VoidOp>
class Reducer : public Variable {
public:
typedef typename detail::AgentCombiner<T, T, Op> combiner_type;
+ typedef typename combiner_type::self_shared_type shared_combiner_type;
typedef typename combiner_type::Agent agent_type;
typedef detail::ReducerSampler<Reducer, T, Op, InvOp> sampler_type;
class SeriesSampler : public detail::Sampler {
public:
SeriesSampler(Reducer* owner, const Op& op)
: _owner(owner), _series(op) {}
- ~SeriesSampler() {}
void take_sample() override { _series.append(_owner->get_value()); }
void describe(std::ostream& os) { _series.describe(os, NULL); }
private:
@@ -85,16 +85,12 @@ public:
public:
// The `identify' must satisfy: identity Op a == a
- Reducer(typename butil::add_cr_non_integral<T>::type identity = T(),
- const Op& op = Op(),
- const InvOp& inv_op = InvOp())
- : _combiner(identity, identity, op)
- , _sampler(NULL)
- , _series_sampler(NULL)
- , _inv_op(inv_op) {
- }
+ explicit Reducer(typename butil::add_cr_non_integral<T>::type identity = T(),
+ const Op& op = Op(), const InvOp& inv_op = InvOp())
+ : _combiner(std::make_shared<combiner_type>(identity, identity, op))
+ , _sampler(NULL) , _series_sampler(NULL) , _inv_op(inv_op) {}
- ~Reducer() {
+ ~Reducer() override {
// Calling hide() manually is a MUST required by Variable.
hide();
if (_sampler) {
@@ -119,13 +115,13 @@ public:
<< "You should not call Reducer<" << butil::class_name_str<T>()
<< ", " << butil::class_name_str<Op>() << ">::get_value() when a"
<< " Window<> is used because the operator does not have inverse.";
- return _combiner.combine_agents();
+ return _combiner->combine_agents();
}
// Reset the reduced value to T().
// Returns the reduced value before reset.
- T reset() { return _combiner.reset_all_agents(); }
+ T reset() { return _combiner->reset_all_agents(); }
void describe(std::ostream& os, bool quote_string) const override {
if (butil::is_same<T, std::string>::value && quote_string) {
@@ -140,10 +136,10 @@ public:
#endif
// True if this reducer is constructed successfully.
- bool valid() const { return _combiner.valid(); }
+ bool valid() const { return _combiner->valid(); }
// Get instance of Op.
- const Op& op() const { return _combiner.op(); }
+ const Op& op() const { return _combiner->op(); }
const InvOp& inv_op() const { return _inv_op; }
sampler_type* get_sampler() {
@@ -174,14 +170,14 @@ protected:
!butil::is_same<InvOp, detail::VoidOp>::value &&
!butil::is_same<T, std::string>::value &&
FLAGS_save_series) {
- _series_sampler = new SeriesSampler(this, _combiner.op());
+ _series_sampler = new SeriesSampler(this, _combiner->op());
_series_sampler->schedule();
}
return rc;
}
private:
- combiner_type _combiner;
+ shared_combiner_type _combiner;
sampler_type* _sampler;
SeriesSampler* _series_sampler;
InvOp _inv_op;
@@ -191,12 +187,12 @@ template <typename T, typename Op, typename InvOp>
inline Reducer<T, Op, InvOp>& Reducer<T, Op, InvOp>::operator<<(
typename butil::add_cr_non_integral<T>::type value) {
// It's wait-free for most time
- agent_type* agent = _combiner.get_or_create_tls_agent();
+ agent_type* agent = _combiner->get_or_create_tls_agent();
if (__builtin_expect(!agent, 0)) {
LOG(FATAL) << "Fail to create agent";
return *this;
}
- agent->element.modify(_combiner.op(), value);
+ agent->element.modify(_combiner->op(), value);
return *this;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]