This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 371a5757c feat(plog): garbage collection is allowed to be enabled/disabled for plog (#1996)
371a5757c is described below

commit 371a5757c1deeb2b7cf8b8befd780e71499f7b8e
Author: Dan Wang <[email protected]>
AuthorDate: Sat May 11 17:38:00 2024 +0800

    feat(plog): garbage collection is allowed to be enabled/disabled for plog (#1996)
    
    Sometimes we have to disable plog garbage collection. For example, to duplicate
    data to the target cluster, we could first disable plog garbage collection, then
    run copy_data. After copy_data is finished, a duplication with DS_LOG status could
    be added to continue duplicating the data in plog to the target cluster; at the
    same time, plog garbage collection should be enabled again.
    
    Therefore, a new configuration option is added to enable/disable plog garbage collection:
    
    ```diff
    [replication]
    + plog_gc_enabled = true
    ```
---
 src/replica/replica.cpp        |   2 +
 src/replica/replica.h          |  12 ++++
 src/replica/replica_chkpt.cpp  | 126 ++++++++++++++++++++++-------------------
 src/replica/replica_config.cpp |  19 ++++++-
 src/replica/replica_stub.cpp   |  37 +++++++++++-
 5 files changed, 136 insertions(+), 60 deletions(-)

diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp
index c9f2e2811..5bb1f17b8 100644
--- a/src/replica/replica.cpp
+++ b/src/replica/replica.cpp
@@ -321,6 +321,8 @@ replica::replica(replica_stub *stub,
       METRIC_VAR_INIT_replica(backup_file_upload_successful_count),
       METRIC_VAR_INIT_replica(backup_file_upload_total_bytes)
 {
+    init_plog_gc_enabled();
+
     CHECK(!_app_info.app_type.empty(), "");
     CHECK_NOTNULL(stub, "");
     _stub = stub;
diff --git a/src/replica/replica.h b/src/replica/replica.h
index f367ab35a..3b90641cd 100644
--- a/src/replica/replica.h
+++ b/src/replica/replica.h
@@ -437,6 +437,16 @@ private:
                                            std::shared_ptr<learn_response> 
resp,
                                            const std::string &chk_dir);
 
+    // Enable/Disable plog garbage collection to be executed. For example, to 
duplicate data
+    // to target cluster, we could firstly disable plog garbage collection, 
then do copy_data.
+    // After copy_data is finished, a duplication with DS_LOG status could be 
added to continue
+    // to duplicate data in plog to target cluster; at the same time, plog 
garbage collection
+    // certainly should be enabled again.
+    void init_plog_gc_enabled();
+    void update_plog_gc_enabled(bool enabled);
+    bool is_plog_gc_enabled() const;
+    std::string get_plog_gc_enabled_message() const;
+
     /////////////////////////////////////////////////////////////////
     // cold backup
     virtual void generate_backup_checkpoint(cold_backup_context_ptr 
backup_context);
@@ -574,6 +584,8 @@ private:
     // local checkpoint timer for gc, checkpoint, etc.
     dsn::task_ptr _checkpoint_timer;
 
+    std::atomic<bool> _plog_gc_enabled{true};
+
     // application
     std::unique_ptr<replication_app_base> _app;
 
diff --git a/src/replica/replica_chkpt.cpp b/src/replica/replica_chkpt.cpp
index 27346aa17..0145dcab0 100644
--- a/src/replica/replica_chkpt.cpp
+++ b/src/replica/replica_chkpt.cpp
@@ -112,66 +112,78 @@ void replica::on_checkpoint_timer()
         init_checkpoint(false);
     }
 
-    if (_private_log) {
-        mutation_log_ptr plog = _private_log;
-
-        decree last_durable_decree = _app->last_durable_decree();
-        decree min_confirmed_decree = _duplication_mgr->min_confirmed_decree();
-        decree cleanable_decree = last_durable_decree;
-        int64_t valid_start_offset = 
_app->init_info().init_offset_in_private_log;
-
-        if (min_confirmed_decree >= 0) {
-            // Do not rely on valid_start_offset for GC during duplication.
-            // cleanable_decree is the only GC trigger.
-            valid_start_offset = 0;
-            if (min_confirmed_decree < last_durable_decree) {
-                LOG_INFO_PREFIX("gc_private {}: delay gc for duplication: 
min_confirmed_decree({}) "
-                                "last_durable_decree({})",
-                                enum_to_string(status()),
-                                min_confirmed_decree,
-                                last_durable_decree);
-                cleanable_decree = min_confirmed_decree;
-            } else {
-                LOG_INFO_PREFIX("gc_private {}: min_confirmed_decree({}) 
last_durable_decree({})",
-                                enum_to_string(status()),
-                                min_confirmed_decree,
-                                last_durable_decree);
-            }
-        } else if (is_duplication_master()) {
-            // unsure if the logs can be dropped, because min_confirmed_decree
-            // is currently unavailable
-            LOG_INFO_PREFIX(
-                "gc_private {}: skip gc because confirmed duplication progress 
is unknown",
-                enum_to_string(status()));
-            return;
-        }
+    if (_private_log == nullptr) {
+        return;
+    }
 
-        if (is_duplication_plog_checking()) {
-            LOG_DEBUG_PREFIX("gc_private {}: skip gc because duplication is 
checking plog files",
-                             enum_to_string(status()));
-            return;
-        }
+    if (!is_plog_gc_enabled()) {
+        LOG_WARNING_PREFIX("gc_private {}: skip gc because plog gc is 
disabled",
+                           enum_to_string(status()));
+        return;
+    }
 
-        tasking::enqueue(LPC_GARBAGE_COLLECT_LOGS_AND_REPLICAS,
-                         &_tracker,
-                         [this, plog, cleanable_decree, valid_start_offset] {
-                             // run in background thread to avoid file 
deletion operation blocking
-                             // replication thread.
-                             if (status() == partition_status::PS_ERROR ||
-                                 status() == partition_status::PS_INACTIVE)
-                                 return;
-                             plog->garbage_collection(
-                                 get_gpid(),
-                                 cleanable_decree,
-                                 valid_start_offset,
-                                 
(int64_t)FLAGS_log_private_reserve_max_size_mb * 1024 * 1024,
-                                 
(int64_t)FLAGS_log_private_reserve_max_time_seconds);
-                             if (status() == partition_status::PS_PRIMARY) {
-                                 METRIC_VAR_SET(private_log_size_mb,
-                                                _private_log->total_size() >> 
20);
-                             }
-                         });
+    if (is_duplication_plog_checking()) {
+        LOG_INFO_PREFIX("gc_private {}: skip gc because duplication is 
checking plog files",
+                        enum_to_string(status()));
+        return;
     }
+
+    mutation_log_ptr plog = _private_log;
+
+    decree last_durable_decree = _app->last_durable_decree();
+    decree min_confirmed_decree = _duplication_mgr->min_confirmed_decree();
+    decree cleanable_decree = last_durable_decree;
+    int64_t valid_start_offset = _app->init_info().init_offset_in_private_log;
+
+    if (min_confirmed_decree < 0 && is_duplication_master()) {
+        // Not sure whether the plog files could be dropped, because 
min_confirmed_decree
+        // is currently unavailable.
+        LOG_INFO_PREFIX("gc_private {}: skip gc because confirmed duplication 
progress "
+                        "is unknown",
+                        enum_to_string(status()));
+        return;
+    }
+
+    if (min_confirmed_decree >= 0) {
+        // Do not rely on valid_start_offset for GC during duplication.
+        // cleanable_decree is the only GC trigger.
+        valid_start_offset = 0;
+        if (min_confirmed_decree < last_durable_decree) {
+            LOG_INFO_PREFIX("gc_private {}: delay gc for duplication: 
min_confirmed_decree({}) "
+                            "last_durable_decree({})",
+                            enum_to_string(status()),
+                            min_confirmed_decree,
+                            last_durable_decree);
+            cleanable_decree = min_confirmed_decree;
+        } else {
+            LOG_INFO_PREFIX("gc_private {}: min_confirmed_decree({}) 
last_durable_decree({})",
+                            enum_to_string(status()),
+                            min_confirmed_decree,
+                            last_durable_decree);
+        }
+    }
+
+    tasking::enqueue(LPC_GARBAGE_COLLECT_LOGS_AND_REPLICAS,
+                     &_tracker,
+                     [this, plog, cleanable_decree, valid_start_offset] {
+                         // run in background thread to avoid file deletion 
operation blocking
+                         // replication thread.
+                         if (status() == partition_status::PS_ERROR ||
+                             status() == partition_status::PS_INACTIVE) {
+
+                             return;
+                         }
+
+                         plog->garbage_collection(
+                             get_gpid(),
+                             cleanable_decree,
+                             valid_start_offset,
+                             (int64_t)FLAGS_log_private_reserve_max_size_mb * 
1024 * 1024,
+                             
(int64_t)FLAGS_log_private_reserve_max_time_seconds);
+                         if (status() == partition_status::PS_PRIMARY) {
+                             METRIC_VAR_SET(private_log_size_mb, 
_private_log->total_size() >> 20);
+                         }
+                     });
 }
 
 // ThreadPool: THREAD_POOL_REPLICATION
diff --git a/src/replica/replica_config.cpp b/src/replica/replica_config.cpp
index ca60af2c4..d6e97ed99 100644
--- a/src/replica/replica_config.cpp
+++ b/src/replica/replica_config.cpp
@@ -29,6 +29,7 @@
 #include <fmt/format.h>
 #include <stdint.h>
 #include <algorithm>
+#include <atomic>
 #include <chrono>
 #include <ios>
 #include <map>
@@ -39,6 +40,7 @@
 #include <utility>
 #include <vector>
 
+#include "absl/strings/string_view.h"
 #include "bulk_load/replica_bulk_loader.h"
 #include "common/gpid.h"
 #include "common/replica_envs.h"
@@ -63,19 +65,21 @@
 #include "runtime/rpc/rpc_host_port.h"
 #include "runtime/rpc/rpc_message.h"
 #include "runtime/rpc/serialization.h"
-#include "security/access_controller.h"
 #include "runtime/task/async_calls.h"
 #include "runtime/task/task.h"
+#include "security/access_controller.h"
 #include "split/replica_split_manager.h"
 #include "utils/autoref_ptr.h"
 #include "utils/error_code.h"
 #include "utils/fail_point.h"
+#include "utils/flags.h"
 #include "utils/fmt_logging.h"
 #include "utils/string_conv.h"
-#include "absl/strings/string_view.h"
 #include "utils/strings.h"
 #include "utils/thread_access_checker.h"
 
+DSN_DEFINE_bool(replication, plog_gc_enabled, true, "Whether to enable plog 
garbage collection.");
+
 /// The configuration management part of replica.
 
 namespace dsn {
@@ -1202,5 +1206,16 @@ void replica::update_app_duplication_status(bool 
duplicating)
                         _app_info.duplicating);
 }
 
+void replica::init_plog_gc_enabled() { 
_plog_gc_enabled.store(FLAGS_plog_gc_enabled); }
+
+void replica::update_plog_gc_enabled(bool enabled) { 
_plog_gc_enabled.store(enabled); }
+
+bool replica::is_plog_gc_enabled() const { return _plog_gc_enabled.load(); }
+
+std::string replica::get_plog_gc_enabled_message() const
+{
+    return is_plog_gc_enabled() ? "enabled" : "disabled";
+}
+
 } // namespace replication
 } // namespace dsn
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index 286364ac5..3c845a2b9 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -2299,6 +2299,41 @@ void replica_stub::register_ctrl_command()
                 });
             }));
 
+        
_cmds.emplace_back(::dsn::command_manager::instance().register_single_command(
+            "replica.enable-plog-gc",
+            "Enable plog garbage collection for replicas specified by 
comma-separated list "
+            "of 'app_id' or 'app_id.partition_id', or all replicas for empty",
+            "[id1,id2,...]",
+            [this](const std::vector<std::string> &args) {
+                return exec_command_on_replica(args, true, [](const 
replica_ptr &rep) {
+                    rep->update_plog_gc_enabled(true);
+                    return rep->get_plog_gc_enabled_message();
+                });
+            }));
+
+        
_cmds.emplace_back(::dsn::command_manager::instance().register_single_command(
+            "replica.disable-plog-gc",
+            "Disable plog garbage collection for replicas specified by 
comma-separated list "
+            "of 'app_id' or 'app_id.partition_id', or all replicas for empty",
+            "[id1,id2,...]",
+            [this](const std::vector<std::string> &args) {
+                return exec_command_on_replica(args, true, [](const 
replica_ptr &rep) {
+                    rep->update_plog_gc_enabled(false);
+                    return rep->get_plog_gc_enabled_message();
+                });
+            }));
+
+        
_cmds.emplace_back(::dsn::command_manager::instance().register_single_command(
+            "replica.query-plog-gc-enabled-status",
+            "Query if plog garbage collection is enabled or disabled for 
replicas specified by "
+            "comma-separated list of 'app_id' or 'app_id.partition_id', or all 
replicas for empty",
+            "[id1,id2,...]",
+            [this](const std::vector<std::string> &args) {
+                return exec_command_on_replica(args, true, [](const 
replica_ptr &rep) {
+                    return rep->get_plog_gc_enabled_message();
+                });
+            }));
+
 #ifdef DSN_ENABLE_GPERF
         
_cmds.emplace_back(::dsn::command_manager::instance().register_bool_command(
             _release_tcmalloc_memory,
@@ -2346,7 +2381,7 @@ replica_stub::exec_command_on_replica(const 
std::vector<std::string> &args,
                                       bool allow_empty_args,
                                       std::function<std::string(const 
replica_ptr &rep)> func)
 {
-    if (!allow_empty_args && args.empty()) {
+    if (args.empty() && !allow_empty_args) {
         return std::string("invalid arguments");
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to