acelyc111 commented on code in PR #1594:
URL: 
https://github.com/apache/incubator-pegasus/pull/1594#discussion_r1330056816


##########
src/replica/replica_stub.cpp:
##########
@@ -1767,138 +1777,211 @@ void replica_stub::on_gc_replica(replica_stub_ptr 
this_, gpid id)
     }
 }
 
-void replica_stub::on_gc()
+void replica_stub::gc_slog(const replica_gc_info_map &replica_gc_map)
 {
-    uint64_t start = dsn_now_ns();
+    if (_log == nullptr) {
+        return;
+    }
 
-    struct gc_info
-    {
-        replica_ptr rep;
-        partition_status::type status;
-        mutation_log_ptr plog;
-        decree last_durable_decree;
-        int64_t init_offset_in_shared_log;
-    };
-
-    std::unordered_map<gpid, gc_info> rs;
-    {
-        zauto_read_lock l(_replicas_lock);
-        // collect info in lock to prevent the case that the replica is closed 
in replica::close()
-        for (auto &kv : _replicas) {
-            const replica_ptr &rep = kv.second;
-            gc_info &info = rs[kv.first];
-            info.rep = rep;
-            info.status = rep->status();
-            info.plog = rep->private_log();
-            info.last_durable_decree = rep->last_durable_decree();
-            info.init_offset_in_shared_log = 
rep->get_app()->init_info().init_offset_in_shared_log;
+    replica_log_info_map replica_durable_decrees;
+    for (auto &replica_gc : replica_gc_map) {
+        replica_log_info replica_log;
+        auto &rep = replica_gc.second.rep;
+        auto &plog = replica_gc.second.plog;
+        if (plog) {
+            // Flush private log to update `plog_max_commit_on_disk`, and just 
flush once
+            // to avoid flushing infinitely.
+            plog->flush_once();
+            auto plog_max_commit_on_disk = plog->max_commit_on_disk();
+
+            replica_log.max_decree =
+                std::min(replica_gc.second.last_durable_decree, 
plog_max_commit_on_disk);
+            LOG_INFO("gc_shared: gc condition for {}, status = {}, 
garbage_max_decree = {}, "
+                     "last_durable_decree= {}, plog_max_commit_on_disk = {}",
+                     rep->name(),
+                     enum_to_string(replica_gc.second.status),
+                     replica_log.max_decree,
+                     replica_gc.second.last_durable_decree,
+                     plog_max_commit_on_disk);
+        } else {
+            replica_log.max_decree = replica_gc.second.last_durable_decree;
+            LOG_INFO("gc_shared: gc condition for {}, status = {}, 
garbage_max_decree = {}, "
+                     "last_durable_decree = {}",
+                     rep->name(),
+                     enum_to_string(replica_gc.second.status),
+                     replica_log.max_decree,
+                     replica_gc.second.last_durable_decree);
         }
+        replica_log.valid_start_offset = 
replica_gc.second.init_offset_in_shared_log;
+        replica_durable_decrees[replica_gc.first] = replica_log;
     }
 
-    LOG_INFO("start to garbage collection, replica_count = {}", rs.size());
+    // Garbage collection for shared log files.
+    std::set<gpid> prevent_gc_replicas;
+    _log->garbage_collection(replica_durable_decrees, prevent_gc_replicas);
+
+    // Trigger checkpoint to flush memtables once some replicas were found 
that prevent
+    // slog files from being removed for gc.
+    flush_replicas_for_slog_gc(replica_gc_map, prevent_gc_replicas);
+
+    auto total_size = _log->total_size();
+    _counter_shared_log_size->set(total_size / (1024 * 1024));
 
-    // gc shared prepare log
+    // TODO(wangdan): currently we could not yet call _log.reset() as below to 
close slog and
+    // reset it to nullptr even if it was found that slog had become empty 
(which means there
+    // had not been any file for slog).
+    // if (total_size == 0) {
+    //     _log.reset();
+    // }
     //
-    // Now that checkpoint is very important for gc, we must be able to 
trigger checkpoint when
-    // necessary.
-    // that is, we should be able to trigger memtable flush when necessary.
+    // The reason for this point is that on_gc() is scheduled by timer to run 
asynchronously
+    // during the initialization of replica_stub. It might happen before 
slog.on_partition_reset()
+    // (building slog._shared_log_info_map), which means slog would be closed 
mistakenly before
+    // it was initialized completely.
     //
-    // How to trigger memtable flush?
-    //   we add a parameter `is_emergency' in dsn_app_async_checkpoint() 
function, when set true,
-    //   the undering storage system should flush memtable as soon as 
possiable.
+    // All of slog files would be removed on v2.5; thus it is safe to remove all 
of slog code (which
+    // means even slog object would not be created) on the next version 
(namely 2.6), and this
+    // problem would also be resolved.
+}
+
+void replica_stub::limit_flush_replicas_for_slog_gc(size_t 
prevent_gc_replica_count)
+{
+    const size_t log_shared_gc_flush_replicas_limit = 
FLAGS_log_shared_gc_flush_replicas_limit;
+    if (log_shared_gc_flush_replicas_limit == 0) {
+        // 0 for log_shared_gc_flush_replicas_limit means no limit.
+        _real_log_shared_gc_flush_replicas_limit = 
std::numeric_limits<size_t>::max();
+        return;
+    }
+
+    if (_last_prevent_gc_replica_count == 0) {
+        // Initialize it for the 1st time.
+        _real_log_shared_gc_flush_replicas_limit = 
log_shared_gc_flush_replicas_limit;
+        return;
+    }
+
+    CHECK_GE(_last_prevent_gc_replica_count, prevent_gc_replica_count);
+    size_t flushed_replicas = _last_prevent_gc_replica_count - 
prevent_gc_replica_count;
+    if (flushed_replicas == 0) {
+        // It's too busy to process more flush tasks.
+        _real_log_shared_gc_flush_replicas_limit =
+            std::min(2UL, log_shared_gc_flush_replicas_limit);
+        return;
+    }
+
+    if (_real_log_shared_gc_flush_replicas_limit == 0 ||
+        _real_log_shared_gc_flush_replicas_limit == 
std::numeric_limits<size_t>::max()) {
+        // Once it was previously set with some special values, it should be 
reset.
+        _real_log_shared_gc_flush_replicas_limit = 
log_shared_gc_flush_replicas_limit;
+        return;
+    }
+
+    if (flushed_replicas < _real_log_shared_gc_flush_replicas_limit) {
+        // Keep it unchanged.
+        return;
+    }
+
+    // Increase it to process more flush tasks.
+    _real_log_shared_gc_flush_replicas_limit =
+        std::min(log_shared_gc_flush_replicas_limit, 
_real_log_shared_gc_flush_replicas_limit << 1);
+}
+
+void replica_stub::flush_replicas_for_slog_gc(const replica_gc_info_map 
&replica_gc_map,
+                                              const std::set<gpid> 
&prevent_gc_replicas)
+{
+    // Trigger checkpoints to flush memtables once some replicas were found 
that prevent slog files
+    // from being removed for gc.
     //
-    // When to trigger memtable flush?
-    //   1. Using `[replication].checkpoint_max_interval_hours' option, we can 
set max interval time
-    //   of two adjacent checkpoints; If the time interval is arrived, then 
emergency checkpoint
-    //   will be triggered.
-    //   2. Using `[replication].log_shared_file_count_limit' option, we can 
set max file count of
-    //   shared log; If the limit is exceeded, then emergency checkpoint will 
be triggered; Instead
-    //   of triggering all replicas to do checkpoint, we will only trigger a 
few of necessary
-    //   replicas which block garbage collection of the oldest log file.
+    // How to trigger memtable flush ?
+    //   A parameter `is_emergency' was added for 
`replica::background_async_checkpoint()` function;
+    //   once it's set true, underlying storage engine would flush memtable as 
soon as possible.
     //
-    if (_log != nullptr) {
-        replica_log_info_map gc_condition;
-        for (auto &kv : rs) {
-            replica_log_info ri;
-            replica_ptr &rep = kv.second.rep;
-            mutation_log_ptr &plog = kv.second.plog;
-            if (plog) {
-                // flush private log to update plog_max_commit_on_disk,
-                // and just flush once to avoid flushing infinitely
-                plog->flush_once();
-
-                decree plog_max_commit_on_disk = plog->max_commit_on_disk();
-                ri.max_decree = std::min(kv.second.last_durable_decree, 
plog_max_commit_on_disk);
-                LOG_INFO("gc_shared: gc condition for {}, status = {}, 
garbage_max_decree = {}, "
-                         "last_durable_decree= {}, plog_max_commit_on_disk = 
{}",
-                         rep->name(),
-                         enum_to_string(kv.second.status),
-                         ri.max_decree,
-                         kv.second.last_durable_decree,
-                         plog_max_commit_on_disk);
-            } else {
-                ri.max_decree = kv.second.last_durable_decree;
-                LOG_INFO("gc_shared: gc condition for {}, status = {}, 
garbage_max_decree = {}, "
-                         "last_durable_decree = {}",
-                         rep->name(),
-                         enum_to_string(kv.second.status),
-                         ri.max_decree,
-                         kv.second.last_durable_decree);
-            }
-            ri.valid_start_offset = kv.second.init_offset_in_shared_log;
-            gc_condition[kv.first] = ri;
+    // When memtable flush is triggered ?
+    //   1. After a fixed interval (specified by 
`[replication].gc_interval_ms` option), try to find
+    //   if there are some replicas preventing slog files from being removed 
for gc; if any, all of
+    //   them would be deleted "gradually" ("gradually" means the number of 
the replicas whose
+    //   memtables are submitted to storage engine to be flushed would be 
limited).
+    //   2. `[replication].checkpoint_max_interval_hours' option specified the 
max interval between
+    //   the two adjacent checkpoints.
+
+    if (prevent_gc_replicas.empty()) {
+        return;
+    }
+
+    limit_flush_replicas_for_slog_gc(prevent_gc_replicas.size());
+    _last_prevent_gc_replica_count = prevent_gc_replicas.size();
+
+    std::ostringstream oss;
+    size_t i = 0;
+    for (const auto &pid : prevent_gc_replicas) {
+        if (i != 0) {
+            oss << ", ";
+        }
+        oss << pid.to_string();
+        ++i;
+    }

Review Comment:
   How about using `fmt::join()` instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to