empiredan commented on code in PR #1759:
URL:
https://github.com/apache/incubator-pegasus/pull/1759#discussion_r1426152673
##########
src/replica/replica_stub.cpp:
##########
@@ -1568,175 +1483,6 @@ void replica_stub::on_gc_replica(replica_stub_ptr
this_, gpid id)
}
}
-void replica_stub::gc_slog(const replica_gc_info_map &replica_gc_map)
-{
- if (_log == nullptr) {
- return;
- }
-
- replica_log_info_map replica_durable_decrees;
- for (auto &replica_gc : replica_gc_map) {
- replica_log_info replica_log;
- auto &rep = replica_gc.second.rep;
- auto &plog = replica_gc.second.plog;
- if (plog) {
- // Flush private log to update `plog_max_commit_on_disk`, and just
flush once
- // to avoid flushing infinitely.
- plog->flush_once();
- auto plog_max_commit_on_disk = plog->max_commit_on_disk();
-
- replica_log.max_decree =
- std::min(replica_gc.second.last_durable_decree,
plog_max_commit_on_disk);
- LOG_INFO("gc_shared: gc condition for {}, status = {},
garbage_max_decree = {}, "
- "last_durable_decree= {}, plog_max_commit_on_disk = {}",
- rep->name(),
- enum_to_string(replica_gc.second.status),
- replica_log.max_decree,
- replica_gc.second.last_durable_decree,
- plog_max_commit_on_disk);
- } else {
- replica_log.max_decree = replica_gc.second.last_durable_decree;
- LOG_INFO("gc_shared: gc condition for {}, status = {},
garbage_max_decree = {}, "
- "last_durable_decree = {}",
- rep->name(),
- enum_to_string(replica_gc.second.status),
- replica_log.max_decree,
- replica_gc.second.last_durable_decree);
- }
- replica_log.valid_start_offset =
replica_gc.second.init_offset_in_shared_log;
- replica_durable_decrees[replica_gc.first] = replica_log;
- }
-
- // Garbage collection for shared log files.
- std::set<gpid> prevent_gc_replicas;
- _log->garbage_collection(replica_durable_decrees, prevent_gc_replicas);
-
- // Trigger checkpoint to flush memtables once some replicas were found
that prevent
- // slog files from being removed for gc.
- flush_replicas_for_slog_gc(replica_gc_map, prevent_gc_replicas);
-
- // TODO(wangdan): currently we could not yet call _log.reset() as below to
close slog and
- // reset it to nullptr even if it was found that slog had become empty
(which means there
- // had not been any file for slog).
- // if (total_size == 0) {
- // _log.reset();
- // }
- //
- // The reason for this point is that on_gc() is scheduled by timer to run
asynchronously
- // during the initialization of replica_stub. It might happen before
slog.on_partition_reset()
- // (building slog._shared_log_info_map), which means slog would be closed
mistakenly before
- // it was initialized completely.
- //
- // All of slog files would removed on v2.5; thus it is safe to remove all
of slog code (which
- // means even slog object would not be created) on the next version
(namely 2.6), and this
- // problem would also be resolved.
-}
-
-void replica_stub::limit_flush_replicas_for_slog_gc(size_t
prevent_gc_replica_count)
-{
- const size_t log_shared_gc_flush_replicas_limit =
FLAGS_log_shared_gc_flush_replicas_limit;
- if (log_shared_gc_flush_replicas_limit == 0) {
- // 0 for log_shared_gc_flush_replicas_limit means no limit.
- _real_log_shared_gc_flush_replicas_limit =
std::numeric_limits<size_t>::max();
- return;
- }
-
- if (_last_prevent_gc_replica_count == 0) {
- // Initialize it for the 1st time.
- _real_log_shared_gc_flush_replicas_limit =
log_shared_gc_flush_replicas_limit;
- return;
- }
-
- CHECK_GE(_last_prevent_gc_replica_count, prevent_gc_replica_count);
- size_t flushed_replicas = _last_prevent_gc_replica_count -
prevent_gc_replica_count;
- if (flushed_replicas == 0) {
- // It's too busy to process more flush tasks.
- _real_log_shared_gc_flush_replicas_limit =
- std::min(2UL, log_shared_gc_flush_replicas_limit);
- return;
- }
-
- if (_real_log_shared_gc_flush_replicas_limit == 0 ||
- _real_log_shared_gc_flush_replicas_limit ==
std::numeric_limits<size_t>::max()) {
- // Once it was previously set with some special values, it should be
reset.
- _real_log_shared_gc_flush_replicas_limit =
log_shared_gc_flush_replicas_limit;
- return;
- }
-
- if (flushed_replicas < _real_log_shared_gc_flush_replicas_limit) {
- // Keep it unchanged.
- return;
- }
-
- // Increase it to process more flush tasks.
- _real_log_shared_gc_flush_replicas_limit =
- std::min(log_shared_gc_flush_replicas_limit,
_real_log_shared_gc_flush_replicas_limit << 1);
-}
-
-void replica_stub::flush_replicas_for_slog_gc(const replica_gc_info_map
&replica_gc_map,
- const std::set<gpid>
&prevent_gc_replicas)
-{
- // Trigger checkpoints to flush memtables once some replicas were found
that prevent slog files
- // from being removed for gc.
- //
- // How to trigger memtable flush ?
- // A parameter `is_emergency' was added for
`replica::background_async_checkpoint()` function;
- // once it's set true, underlying storage engine would flush memtable as
soon as possiable.
- //
- // When memtable flush is triggered ?
- // 1. After a fixed interval (specified by
`[replication].gc_interval_ms` option), try to find
- // if there are some replicas preventing slog files from being removed
for gc; if any, all of
- // them would be deleted "gradually" ("gradually" means the number of
the replicas whose
- // memtables are submitted to storage engine to be flushed would be
limited).
- // 2. `[replication].checkpoint_max_interval_hours' option specified the
max interval between
- // the two adjacent checkpoints.
-
- if (prevent_gc_replicas.empty()) {
- return;
- }
-
- limit_flush_replicas_for_slog_gc(prevent_gc_replicas.size());
- _last_prevent_gc_replica_count = prevent_gc_replicas.size();
-
- LOG_INFO("gc_shared: trigger emergency checkpoints to flush replicas for
gc shared logs: "
- "log_shared_gc_flush_replicas_limit = {}/{},
prevent_gc_replicas({}) = {}",
- _real_log_shared_gc_flush_replicas_limit,
- FLAGS_log_shared_gc_flush_replicas_limit,
- prevent_gc_replicas.size(),
- fmt::join(prevent_gc_replicas, ", "));
-
- size_t i = 0;
- for (const auto &pid : prevent_gc_replicas) {
- const auto &replica_gc = replica_gc_map.find(pid);
- if (replica_gc == replica_gc_map.end()) {
- continue;
- }
-
- if (++i > _real_log_shared_gc_flush_replicas_limit) {
- break;
- }
-
- bool mock_flush = false;
- FAIL_POINT_INJECT_NOT_RETURN_F(
- "mock_flush_replicas_for_slog_gc", [&mock_flush, this,
i](absl::string_view str) {
- CHECK(buf2bool(str, mock_flush),
- "invalid mock_flush_replicas_for_slog_gc toggle, should
be true or false: {}",
- str);
- _mock_flush_replicas_for_test = i;
- });
- if (dsn_unlikely(mock_flush)) {
- continue;
- }
-
- tasking::enqueue(
- LPC_PER_REPLICA_CHECKPOINT_TIMER,
- replica_gc->second.rep->tracker(),
- std::bind(&replica_stub::trigger_checkpoint, this,
replica_gc->second.rep, true),
- pid.thread_hash(),
- std::chrono::milliseconds(rand::next_u32(0, FLAGS_gc_interval_ms /
2)));
- }
-}
-
void replica_stub::on_gc()
Review Comment:
There are no GC actions in this function; should its name be changed to
`collect_statistics` or something else?
##########
src/replica/replica_stub.cpp:
##########
@@ -1754,13 +1500,10 @@ void replica_stub::on_gc()
replica_gc.status = rep->status();
replica_gc.plog = rep->private_log();
replica_gc.last_durable_decree = rep->last_durable_decree();
- replica_gc.init_offset_in_shared_log =
- rep->get_app()->init_info().init_offset_in_shared_log;
}
}
LOG_INFO("start to garbage collection, replica_count = {}",
replica_gc_map.size());
Review Comment:
Now that garbage collection is being removed, should this logging message
be changed to something like "start to collect statistics", or simply
removed?
There is also a matching message at the end of the function:
```
LOG_INFO("finish to garbage collection, time_used_ns = {}", dsn_now_ns()
- start);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]