This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 937e33716 fix(duplication): get duplication manager as shared_ptr in 
case raw pointer has been reset to null (#1968)
937e33716 is described below

commit 937e3371680122bc6b555e0dc0c5008af9865793
Author: Dan Wang <[email protected]>
AuthorDate: Tue Apr 2 14:13:38 2024 +0800

    fix(duplication): get duplication manager as shared_ptr in case raw pointer 
has been reset to null (#1968)
    
    It is not safe to get `_duplication_mgr` as a raw pointer or a
    `std::unique_ptr`. For example, `r->close()` would be called in
    `replica_stub::close_replica` without the protection of any lock.
    Once `_duplication_mgr.reset()` is called in `replica::close()`
    (`r->close()`), the `std::unique_ptr` or raw pointer would become
    `nullptr`, which is unsafe when it is accessed from multiple threads.
    Since `_duplication_mgr` might be used and reset in different threads,
    it's better to use `std::shared_ptr` instead. See also
    https://github.com/apache/incubator-pegasus/pull/1608.
---
 src/replica/duplication/duplication_sync_timer.cpp | 26 +++++++++++++++++-----
 src/replica/replica.cpp                            |  2 +-
 src/replica/replica.h                              |  7 ++++--
 3 files changed, 27 insertions(+), 8 deletions(-)

diff --git a/src/replica/duplication/duplication_sync_timer.cpp 
b/src/replica/duplication/duplication_sync_timer.cpp
index e1e0a1645..58d4e2ab7 100644
--- a/src/replica/duplication/duplication_sync_timer.cpp
+++ b/src/replica/duplication/duplication_sync_timer.cpp
@@ -74,7 +74,12 @@ void duplication_sync_timer::run()
 
     // collects confirm points from all primaries on this server
     for (const replica_ptr &r : _stub->get_all_primaries()) {
-        auto confirmed = 
r->get_duplication_manager()->get_duplication_confirms_to_update();
+        const auto dup_mgr = r->get_duplication_manager();
+        if (!dup_mgr) {
+            continue;
+        }
+
+        auto confirmed = dup_mgr->get_duplication_confirms_to_update();
         if (!confirmed.empty()) {
             req->confirm_list[r->get_gpid()] = std::move(confirmed);
         }
@@ -111,13 +116,18 @@ void 
duplication_sync_timer::on_duplication_sync_reply(error_code err,
 void duplication_sync_timer::update_duplication_map(
     const std::map<int32_t, std::map<int32_t, duplication_entry>> &dup_map)
 {
-    for (replica_ptr &r : _stub->get_all_replicas()) {
+    for (const replica_ptr &r : _stub->get_all_replicas()) {
+        auto dup_mgr = r->get_duplication_manager();
+        if (!dup_mgr) {
+            continue;
+        }
+
         const auto &it = dup_map.find(r->get_gpid().get_app_id());
         if (it == dup_map.end()) {
             // no duplication is assigned to this app
-            r->get_duplication_manager()->update_duplication_map({});
+            dup_mgr->update_duplication_map({});
         } else {
-            r->get_duplication_manager()->update_duplication_map(it->second);
+            dup_mgr->update_duplication_map(it->second);
         }
     }
 }
@@ -166,10 +176,16 @@ duplication_sync_timer::get_dup_states(int app_id, 
/*out*/ bool *app_found)
         if (rid.get_app_id() != app_id) {
             continue;
         }
+
+        const auto dup_mgr = r->get_duplication_manager();
+        if (!dup_mgr) {
+            continue;
+        }
+
         *app_found = true;
         replica_dup_state state;
         state.id = rid;
-        auto states = r->get_duplication_manager()->get_dup_states();
+        const auto &states = dup_mgr->get_dup_states();
         decree last_committed_decree = r->last_committed_decree();
         for (const auto &s : states) {
             state.duplicating = s.duplicating;
diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp
index 9c9c41066..c9f2e2811 100644
--- a/src/replica/replica.cpp
+++ b/src/replica/replica.cpp
@@ -281,7 +281,7 @@ replica::replica(replica_stub *stub,
       _cur_download_size(0),
       _restore_progress(0),
       _restore_status(ERR_OK),
-      _duplication_mgr(new replica_duplicator_manager(this)),
+      _duplication_mgr(std::make_shared<replica_duplicator_manager>(this)),
       // todo(jiashuo1): app.duplicating need rename
       _is_duplication_master(app.duplicating),
       _is_duplication_follower(is_duplication_follower),
diff --git a/src/replica/replica.h b/src/replica/replica.h
index 4a99ff9ad..8b6fe6b16 100644
--- a/src/replica/replica.h
+++ b/src/replica/replica.h
@@ -239,7 +239,10 @@ public:
     //
     error_code trigger_manual_emergency_checkpoint(decree old_decree);
     void on_query_last_checkpoint(learn_response &response);
-    replica_duplicator_manager *get_duplication_manager() const { return 
_duplication_mgr.get(); }
+    std::shared_ptr<replica_duplicator_manager> get_duplication_manager() const
+    {
+        return _duplication_mgr;
+    }
     bool is_duplication_master() const { return _is_duplication_master; }
     bool is_duplication_follower() const { return _is_duplication_follower; }
     bool is_duplication_plog_checking() const { return 
_is_duplication_plog_checking.load(); }
@@ -621,7 +624,7 @@ private:
     throttling_controller _backup_request_qps_throttling_controller;
 
     // duplication
-    std::unique_ptr<replica_duplicator_manager> _duplication_mgr;
+    std::shared_ptr<replica_duplicator_manager> _duplication_mgr;
     bool _is_manual_emergency_checkpointing{false};
     bool _is_duplication_master{false};
     bool _is_duplication_follower{false};


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to