This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 95948e5fae3 branch-4.1: [fix](be) Fix SIGSEGV in bvar::take_sample caused by AgentCombiner/TLS Agent lifetime race under high EPS #64040 (#64932)
95948e5fae3 is described below
commit 95948e5fae3e503baff304f2ed84fa7532c94b93
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Jun 30 10:56:55 2026 +0800
branch-4.1: [fix](be) Fix SIGSEGV in bvar::take_sample caused by AgentCombiner/TLS Agent lifetime race under high EPS #64040 (#64932)
Cherry-picked from #64040
Co-authored-by: Venkata Kaushik <[email protected]>
---
...pc-1.4.0-fix-agent-combiner-thread-safety.patch | 439 +++++++++++++++++++++
1 file changed, 439 insertions(+)
diff --git a/thirdparty/patches/brpc-1.4.0-fix-agent-combiner-thread-safety.patch b/thirdparty/patches/brpc-1.4.0-fix-agent-combiner-thread-safety.patch
new file mode 100644
index 00000000000..b7d0a9a93df
--- /dev/null
+++ b/thirdparty/patches/brpc-1.4.0-fix-agent-combiner-thread-safety.patch
@@ -0,0 +1,439 @@
+diff --git a/src/bvar/detail/combiner.h b/src/bvar/detail/combiner.h
+index 6a6ab80..65ed9a1 100644
+--- a/src/bvar/detail/combiner.h
++++ b/src/bvar/detail/combiner.h
+@@ -22,6 +22,7 @@
+
+ #include <string> // std::string
+ #include <vector> // std::vector
++#include <memory>
+ #include "butil/atomicops.h" // butil::atomic
+ #include "butil/scoped_lock.h" // BAIDU_SCOPED_LOCK
+ #include "butil/type_traits.h" // butil::add_cr_non_integral
+@@ -42,7 +43,6 @@ public:
+ typedef typename Combiner::Agent agent_type;
+
+ GlobalValue(agent_type* a, Combiner* c) : _a(a), _c(c) {}
+- ~GlobalValue() {}
+
+ // Call this method to unlock tls element and lock the combiner.
+ // Unlocking tls element avoids potential deadlock with
+@@ -113,7 +113,7 @@ class ElementContainer<
+ T, typename butil::enable_if<is_atomical<T>::value>::type> {
+ public:
+ // We don't need any memory fencing here, every op is relaxed.
+-
++
+ inline void load(T* out) {
+ *out = _value.load(butil::memory_order_relaxed);
+ }
+@@ -153,24 +153,26 @@ private:
+ };
+
+ template <typename ResultTp, typename ElementTp, typename BinaryOp>
+-class AgentCombiner {
++class AgentCombiner
++ : public std::enable_shared_from_this<AgentCombiner<ResultTp, ElementTp, BinaryOp>> {
++
+ public:
+ typedef ResultTp result_type;
+ typedef ElementTp element_type;
+ typedef AgentCombiner<ResultTp, ElementTp, BinaryOp> self_type;
++ typedef std::shared_ptr<self_type> self_shared_type;
++ typedef std::weak_ptr<self_type> self_weak_type;
+ friend class GlobalValue<self_type>;
+-
+- struct Agent : public butil::LinkNode<Agent> {
+- Agent() : combiner(NULL) {}
+
++ struct Agent : public butil::LinkNode<Agent> {
+ ~Agent() {
+- if (combiner) {
+- combiner->commit_and_erase(this);
+- combiner = NULL;
++ self_shared_type c = combiner.lock();
++ if (NULL != c) {
++ c->commit_and_erase(this);
+ }
+ }
+-
+- void reset(const ElementTp& val, self_type* c) {
++
++ void reset(const ElementTp& val, const self_shared_type& c) {
+ combiner = c;
+ element.store(val);
+ }
+@@ -181,11 +183,11 @@ friend class GlobalValue<self_type>;
+ // void operator()(GlobalValue<Combiner> & global_value,
+ // ElementTp & local_value) const {
+ // if (test_for_merging(local_value)) {
+- //
++ //
+ // // Unlock tls element and lock combiner. Obviously
+ // // tls element can be changed during lock().
+ // ResultTp* g = global_value.lock();
+- //
++ //
+ // // *g and local_value are not changed provided
+ // // merge_global is called from the thread owning
+ // // the agent.
+@@ -200,16 +202,23 @@ friend class GlobalValue<self_type>;
+ // ...
+ // }
+ // };
+- //
++ //
+ // NOTE: Only available to non-atomic types.
+ template <typename Op>
+- void merge_global(const Op &op) {
+- GlobalValue<self_type> g(this, combiner);
+- element.merge_global(op, g);
++ void merge_global(const Op &op, self_shared_type c = NULL) {
++ if (NULL == c) {
++ c = combiner.lock();
++ }
++ if (NULL != c) {
++ GlobalValue<self_type> g(this, c.get());
++ element.merge_global(op, g);
++ }
+ }
+
+- self_type *combiner;
+ ElementContainer<ElementTp> element;
++ private:
++ friend class AgentCombiner<ResultTp, ElementTp, BinaryOp>;
++ self_weak_type combiner;
+ };
+
+ typedef detail::AgentGroup<Agent> AgentGroup;
+@@ -231,7 +240,7 @@ friend class GlobalValue<self_type>;
+ _id = -1;
+ }
+ }
+-
++
+ // [Threadsafe] May be called from anywhere
+ ResultTp combine_agents() const {
+ ElementTp tls_value;
+@@ -245,10 +254,10 @@ friend class GlobalValue<self_type>;
+ return ret;
+ }
+
+- typename butil::add_cr_non_integral<ElementTp>::type element_identity() const
+- { return _element_identity; }
+- typename butil::add_cr_non_integral<ResultTp>::type result_identity() const
+- { return _result_identity; }
++ typename butil::add_cr_non_integral<ElementTp>::type
++ element_identity() const { return _element_identity; }
++ typename butil::add_cr_non_integral<ResultTp>::type
++ result_identity() const { return _result_identity; }
+
+ // [Threadsafe] May be called from anywhere.
+ ResultTp reset_all_agents() {
+@@ -265,7 +274,7 @@ friend class GlobalValue<self_type>;
+ }
+
+ // Always called from the thread owning the agent.
+- void commit_and_erase(Agent *agent) {
++ void commit_and_erase(Agent* agent) {
+ if (NULL == agent) {
+ return;
+ }
+@@ -279,7 +288,7 @@ friend class GlobalValue<self_type>;
+ }
+
+ // Always called from the thread owning the agent
+- void commit_and_clear(Agent *agent) {
++ void commit_and_clear(Agent* agent) {
+ if (NULL == agent) {
+ return;
+ }
+@@ -290,7 +299,7 @@ friend class GlobalValue<self_type>;
+ }
+
+ // We need this function to be as fast as possible.
+- inline Agent* get_or_create_tls_agent() {
++ Agent* get_or_create_tls_agent() {
+ Agent* agent = AgentGroup::get_tls_agent(_id);
+ if (!agent) {
+ // Create the agent
+@@ -300,10 +309,10 @@ friend class GlobalValue<self_type>;
+ return NULL;
+ }
+ }
+- if (agent->combiner) {
++ if (!agent->combiner.expired()) {
+ return agent;
+ }
+- agent->reset(_element_identity, this);
++ agent->reset(_element_identity, this->shared_from_this());
+ // TODO: Is uniqueness-checking necessary here?
+ {
+ butil::AutoLock guard(_lock);
+@@ -317,8 +326,7 @@ friend class GlobalValue<self_type>;
+ // reseting agents is must because the agent object may be reused.
+ // Set element to be default-constructed so that if it's non-pod,
+ // internal allocations should be released.
+- for (butil::LinkNode<Agent>*
+- node = _agents.head(); node != _agents.end();) {
++ for (butil::LinkNode<Agent>* node = _agents.head(); node != _agents.end();) {
+ node->value()->reset(ElementTp(), NULL);
+ butil::LinkNode<Agent>* const saved_next = node->next();
+ node->RemoveFromList();
+diff --git a/src/bvar/detail/percentile.cpp b/src/bvar/detail/percentile.cpp
+index e0412cb..37181cc 100644
+--- a/src/bvar/detail/percentile.cpp
++++ b/src/bvar/detail/percentile.cpp
+@@ -85,9 +85,8 @@ private:
+ int64_t _latency;
+ };
+
+-Percentile::Percentile() : _combiner(NULL), _sampler(NULL) {
+- _combiner = new combiner_type;
+-}
++Percentile::Percentile()
++ : _combiner(std::make_shared<combiner_type>()), _sampler(NULL) {}
+
+ Percentile::~Percentile() {
+ // Have to destroy sampler first to avoid the race between destruction and
+@@ -96,7 +95,6 @@ Percentile::~Percentile() {
+ _sampler->destroy();
+ _sampler = NULL;
+ }
+- delete _combiner;
+ }
+
+ Percentile::value_type Percentile::reset() {
+@@ -126,7 +124,7 @@ Percentile &Percentile::operator<<(int64_t latency) {
+ }
+ return *this;
+ }
+- agent->merge_global(AddLatency(latency));
++ agent->merge_global(AddLatency(latency), _combiner);
+ return *this;
+ }
+
+diff --git a/src/bvar/detail/percentile.h b/src/bvar/detail/percentile.h
+index f04268d..3680178 100644
+--- a/src/bvar/detail/percentile.h
++++ b/src/bvar/detail/percentile.h
+@@ -462,6 +462,7 @@ public:
+ typedef AgentCombiner <GlobalPercentileSamples,
+ ThreadLocalPercentileSamples,
+ AddPercentileSamples> combiner_type;
++ typedef typename combiner_type::self_shared_type shared_combiner_type;
+ typedef combiner_type::Agent agent_type;
+ Percentile();
+ ~Percentile();
+@@ -484,7 +485,7 @@ public:
+
+ Percentile& operator<<(int64_t latency);
+
+- bool valid() const { return _combiner != NULL && _combiner->valid(); }
++ bool valid() const { return _combiner && _combiner->valid(); }
+
+ // This name is useful for warning negative latencies in operator<<
+ void set_debug_name(const butil::StringPiece& name) {
+@@ -494,8 +495,8 @@ public:
+ private:
+ DISALLOW_COPY_AND_ASSIGN(Percentile);
+
+- combiner_type* _combiner;
+- sampler_type* _sampler;
++ shared_combiner_type _combiner;
++ sampler_type* _sampler;
+ std::string _debug_name;
+ };
+
+diff --git a/src/bvar/recorder.h b/src/bvar/recorder.h
+index 9b73a19..c2c18bd 100644
+--- a/src/bvar/recorder.h
++++ b/src/bvar/recorder.h
+@@ -113,20 +113,21 @@ public:
+ };
+
+ typedef detail::AgentCombiner<Stat, uint64_t, AddToStat> combiner_type;
++ typedef typename combiner_type::self_shared_type shared_combiner_type;
+ typedef combiner_type::Agent agent_type;
+
+- IntRecorder() : _sampler(NULL) {}
++ IntRecorder() : _combiner(std::make_shared<combiner_type>()), _sampler(NULL) {}
+
+- explicit IntRecorder(const butil::StringPiece& name) : _sampler(NULL) {
++ explicit IntRecorder(const butil::StringPiece& name) : IntRecorder() {
+ expose(name);
+ }
+
+ IntRecorder(const butil::StringPiece& prefix, const butil::StringPiece& name)
+- : _sampler(NULL) {
++ : IntRecorder() {
+ expose_as(prefix, name);
+ }
+
+- ~IntRecorder() {
++ ~IntRecorder() override {
+ hide();
+ if (_sampler) {
+ _sampler->destroy();
+@@ -138,19 +139,19 @@ public:
+ IntRecorder& operator<<(int64_t/*note*/ sample);
+
+ int64_t average() const {
+- return _combiner.combine_agents().get_average_int();
++ return _combiner->combine_agents().get_average_int();
+ }
+
+ double average(double) const {
+- return _combiner.combine_agents().get_average_double();
++ return _combiner->combine_agents().get_average_double();
+ }
+
+ Stat get_value() const {
+- return _combiner.combine_agents();
++ return _combiner->combine_agents();
+ }
+
+ Stat reset() {
+- return _combiner.reset_all_agents();
++ return _combiner->reset_all_agents();
+ }
+
+ AddStat op() const { return AddStat(); }
+@@ -160,7 +161,7 @@ public:
+ os << get_value();
+ }
+
+- bool valid() const { return _combiner.valid(); }
++ bool valid() const { return _combiner->valid(); }
+
+ sampler_type* get_sampler() {
+ if (NULL == _sampler) {
+@@ -230,7 +231,7 @@ private:
+ }
+
+ private:
+- combiner_type _combiner;
++ shared_combiner_type _combiner;
+ sampler_type* _sampler;
+ std::string _debug_name;
+ };
+@@ -258,7 +259,7 @@ inline IntRecorder& IntRecorder::operator<<(int64_t sample) {
+ << (void*)this << ") " << reason;
+ }
+ }
+- agent_type* agent = _combiner.get_or_create_tls_agent();
++ agent_type* agent = _combiner->get_or_create_tls_agent();
+ if (BAIDU_UNLIKELY(!agent)) {
+ LOG(FATAL) << "Fail to create agent";
+ return *this;
+@@ -276,7 +277,7 @@ inline IntRecorder& IntRecorder::operator<<(int64_t sample) {
+ // Although agent->element might have been cleared at this
+ // point, it is just OK because the very value is 0 in
+ // this case
+- agent->combiner->commit_and_clear(agent);
++ _combiner->commit_and_clear(agent);
+ sum = 0;
+ num = 0;
+ n = 0;
+diff --git a/src/bvar/reducer.h b/src/bvar/reducer.h
+index fbd4fa7..ccf7805 100644
+--- a/src/bvar/reducer.h
++++ b/src/bvar/reducer.h
+@@ -69,13 +69,13 @@ template <typename T, typename Op, typename InvOp = detail::VoidOp>
+ class Reducer : public Variable {
+ public:
+ typedef typename detail::AgentCombiner<T, T, Op> combiner_type;
++ typedef typename combiner_type::self_shared_type shared_combiner_type;
+ typedef typename combiner_type::Agent agent_type;
+ typedef detail::ReducerSampler<Reducer, T, Op, InvOp> sampler_type;
+ class SeriesSampler : public detail::Sampler {
+ public:
+ SeriesSampler(Reducer* owner, const Op& op)
+ : _owner(owner), _series(op) {}
+- ~SeriesSampler() {}
+ void take_sample() override { _series.append(_owner->get_value()); }
+ void describe(std::ostream& os) { _series.describe(os, NULL); }
+ private:
+@@ -85,16 +85,12 @@ public:
+
+ public:
+ // The `identify' must satisfy: identity Op a == a
+- Reducer(typename butil::add_cr_non_integral<T>::type identity = T(),
+- const Op& op = Op(),
+- const InvOp& inv_op = InvOp())
+- : _combiner(identity, identity, op)
+- , _sampler(NULL)
+- , _series_sampler(NULL)
+- , _inv_op(inv_op) {
+- }
++ explicit Reducer(typename butil::add_cr_non_integral<T>::type identity = T(),
++ const Op& op = Op(), const InvOp& inv_op = InvOp())
++ : _combiner(std::make_shared<combiner_type>(identity, identity, op))
++ , _sampler(NULL) , _series_sampler(NULL) , _inv_op(inv_op) {}
+
+- ~Reducer() {
++ ~Reducer() override {
+ // Calling hide() manually is a MUST required by Variable.
+ hide();
+ if (_sampler) {
+@@ -119,13 +115,13 @@ public:
+ << "You should not call Reducer<" << butil::class_name_str<T>()
+ << ", " << butil::class_name_str<Op>() << ">::get_value() when a"
+ << " Window<> is used because the operator does not have inverse.";
+- return _combiner.combine_agents();
++ return _combiner->combine_agents();
+ }
+
+
+ // Reset the reduced value to T().
+ // Returns the reduced value before reset.
+- T reset() { return _combiner.reset_all_agents(); }
++ T reset() { return _combiner->reset_all_agents(); }
+
+ void describe(std::ostream& os, bool quote_string) const override {
+ if (butil::is_same<T, std::string>::value && quote_string) {
+@@ -140,10 +136,10 @@ public:
+ #endif
+
+ // True if this reducer is constructed successfully.
+- bool valid() const { return _combiner.valid(); }
++ bool valid() const { return _combiner->valid(); }
+
+ // Get instance of Op.
+- const Op& op() const { return _combiner.op(); }
++ const Op& op() const { return _combiner->op(); }
+ const InvOp& inv_op() const { return _inv_op; }
+
+ sampler_type* get_sampler() {
+@@ -174,14 +170,14 @@ protected:
+ !butil::is_same<InvOp, detail::VoidOp>::value &&
+ !butil::is_same<T, std::string>::value &&
+ FLAGS_save_series) {
+- _series_sampler = new SeriesSampler(this, _combiner.op());
++ _series_sampler = new SeriesSampler(this, _combiner->op());
+ _series_sampler->schedule();
+ }
+ return rc;
+ }
+
+ private:
+- combiner_type _combiner;
++ shared_combiner_type _combiner;
+ sampler_type* _sampler;
+ SeriesSampler* _series_sampler;
+ InvOp _inv_op;
+@@ -191,12 +187,12 @@ template <typename T, typename Op, typename InvOp>
+ inline Reducer<T, Op, InvOp>& Reducer<T, Op, InvOp>::operator<<(
+ typename butil::add_cr_non_integral<T>::type value) {
+ // It's wait-free for most time
+- agent_type* agent = _combiner.get_or_create_tls_agent();
++ agent_type* agent = _combiner->get_or_create_tls_agent();
+ if (__builtin_expect(!agent, 0)) {
+ LOG(FATAL) << "Fail to create agent";
+ return *this;
+ }
+- agent->element.modify(_combiner.op(), value);
++ agent->element.modify(_combiner->op(), value);
+ return *this;
+ }
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]