This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new db756dcdf fix: avoid reopening GC-deleted plog during duplication (#2373)
db756dcdf is described below

commit db756dcdf24e2af6169659a67548501ed61f106c
Author: Pengfan Lu <[email protected]>
AuthorDate: Fri Mar 13 20:33:17 2026 +0800

    fix: avoid reopening GC-deleted plog during duplication (#2373)
    
    Fix https://github.com/apache/incubator-pegasus/issues/2372.
    
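    Duplication replay may reopen private log (plog) files that are being
    deleted concurrently by GC, which crashes the replica. Record the
    cleanable decree used for plog GC in mutation_log. When reopening plog
    files for duplication, scan from the newest file backwards and stop at
    the first file whose previous-log max decree is within the cleanable
    decree, so that older files which GC may already have deleted are never
    opened.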
---
 src/replica/duplication/load_from_private_log.cpp | 26 +++++++++++++++++++----
 src/replica/duplication/load_from_private_log.h   |  3 +--
 src/replica/mutation_log.cpp                      | 13 ++++++++++++
 src/replica/mutation_log.h                        | 17 ++++++++++++---
 src/replica/replica_chkpt.cpp                     |  1 +
 5 files changed, 51 insertions(+), 9 deletions(-)

diff --git a/src/replica/duplication/load_from_private_log.cpp b/src/replica/duplication/load_from_private_log.cpp
index 842ed822b..829929ba4 100644
--- a/src/replica/duplication/load_from_private_log.cpp
+++ b/src/replica/duplication/load_from_private_log.cpp
@@ -156,17 +156,35 @@ void load_from_private_log::find_log_file_to_start()
     // Reopen the files. Because the internal file handle of `file_map`
     // is cleared once WAL replay finished. They are unable to read.
     mutation_log::log_file_map_by_index new_file_map;
-    for (const auto &pr : file_map) {
+
+    const decree cleanable_decree = _private_log->get_cleanable_decree();
+    const decree max_decree_gpid = _private_log->max_decree(get_gpid());
+
+    if (max_decree_gpid <= cleanable_decree) {
+        LOG_ERROR_PREFIX("max_decree_gpid({}) should be > cleanable_decree({}) 
for plog",
+                         max_decree_gpid,
+                         cleanable_decree);
+        return;
+    }
+
+    for (auto it = file_map.rbegin(); it != file_map.rend(); ++it) {
         log_file_ptr file;
-        error_s es = log_utils::open_read(pr.second->path(), file);
+        error_s es = log_utils::open_read(it->second->path(), file);
         if (!es.is_ok()) {
             LOG_ERROR_PREFIX("{}", es);
             return;
         }
-        new_file_map.emplace(pr.first, file);
+
+        new_file_map.emplace(it->first, file);
+
+        // If the max decree of a log file falls within `cleanable_decree`, 
the file may be deleted
+        // during the GC of plog files. Therefore, these files should be 
skipped here.
+        if (cleanable_decree >= file->previous_log_max_decree(get_gpid())) {
+            break;
+        }
     }
 
-    find_log_file_to_start(std::move(new_file_map));
+    find_log_file_to_start(new_file_map);
 }
 
 void load_from_private_log::find_log_file_to_start(
diff --git a/src/replica/duplication/load_from_private_log.h b/src/replica/duplication/load_from_private_log.h
index bf288ec54..e19c0c80f 100644
--- a/src/replica/duplication/load_from_private_log.h
+++ b/src/replica/duplication/load_from_private_log.h
@@ -82,9 +82,8 @@ public:
     static constexpr int MAX_ALLOWED_FILE_REPEATS{10};
 
 private:
-    void find_log_file_to_start(const mutation_log::log_file_map_by_index 
&log_files);
+    void find_log_file_to_start(const mutation_log::log_file_map_by_index 
&log_file_map);
 
-private:
     friend class load_from_private_log_test;
     friend class load_fail_mode_test;
     FRIEND_TEST(load_fail_mode_test, fail_skip);
diff --git a/src/replica/mutation_log.cpp b/src/replica/mutation_log.cpp
index df3ca2e6b..23b78efe3 100644
--- a/src/replica/mutation_log.cpp
+++ b/src/replica/mutation_log.cpp
@@ -361,6 +361,7 @@ void mutation_log::init_states()
     _private_log_info = {0, 0};
     _plog_max_decree_on_disk = 0;
     _plog_max_commit_on_disk = 0;
+    _cleanable_decree = 0;
 }
 
 error_code mutation_log::open(replay_callback read_callback,
@@ -898,6 +899,18 @@ void mutation_log::update_max_commit_on_disk_no_lock(decree d)
     }
 }
 
+decree mutation_log::get_cleanable_decree() const
+{
+    zauto_lock l(_lock);
+    return _cleanable_decree;
+}
+
+void mutation_log::set_cleanable_decree(decree d)
+{
+    zauto_lock l(_lock);
+    _cleanable_decree = d;
+}
+
 bool mutation_log::get_learn_state(gpid gpid, decree start, /*out*/ 
learn_state &state) const
 {
     CHECK(_is_private, "this method is only valid for private logs");
diff --git a/src/replica/mutation_log.h b/src/replica/mutation_log.h
index 7689004aa..6f018f001 100644
--- a/src/replica/mutation_log.h
+++ b/src/replica/mutation_log.h
@@ -47,6 +47,7 @@
 #include "task/task_code.h"
 #include "task/task_tracker.h"
 #include "utils/autoref_ptr.h"
+#include "utils/ports.h"
 #include "utils/error_code.h"
 #include "utils/errors.h"
 #include "utils/zlocks.h"
@@ -301,6 +302,9 @@ public:
 
     task_tracker *tracker() { return &_tracker; }
 
+    decree get_cleanable_decree() const;
+    void set_cleanable_decree(decree d);
+
 protected:
     // 'size' is data size to write; the '_global_end_offset' will be updated 
by 'size'.
     // can switch file only when create_new_log_if_needed = true;
@@ -400,9 +404,14 @@ private:
     // for plog. Since it is set with 
mutation.data.header.last_committed_decree, it must
     // be less than _plog_max_decree_on_disk.
     decree _plog_max_commit_on_disk;
+
+    // The decree threshold for private log garbage collection. Mutations with 
decree <= this
+    // value are considered cleanable and their log files may be deleted by GC.
+    // Protected by _lock in get/set.
+    decree _cleanable_decree;
 };
 
-typedef dsn::ref_ptr<mutation_log> mutation_log_ptr;
+using mutation_log_ptr = dsn::ref_ptr<mutation_log>;
 
 class mutation_log_private : public mutation_log, private replica_base
 {
@@ -438,6 +447,9 @@ public:
     void flush_once() override;
 
 private:
+    DISALLOW_COPY_AND_ASSIGN(mutation_log_private);
+    DISALLOW_MOVE_AND_ASSIGN(mutation_log_private);
+
     // async write pending mutations into log file
     // Preconditions:
     // - _pending_write != nullptr
@@ -457,9 +469,8 @@ private:
     // if count <= 0, means flush until all data is on disk
     void flush_internal(int max_count);
 
-private:
     // bufferring - only one concurrent write is allowed
-    typedef std::vector<mutation_ptr> mutations;
+    using mutations = std::vector<mutation_ptr>;
     std::atomic_bool _is_writing;
     // Writes that are emitted to `commit_log_block` but are not completely 
written.
     // The weak_ptr used here is a trick. Once the pointer freed, ie.
diff --git a/src/replica/replica_chkpt.cpp b/src/replica/replica_chkpt.cpp
index 42275b954..36d81f5de 100644
--- a/src/replica/replica_chkpt.cpp
+++ b/src/replica/replica_chkpt.cpp
@@ -177,6 +177,7 @@ void replica::on_checkpoint_timer()
         }
     }
 
+    _private_log->set_cleanable_decree(cleanable_decree);
     tasking::enqueue(LPC_GARBAGE_COLLECT_LOGS_AND_REPLICAS,
                      &_tracker,
                      [this, plog, cleanable_decree, valid_start_offset] {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to