acelyc111 commented on code in PR #1594:
URL: https://github.com/apache/incubator-pegasus/pull/1594#discussion_r1330056816
##########
src/replica/replica_stub.cpp:
##########
@@ -1767,138 +1777,211 @@ void replica_stub::on_gc_replica(replica_stub_ptr this_, gpid id)
}
}
-void replica_stub::on_gc()
+void replica_stub::gc_slog(const replica_gc_info_map &replica_gc_map)
{
- uint64_t start = dsn_now_ns();
+ if (_log == nullptr) {
+ return;
+ }
- struct gc_info
- {
- replica_ptr rep;
- partition_status::type status;
- mutation_log_ptr plog;
- decree last_durable_decree;
- int64_t init_offset_in_shared_log;
- };
-
- std::unordered_map<gpid, gc_info> rs;
- {
- zauto_read_lock l(_replicas_lock);
-        // collect info in lock to prevent the case that the replica is closed in replica::close()
- for (auto &kv : _replicas) {
- const replica_ptr &rep = kv.second;
- gc_info &info = rs[kv.first];
- info.rep = rep;
- info.status = rep->status();
- info.plog = rep->private_log();
- info.last_durable_decree = rep->last_durable_decree();
-            info.init_offset_in_shared_log = rep->get_app()->init_info().init_offset_in_shared_log;
+ replica_log_info_map replica_durable_decrees;
+ for (auto &replica_gc : replica_gc_map) {
+ replica_log_info replica_log;
+ auto &rep = replica_gc.second.rep;
+ auto &plog = replica_gc.second.plog;
+ if (plog) {
+            // Flush private log to update `plog_max_commit_on_disk`, and just flush once
+ // to avoid flushing infinitely.
+ plog->flush_once();
+ auto plog_max_commit_on_disk = plog->max_commit_on_disk();
+
+ replica_log.max_decree =
+                std::min(replica_gc.second.last_durable_decree, plog_max_commit_on_disk);
+            LOG_INFO("gc_shared: gc condition for {}, status = {}, garbage_max_decree = {}, "
+ "last_durable_decree= {}, plog_max_commit_on_disk = {}",
+ rep->name(),
+ enum_to_string(replica_gc.second.status),
+ replica_log.max_decree,
+ replica_gc.second.last_durable_decree,
+ plog_max_commit_on_disk);
+ } else {
+ replica_log.max_decree = replica_gc.second.last_durable_decree;
+            LOG_INFO("gc_shared: gc condition for {}, status = {}, garbage_max_decree = {}, "
+ "last_durable_decree = {}",
+ rep->name(),
+ enum_to_string(replica_gc.second.status),
+ replica_log.max_decree,
+ replica_gc.second.last_durable_decree);
}
+        replica_log.valid_start_offset = replica_gc.second.init_offset_in_shared_log;
+ replica_durable_decrees[replica_gc.first] = replica_log;
}
- LOG_INFO("start to garbage collection, replica_count = {}", rs.size());
+ // Garbage collection for shared log files.
+ std::set<gpid> prevent_gc_replicas;
+ _log->garbage_collection(replica_durable_decrees, prevent_gc_replicas);
+
+    // Trigger checkpoint to flush memtables once some replicas were found that prevent
+ // slog files from being removed for gc.
+ flush_replicas_for_slog_gc(replica_gc_map, prevent_gc_replicas);
+
+ auto total_size = _log->total_size();
+ _counter_shared_log_size->set(total_size / (1024 * 1024));
- // gc shared prepare log
+    // TODO(wangdan): currently we could not yet call _log.reset() as below to close slog and
+    // reset it to nullptr even if it was found that slog had become empty (which means there
+ // had not been any file for slog).
+ // if (total_size == 0) {
+ // _log.reset();
+ // }
//
-    // Now that checkpoint is very important for gc, we must be able to trigger checkpoint when
- // necessary.
- // that is, we should be able to trigger memtable flush when necessary.
+    // The reason for this point is that on_gc() is scheduled by timer to run asynchronously
+    // during the initialization of replica_stub. It might happen before slog.on_partition_reset()
+    // (building slog._shared_log_info_map), which means slog would be closed mistakenly before
+ // it was initialized completely.
//
- // How to trigger memtable flush?
-    // we add a parameter `is_emergency' in dsn_app_async_checkpoint() function, when set true,
-    // the undering storage system should flush memtable as soon as possiable.
+    // All of slog files would be removed on v2.5; thus it is safe to remove all of slog code (which
+    // means even slog object would not be created) on the next version (namely 2.6), and this
+ // problem would also be resolved.
+}
+
+void replica_stub::limit_flush_replicas_for_slog_gc(size_t prevent_gc_replica_count)
+{
+    const size_t log_shared_gc_flush_replicas_limit = FLAGS_log_shared_gc_flush_replicas_limit;
+ if (log_shared_gc_flush_replicas_limit == 0) {
+ // 0 for log_shared_gc_flush_replicas_limit means no limit.
+        _real_log_shared_gc_flush_replicas_limit = std::numeric_limits<size_t>::max();
+ return;
+ }
+
+ if (_last_prevent_gc_replica_count == 0) {
+ // Initialize it for the 1st time.
+        _real_log_shared_gc_flush_replicas_limit = log_shared_gc_flush_replicas_limit;
+ return;
+ }
+
+ CHECK_GE(_last_prevent_gc_replica_count, prevent_gc_replica_count);
+    size_t flushed_replicas = _last_prevent_gc_replica_count - prevent_gc_replica_count;
+ if (flushed_replicas == 0) {
+ // It's too busy to process more flush tasks.
+ _real_log_shared_gc_flush_replicas_limit =
+ std::min(2UL, log_shared_gc_flush_replicas_limit);
+ return;
+ }
+
+ if (_real_log_shared_gc_flush_replicas_limit == 0 ||
+        _real_log_shared_gc_flush_replicas_limit == std::numeric_limits<size_t>::max()) {
+        // Once it was previously set with some special values, it should be reset.
+        _real_log_shared_gc_flush_replicas_limit = log_shared_gc_flush_replicas_limit;
+ return;
+ }
+
+ if (flushed_replicas < _real_log_shared_gc_flush_replicas_limit) {
+ // Keep it unchanged.
+ return;
+ }
+
+ // Increase it to process more flush tasks.
+ _real_log_shared_gc_flush_replicas_limit =
+        std::min(log_shared_gc_flush_replicas_limit, _real_log_shared_gc_flush_replicas_limit << 1);
+}
+
+void replica_stub::flush_replicas_for_slog_gc(const replica_gc_info_map &replica_gc_map,
+                                              const std::set<gpid> &prevent_gc_replicas)
+{
+    // Trigger checkpoints to flush memtables once some replicas were found that prevent slog files
+ // from being removed for gc.
//
- // When to trigger memtable flush?
-    // 1. Using `[replication].checkpoint_max_interval_hours' option, we can set max interval time
-    //    of two adjacent checkpoints; If the time interval is arrived, then emergency checkpoint
-    //    will be triggered.
-    // 2. Using `[replication].log_shared_file_count_limit' option, we can set max file count of
-    //    shared log; If the limit is exceeded, then emergency checkpoint will be triggered; Instead
-    //    of triggering all replicas to do checkpoint, we will only trigger a few of necessary
-    //    replicas which block garbage collection of the oldest log file.
+    // How to trigger memtable flush?
+    // A parameter `is_emergency' was added for `replica::background_async_checkpoint()` function;
+    // once it's set true, the underlying storage engine would flush memtable as soon as possible.
//
- if (_log != nullptr) {
- replica_log_info_map gc_condition;
- for (auto &kv : rs) {
- replica_log_info ri;
- replica_ptr &rep = kv.second.rep;
- mutation_log_ptr &plog = kv.second.plog;
- if (plog) {
- // flush private log to update plog_max_commit_on_disk,
- // and just flush once to avoid flushing infinitely
- plog->flush_once();
-
- decree plog_max_commit_on_disk = plog->max_commit_on_disk();
-                ri.max_decree = std::min(kv.second.last_durable_decree, plog_max_commit_on_disk);
-                LOG_INFO("gc_shared: gc condition for {}, status = {}, garbage_max_decree = {}, "
-                         "last_durable_decree= {}, plog_max_commit_on_disk = {}",
- rep->name(),
- enum_to_string(kv.second.status),
- ri.max_decree,
- kv.second.last_durable_decree,
- plog_max_commit_on_disk);
- } else {
- ri.max_decree = kv.second.last_durable_decree;
-                LOG_INFO("gc_shared: gc condition for {}, status = {}, garbage_max_decree = {}, "
- "last_durable_decree = {}",
- rep->name(),
- enum_to_string(kv.second.status),
- ri.max_decree,
- kv.second.last_durable_decree);
- }
- ri.valid_start_offset = kv.second.init_offset_in_shared_log;
- gc_condition[kv.first] = ri;
+    // When is memtable flush triggered?
+    // 1. After a fixed interval (specified by `[replication].gc_interval_ms` option), try to find
+    //    if there are some replicas preventing slog files from being removed for gc; if any, all of
+    //    them would be deleted "gradually" ("gradually" means the number of the replicas whose
+    //    memtables are submitted to storage engine to be flushed would be limited).
+    // 2. `[replication].checkpoint_max_interval_hours' option specifies the max interval between
+    //    the two adjacent checkpoints.
+
+ if (prevent_gc_replicas.empty()) {
+ return;
+ }
+
+ limit_flush_replicas_for_slog_gc(prevent_gc_replicas.size());
+ _last_prevent_gc_replica_count = prevent_gc_replicas.size();
+
+ std::ostringstream oss;
+ size_t i = 0;
+ for (const auto &pid : prevent_gc_replicas) {
+ if (i != 0) {
+ oss << ", ";
+ }
+ oss << pid.to_string();
+ ++i;
+ }
Review Comment:
How about using `fmt::join()` instead?
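For reference, a minimal stand-alone sketch of that suggestion: `fmt::join()` (from `<fmt/ranges.h>`) can replace the manual `std::ostringstream` loop with a single formatting call, provided the element type is formattable by fmt. The `fake_gpid` type and its `fmt::formatter` below are placeholders for illustration only; the real `dsn::gpid` in Pegasus would need its own formatter (or an existing one) for this to compile.

```cpp
#include <set>
#include <tuple>

#include <fmt/format.h>
#include <fmt/ranges.h>

// Placeholder standing in for the real dsn::gpid; illustration only.
struct fake_gpid
{
    int app_id;
    int partition_index;

    bool operator<(const fake_gpid &rhs) const
    {
        return std::tie(app_id, partition_index) < std::tie(rhs.app_id, rhs.partition_index);
    }
};

// Teach fmt how to print one element, so fmt::join() can format the whole set.
template <>
struct fmt::formatter<fake_gpid>
{
    constexpr auto parse(fmt::format_parse_context &ctx) { return ctx.begin(); }

    template <typename FormatContext>
    auto format(const fake_gpid &pid, FormatContext &ctx) const
    {
        return fmt::format_to(ctx.out(), "{}.{}", pid.app_id, pid.partition_index);
    }
};

int main()
{
    std::set<fake_gpid> prevent_gc_replicas{{1, 0}, {1, 3}, {2, 7}};

    // One call replaces the std::ostringstream loop and the manual ", " bookkeeping.
    fmt::print("slog gc is prevented by replicas: {}\n", fmt::join(prevent_gc_replicas, ", "));
    return 0;
}
```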