This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 937e33716 fix(duplication): get duplication manager as shared_ptr in
case raw pointer has been reset to null (#1968)
937e33716 is described below
commit 937e3371680122bc6b555e0dc0c5008af9865793
Author: Dan Wang <[email protected]>
AuthorDate: Tue Apr 2 14:13:38 2024 +0800
fix(duplication): get duplication manager as shared_ptr in case raw pointer
has been reset to null (#1968)
It is not safe to get `_duplication_mgr` as a raw pointer or
`std::unique_ptr`. For example,
`r->close()` would be called in `replica_stub::close_replica` without
the protection of any lock.
Once `_duplication_mgr.reset()` is called in
`replica::close()` (`r->close()`), the `std::unique_ptr`
or raw pointer would become `nullptr`, which is not safe when it is
accessed from multiple threads.
Since `_duplication_mgr` might be accessed and reset in different threads,
it's better to use
`std::shared_ptr` instead. See also
https://github.com/apache/incubator-pegasus/pull/1608.
---
src/replica/duplication/duplication_sync_timer.cpp | 26 +++++++++++++++++-----
src/replica/replica.cpp | 2 +-
src/replica/replica.h | 7 ++++--
3 files changed, 27 insertions(+), 8 deletions(-)
diff --git a/src/replica/duplication/duplication_sync_timer.cpp
b/src/replica/duplication/duplication_sync_timer.cpp
index e1e0a1645..58d4e2ab7 100644
--- a/src/replica/duplication/duplication_sync_timer.cpp
+++ b/src/replica/duplication/duplication_sync_timer.cpp
@@ -74,7 +74,12 @@ void duplication_sync_timer::run()
// collects confirm points from all primaries on this server
for (const replica_ptr &r : _stub->get_all_primaries()) {
- auto confirmed =
r->get_duplication_manager()->get_duplication_confirms_to_update();
+ const auto dup_mgr = r->get_duplication_manager();
+ if (!dup_mgr) {
+ continue;
+ }
+
+ auto confirmed = dup_mgr->get_duplication_confirms_to_update();
if (!confirmed.empty()) {
req->confirm_list[r->get_gpid()] = std::move(confirmed);
}
@@ -111,13 +116,18 @@ void
duplication_sync_timer::on_duplication_sync_reply(error_code err,
void duplication_sync_timer::update_duplication_map(
const std::map<int32_t, std::map<int32_t, duplication_entry>> &dup_map)
{
- for (replica_ptr &r : _stub->get_all_replicas()) {
+ for (const replica_ptr &r : _stub->get_all_replicas()) {
+ auto dup_mgr = r->get_duplication_manager();
+ if (!dup_mgr) {
+ continue;
+ }
+
const auto &it = dup_map.find(r->get_gpid().get_app_id());
if (it == dup_map.end()) {
// no duplication is assigned to this app
- r->get_duplication_manager()->update_duplication_map({});
+ dup_mgr->update_duplication_map({});
} else {
- r->get_duplication_manager()->update_duplication_map(it->second);
+ dup_mgr->update_duplication_map(it->second);
}
}
}
@@ -166,10 +176,16 @@ duplication_sync_timer::get_dup_states(int app_id,
/*out*/ bool *app_found)
if (rid.get_app_id() != app_id) {
continue;
}
+
+ const auto dup_mgr = r->get_duplication_manager();
+ if (!dup_mgr) {
+ continue;
+ }
+
*app_found = true;
replica_dup_state state;
state.id = rid;
- auto states = r->get_duplication_manager()->get_dup_states();
+ const auto &states = dup_mgr->get_dup_states();
decree last_committed_decree = r->last_committed_decree();
for (const auto &s : states) {
state.duplicating = s.duplicating;
diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp
index 9c9c41066..c9f2e2811 100644
--- a/src/replica/replica.cpp
+++ b/src/replica/replica.cpp
@@ -281,7 +281,7 @@ replica::replica(replica_stub *stub,
_cur_download_size(0),
_restore_progress(0),
_restore_status(ERR_OK),
- _duplication_mgr(new replica_duplicator_manager(this)),
+ _duplication_mgr(std::make_shared<replica_duplicator_manager>(this)),
// todo(jiashuo1): app.duplicating need rename
_is_duplication_master(app.duplicating),
_is_duplication_follower(is_duplication_follower),
diff --git a/src/replica/replica.h b/src/replica/replica.h
index 4a99ff9ad..8b6fe6b16 100644
--- a/src/replica/replica.h
+++ b/src/replica/replica.h
@@ -239,7 +239,10 @@ public:
//
error_code trigger_manual_emergency_checkpoint(decree old_decree);
void on_query_last_checkpoint(learn_response &response);
- replica_duplicator_manager *get_duplication_manager() const { return
_duplication_mgr.get(); }
+ std::shared_ptr<replica_duplicator_manager> get_duplication_manager() const
+ {
+ return _duplication_mgr;
+ }
bool is_duplication_master() const { return _is_duplication_master; }
bool is_duplication_follower() const { return _is_duplication_follower; }
bool is_duplication_plog_checking() const { return
_is_duplication_plog_checking.load(); }
@@ -621,7 +624,7 @@ private:
throttling_controller _backup_request_qps_throttling_controller;
// duplication
- std::unique_ptr<replica_duplicator_manager> _duplication_mgr;
+ std::shared_ptr<replica_duplicator_manager> _duplication_mgr;
bool _is_manual_emergency_checkpointing{false};
bool _is_duplication_master{false};
bool _is_duplication_follower{false};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]