This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new db756dcdf fix: avoid reopening GC-deleted plog during duplication (#2373)
db756dcdf is described below
commit db756dcdf24e2af6169659a67548501ed61f106c
Author: Pengfan Lu <[email protected]>
AuthorDate: Fri Mar 13 20:33:17 2026 +0800
fix: avoid reopening GC-deleted plog during duplication (#2373)
Fix https://github.com/apache/incubator-pegasus/issues/2372.
Duplication replay may access private log files that have already been deleted
by a concurrent GC. Track the cleanable decree and stop scanning older plog files
once they fall entirely within the GC boundary (files GC may delete), preventing the crash.
---
src/replica/duplication/load_from_private_log.cpp | 26 +++++++++++++++++++----
src/replica/duplication/load_from_private_log.h | 3 +--
src/replica/mutation_log.cpp | 13 ++++++++++++
src/replica/mutation_log.h | 17 ++++++++++++---
src/replica/replica_chkpt.cpp | 1 +
5 files changed, 51 insertions(+), 9 deletions(-)
diff --git a/src/replica/duplication/load_from_private_log.cpp b/src/replica/duplication/load_from_private_log.cpp
index 842ed822b..829929ba4 100644
--- a/src/replica/duplication/load_from_private_log.cpp
+++ b/src/replica/duplication/load_from_private_log.cpp
@@ -156,17 +156,35 @@ void load_from_private_log::find_log_file_to_start()
// Reopen the files. Because the internal file handle of `file_map`
// is cleared once WAL replay finished. They are unable to read.
mutation_log::log_file_map_by_index new_file_map;
- for (const auto &pr : file_map) {
+
+ const decree cleanable_decree = _private_log->get_cleanable_decree();
+ const decree max_decree_gpid = _private_log->max_decree(get_gpid());
+
+ if (max_decree_gpid <= cleanable_decree) {
+ LOG_ERROR_PREFIX("max_decree_gpid({}) should be > cleanable_decree({})
for plog",
+ max_decree_gpid,
+ cleanable_decree);
+ return;
+ }
+
+ for (auto it = file_map.rbegin(); it != file_map.rend(); ++it) {
log_file_ptr file;
- error_s es = log_utils::open_read(pr.second->path(), file);
+ error_s es = log_utils::open_read(it->second->path(), file);
if (!es.is_ok()) {
LOG_ERROR_PREFIX("{}", es);
return;
}
- new_file_map.emplace(pr.first, file);
+
+ new_file_map.emplace(it->first, file);
+
+ // If the max decree of a log file falls within `cleanable_decree`,
the file may be deleted
+ // during the GC of plog files. Therefore, these files should be
skipped here.
+ if (cleanable_decree >= file->previous_log_max_decree(get_gpid())) {
+ break;
+ }
}
- find_log_file_to_start(std::move(new_file_map));
+ find_log_file_to_start(new_file_map);
}
void load_from_private_log::find_log_file_to_start(
diff --git a/src/replica/duplication/load_from_private_log.h b/src/replica/duplication/load_from_private_log.h
index bf288ec54..e19c0c80f 100644
--- a/src/replica/duplication/load_from_private_log.h
+++ b/src/replica/duplication/load_from_private_log.h
@@ -82,9 +82,8 @@ public:
static constexpr int MAX_ALLOWED_FILE_REPEATS{10};
private:
- void find_log_file_to_start(const mutation_log::log_file_map_by_index
&log_files);
+ void find_log_file_to_start(const mutation_log::log_file_map_by_index
&log_file_map);
-private:
friend class load_from_private_log_test;
friend class load_fail_mode_test;
FRIEND_TEST(load_fail_mode_test, fail_skip);
diff --git a/src/replica/mutation_log.cpp b/src/replica/mutation_log.cpp
index df3ca2e6b..23b78efe3 100644
--- a/src/replica/mutation_log.cpp
+++ b/src/replica/mutation_log.cpp
@@ -361,6 +361,7 @@ void mutation_log::init_states()
_private_log_info = {0, 0};
_plog_max_decree_on_disk = 0;
_plog_max_commit_on_disk = 0;
+ _cleanable_decree = 0;
}
error_code mutation_log::open(replay_callback read_callback,
@@ -898,6 +899,18 @@ void mutation_log::update_max_commit_on_disk_no_lock(decree d)
}
}
+decree mutation_log::get_cleanable_decree() const
+{
+ zauto_lock l(_lock);
+ return _cleanable_decree;
+}
+
+void mutation_log::set_cleanable_decree(decree d)
+{
+ zauto_lock l(_lock);
+ _cleanable_decree = d;
+}
+
bool mutation_log::get_learn_state(gpid gpid, decree start, /*out*/
learn_state &state) const
{
CHECK(_is_private, "this method is only valid for private logs");
diff --git a/src/replica/mutation_log.h b/src/replica/mutation_log.h
index 7689004aa..6f018f001 100644
--- a/src/replica/mutation_log.h
+++ b/src/replica/mutation_log.h
@@ -47,6 +47,7 @@
#include "task/task_code.h"
#include "task/task_tracker.h"
#include "utils/autoref_ptr.h"
+#include "utils/ports.h"
#include "utils/error_code.h"
#include "utils/errors.h"
#include "utils/zlocks.h"
@@ -301,6 +302,9 @@ public:
task_tracker *tracker() { return &_tracker; }
+ decree get_cleanable_decree() const;
+ void set_cleanable_decree(decree d);
+
protected:
// 'size' is data size to write; the '_global_end_offset' will be updated
by 'size'.
// can switch file only when create_new_log_if_needed = true;
@@ -400,9 +404,14 @@ private:
// for plog. Since it is set with
mutation.data.header.last_committed_decree, it must
// be less than _plog_max_decree_on_disk.
decree _plog_max_commit_on_disk;
+
+ // The decree threshold for private log garbage collection. Mutations with
decree <= this
+ // value are considered cleanable and their log files may be deleted by GC.
+ // Protected by _lock in get/set.
+ decree _cleanable_decree;
};
-typedef dsn::ref_ptr<mutation_log> mutation_log_ptr;
+using mutation_log_ptr = dsn::ref_ptr<mutation_log>;
class mutation_log_private : public mutation_log, private replica_base
{
@@ -438,6 +447,9 @@ public:
void flush_once() override;
private:
+ DISALLOW_COPY_AND_ASSIGN(mutation_log_private);
+ DISALLOW_MOVE_AND_ASSIGN(mutation_log_private);
+
// async write pending mutations into log file
// Preconditions:
// - _pending_write != nullptr
@@ -457,9 +469,8 @@ private:
// if count <= 0, means flush until all data is on disk
void flush_internal(int max_count);
-private:
// bufferring - only one concurrent write is allowed
- typedef std::vector<mutation_ptr> mutations;
+ using mutations = std::vector<mutation_ptr>;
std::atomic_bool _is_writing;
// Writes that are emitted to `commit_log_block` but are not completely
written.
// The weak_ptr used here is a trick. Once the pointer freed, ie.
diff --git a/src/replica/replica_chkpt.cpp b/src/replica/replica_chkpt.cpp
index 42275b954..36d81f5de 100644
--- a/src/replica/replica_chkpt.cpp
+++ b/src/replica/replica_chkpt.cpp
@@ -177,6 +177,7 @@ void replica::on_checkpoint_timer()
}
}
+ _private_log->set_cleanable_decree(cleanable_decree);
tasking::enqueue(LPC_GARBAGE_COLLECT_LOGS_AND_REPLICAS,
&_tracker,
[this, plog, cleanable_decree, valid_start_offset] {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]