This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 371a5757c feat(plog): garbage collection is allowed to be enabled/disabled for plog (#1996)
371a5757c is described below
commit 371a5757c1deeb2b7cf8b8befd780e71499f7b8e
Author: Dan Wang <[email protected]>
AuthorDate: Sat May 11 17:38:00 2024 +0800
feat(plog): garbage collection is allowed to be enabled/disabled for plog (#1996)
Sometimes we have to disable plog garbage collection. For example, to duplicate
data to the target cluster, we could first disable plog garbage collection,
then do copy_data. After copy_data is finished, a duplication with DS_LOG status
could be added to continue duplicating the data in plog to the target cluster;
at that point, plog garbage collection should certainly be enabled again.
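Therefore, a new configuration option is added to enable/disable plog garbage
collection:
```diff
[replication]
+ plog_gc_enabled = true
```
It can also be toggled or queried at runtime, for all replicas or for those
specified by 'app_id' or 'app_id.partition_id', via the new remote commands
`replica.enable-plog-gc`, `replica.disable-plog-gc` and
`replica.query-plog-gc-enabled-status`.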
---
src/replica/replica.cpp | 2 +
src/replica/replica.h | 12 ++++
src/replica/replica_chkpt.cpp | 126 ++++++++++++++++++++++-------------------
src/replica/replica_config.cpp | 19 ++++++-
src/replica/replica_stub.cpp | 37 +++++++++++-
5 files changed, 136 insertions(+), 60 deletions(-)
diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp
index c9f2e2811..5bb1f17b8 100644
--- a/src/replica/replica.cpp
+++ b/src/replica/replica.cpp
@@ -321,6 +321,8 @@ replica::replica(replica_stub *stub,
METRIC_VAR_INIT_replica(backup_file_upload_successful_count),
METRIC_VAR_INIT_replica(backup_file_upload_total_bytes)
{
+ init_plog_gc_enabled();
+
CHECK(!_app_info.app_type.empty(), "");
CHECK_NOTNULL(stub, "");
_stub = stub;
diff --git a/src/replica/replica.h b/src/replica/replica.h
index f367ab35a..3b90641cd 100644
--- a/src/replica/replica.h
+++ b/src/replica/replica.h
@@ -437,6 +437,16 @@ private:
std::shared_ptr<learn_response>
resp,
const std::string &chk_dir);
+ // Enable/Disable plog garbage collection to be executed. For example, to
duplicate data
+ // to target cluster, we could firstly disable plog garbage collection,
then do copy_data.
+ // After copy_data is finished, a duplication with DS_LOG status could be
added to continue
+ // to duplicate data in plog to target cluster; at the same time, plog
garbage collection
+ // certainly should be enabled again.
+ void init_plog_gc_enabled();
+ void update_plog_gc_enabled(bool enabled);
+ bool is_plog_gc_enabled() const;
+ std::string get_plog_gc_enabled_message() const;
+
/////////////////////////////////////////////////////////////////
// cold backup
virtual void generate_backup_checkpoint(cold_backup_context_ptr
backup_context);
@@ -574,6 +584,8 @@ private:
// local checkpoint timer for gc, checkpoint, etc.
dsn::task_ptr _checkpoint_timer;
+ std::atomic<bool> _plog_gc_enabled{true};
+
// application
std::unique_ptr<replication_app_base> _app;
diff --git a/src/replica/replica_chkpt.cpp b/src/replica/replica_chkpt.cpp
index 27346aa17..0145dcab0 100644
--- a/src/replica/replica_chkpt.cpp
+++ b/src/replica/replica_chkpt.cpp
@@ -112,66 +112,78 @@ void replica::on_checkpoint_timer()
init_checkpoint(false);
}
- if (_private_log) {
- mutation_log_ptr plog = _private_log;
-
- decree last_durable_decree = _app->last_durable_decree();
- decree min_confirmed_decree = _duplication_mgr->min_confirmed_decree();
- decree cleanable_decree = last_durable_decree;
- int64_t valid_start_offset =
_app->init_info().init_offset_in_private_log;
-
- if (min_confirmed_decree >= 0) {
- // Do not rely on valid_start_offset for GC during duplication.
- // cleanable_decree is the only GC trigger.
- valid_start_offset = 0;
- if (min_confirmed_decree < last_durable_decree) {
- LOG_INFO_PREFIX("gc_private {}: delay gc for duplication:
min_confirmed_decree({}) "
- "last_durable_decree({})",
- enum_to_string(status()),
- min_confirmed_decree,
- last_durable_decree);
- cleanable_decree = min_confirmed_decree;
- } else {
- LOG_INFO_PREFIX("gc_private {}: min_confirmed_decree({})
last_durable_decree({})",
- enum_to_string(status()),
- min_confirmed_decree,
- last_durable_decree);
- }
- } else if (is_duplication_master()) {
- // unsure if the logs can be dropped, because min_confirmed_decree
- // is currently unavailable
- LOG_INFO_PREFIX(
- "gc_private {}: skip gc because confirmed duplication progress
is unknown",
- enum_to_string(status()));
- return;
- }
+ if (_private_log == nullptr) {
+ return;
+ }
- if (is_duplication_plog_checking()) {
- LOG_DEBUG_PREFIX("gc_private {}: skip gc because duplication is
checking plog files",
- enum_to_string(status()));
- return;
- }
+ if (!is_plog_gc_enabled()) {
+ LOG_WARNING_PREFIX("gc_private {}: skip gc because plog gc is
disabled",
+ enum_to_string(status()));
+ return;
+ }
- tasking::enqueue(LPC_GARBAGE_COLLECT_LOGS_AND_REPLICAS,
- &_tracker,
- [this, plog, cleanable_decree, valid_start_offset] {
- // run in background thread to avoid file
deletion operation blocking
- // replication thread.
- if (status() == partition_status::PS_ERROR ||
- status() == partition_status::PS_INACTIVE)
- return;
- plog->garbage_collection(
- get_gpid(),
- cleanable_decree,
- valid_start_offset,
-
(int64_t)FLAGS_log_private_reserve_max_size_mb * 1024 * 1024,
-
(int64_t)FLAGS_log_private_reserve_max_time_seconds);
- if (status() == partition_status::PS_PRIMARY) {
- METRIC_VAR_SET(private_log_size_mb,
- _private_log->total_size() >>
20);
- }
- });
+ if (is_duplication_plog_checking()) {
+ LOG_INFO_PREFIX("gc_private {}: skip gc because duplication is
checking plog files",
+ enum_to_string(status()));
+ return;
}
+
+ mutation_log_ptr plog = _private_log;
+
+ decree last_durable_decree = _app->last_durable_decree();
+ decree min_confirmed_decree = _duplication_mgr->min_confirmed_decree();
+ decree cleanable_decree = last_durable_decree;
+ int64_t valid_start_offset = _app->init_info().init_offset_in_private_log;
+
+ if (min_confirmed_decree < 0 && is_duplication_master()) {
+ // Not sure whether the plog files could be dropped, because
min_confirmed_decree
+ // is currently unavailable.
+ LOG_INFO_PREFIX("gc_private {}: skip gc because confirmed duplication
progress "
+ "is unknown",
+ enum_to_string(status()));
+ return;
+ }
+
+ if (min_confirmed_decree >= 0) {
+ // Do not rely on valid_start_offset for GC during duplication.
+ // cleanable_decree is the only GC trigger.
+ valid_start_offset = 0;
+ if (min_confirmed_decree < last_durable_decree) {
+ LOG_INFO_PREFIX("gc_private {}: delay gc for duplication:
min_confirmed_decree({}) "
+ "last_durable_decree({})",
+ enum_to_string(status()),
+ min_confirmed_decree,
+ last_durable_decree);
+ cleanable_decree = min_confirmed_decree;
+ } else {
+ LOG_INFO_PREFIX("gc_private {}: min_confirmed_decree({})
last_durable_decree({})",
+ enum_to_string(status()),
+ min_confirmed_decree,
+ last_durable_decree);
+ }
+ }
+
+ tasking::enqueue(LPC_GARBAGE_COLLECT_LOGS_AND_REPLICAS,
+ &_tracker,
+ [this, plog, cleanable_decree, valid_start_offset] {
+ // run in background thread to avoid file deletion
operation blocking
+ // replication thread.
+ if (status() == partition_status::PS_ERROR ||
+ status() == partition_status::PS_INACTIVE) {
+
+ return;
+ }
+
+ plog->garbage_collection(
+ get_gpid(),
+ cleanable_decree,
+ valid_start_offset,
+ (int64_t)FLAGS_log_private_reserve_max_size_mb *
1024 * 1024,
+
(int64_t)FLAGS_log_private_reserve_max_time_seconds);
+ if (status() == partition_status::PS_PRIMARY) {
+ METRIC_VAR_SET(private_log_size_mb,
_private_log->total_size() >> 20);
+ }
+ });
}
// ThreadPool: THREAD_POOL_REPLICATION
diff --git a/src/replica/replica_config.cpp b/src/replica/replica_config.cpp
index ca60af2c4..d6e97ed99 100644
--- a/src/replica/replica_config.cpp
+++ b/src/replica/replica_config.cpp
@@ -29,6 +29,7 @@
#include <fmt/format.h>
#include <stdint.h>
#include <algorithm>
+#include <atomic>
#include <chrono>
#include <ios>
#include <map>
@@ -39,6 +40,7 @@
#include <utility>
#include <vector>
+#include "absl/strings/string_view.h"
#include "bulk_load/replica_bulk_loader.h"
#include "common/gpid.h"
#include "common/replica_envs.h"
@@ -63,19 +65,21 @@
#include "runtime/rpc/rpc_host_port.h"
#include "runtime/rpc/rpc_message.h"
#include "runtime/rpc/serialization.h"
-#include "security/access_controller.h"
#include "runtime/task/async_calls.h"
#include "runtime/task/task.h"
+#include "security/access_controller.h"
#include "split/replica_split_manager.h"
#include "utils/autoref_ptr.h"
#include "utils/error_code.h"
#include "utils/fail_point.h"
+#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/string_conv.h"
-#include "absl/strings/string_view.h"
#include "utils/strings.h"
#include "utils/thread_access_checker.h"
+DSN_DEFINE_bool(replication, plog_gc_enabled, true, "Whether to enable plog
garbage collection.");
+
/// The configuration management part of replica.
namespace dsn {
@@ -1202,5 +1206,16 @@ void replica::update_app_duplication_status(bool
duplicating)
_app_info.duplicating);
}
+void replica::init_plog_gc_enabled() {
_plog_gc_enabled.store(FLAGS_plog_gc_enabled); }
+
+void replica::update_plog_gc_enabled(bool enabled) {
_plog_gc_enabled.store(enabled); }
+
+bool replica::is_plog_gc_enabled() const { return _plog_gc_enabled.load(); }
+
+std::string replica::get_plog_gc_enabled_message() const
+{
+ return is_plog_gc_enabled() ? "enabled" : "disabled";
+}
+
} // namespace replication
} // namespace dsn
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index 286364ac5..3c845a2b9 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -2299,6 +2299,41 @@ void replica_stub::register_ctrl_command()
});
}));
+
_cmds.emplace_back(::dsn::command_manager::instance().register_single_command(
+ "replica.enable-plog-gc",
+ "Enable plog garbage collection for replicas specified by
comma-separated list "
+ "of 'app_id' or 'app_id.partition_id', or all replicas for empty",
+ "[id1,id2,...]",
+ [this](const std::vector<std::string> &args) {
+ return exec_command_on_replica(args, true, [](const
replica_ptr &rep) {
+ rep->update_plog_gc_enabled(true);
+ return rep->get_plog_gc_enabled_message();
+ });
+ }));
+
+
_cmds.emplace_back(::dsn::command_manager::instance().register_single_command(
+ "replica.disable-plog-gc",
+ "Disable plog garbage collection for replicas specified by
comma-separated list "
+ "of 'app_id' or 'app_id.partition_id', or all replicas for empty",
+ "[id1,id2,...]",
+ [this](const std::vector<std::string> &args) {
+ return exec_command_on_replica(args, true, [](const
replica_ptr &rep) {
+ rep->update_plog_gc_enabled(false);
+ return rep->get_plog_gc_enabled_message();
+ });
+ }));
+
+
_cmds.emplace_back(::dsn::command_manager::instance().register_single_command(
+ "replica.query-plog-gc-enabled-status",
+ "Query if plog garbage collection is enabled or disabled for
replicas specified by "
+ "comma-separated list of 'app_id' or 'app_id.partition_id', or all
replicas for empty",
+ "[id1,id2,...]",
+ [this](const std::vector<std::string> &args) {
+ return exec_command_on_replica(args, true, [](const
replica_ptr &rep) {
+ return rep->get_plog_gc_enabled_message();
+ });
+ }));
+
#ifdef DSN_ENABLE_GPERF
_cmds.emplace_back(::dsn::command_manager::instance().register_bool_command(
_release_tcmalloc_memory,
@@ -2346,7 +2381,7 @@ replica_stub::exec_command_on_replica(const
std::vector<std::string> &args,
bool allow_empty_args,
std::function<std::string(const
replica_ptr &rep)> func)
{
- if (!allow_empty_args && args.empty()) {
+ if (args.empty() && !allow_empty_args) {
return std::string("invalid arguments");
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]