This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new d5621be1f fix(duplication): create checkpoint for the replica with 0 or 1 record (#2054)
d5621be1f is described below

commit d5621be1feb472a9901b6c78c19f9d0e53ef8b4b
Author: Dan Wang <[email protected]>
AuthorDate: Thu Jul 11 14:52:58 2024 +0800

    fix(duplication): create checkpoint for the replica with 0 or 1 record (#2054)
    
    https://github.com/apache/incubator-pegasus/issues/2069
    
    To create a checkpoint immediately for a replica that holds 0 or 1 record:
    
    - set the min decree for the checkpoint to at least 1, which guarantees that
    a checkpoint is created even if the replica is empty.
    - for an empty replica, commit an empty write to raise the decree to at
    least 1, ensuring that the checkpoint will be created.
    - treat the max decree in the rocksdb memtable (the last applied decree) as
    the min decree that the checkpoint must cover, which means that all data
    currently in rocksdb is included in the created checkpoint.
    
    The following configuration is added to control the retry interval for
    triggering checkpoints:
    
    ```diff
    [replication]
    + trigger_checkpoint_retry_interval_ms = 100
    ```
---
 src/replica/duplication/replica_duplicator.cpp     | 70 +++++++++++------
 src/replica/duplication/replica_duplicator.h       | 12 ++-
 .../duplication/test/duplication_test_base.h       |  7 +-
 .../duplication/test/replica_duplicator_test.cpp   | 48 +++++++-----
 src/replica/replica.h                              | 24 +++++-
 src/replica/replica_chkpt.cpp                      | 89 ++++++++++++++++++----
 src/replica/test/mock_utils.h                      |  7 ++
 src/replica/test/replica_test.cpp                  | 74 ++++++++++++++----
 src/utils/errors.h                                 |  2 +-
 9 files changed, 252 insertions(+), 81 deletions(-)

diff --git a/src/replica/duplication/replica_duplicator.cpp 
b/src/replica/duplication/replica_duplicator.cpp
index 810209651..31d7e9d94 100644
--- a/src/replica/duplication/replica_duplicator.cpp
+++ b/src/replica/duplication/replica_duplicator.cpp
@@ -35,7 +35,6 @@
 #include "load_from_private_log.h"
 #include "replica/mutation_log.h"
 #include "replica/replica.h"
-#include "runtime/task/async_calls.h"
 #include "utils/autoref_ptr.h"
 #include "utils/error_code.h"
 #include "utils/fmt_logging.h"
@@ -64,10 +63,31 @@ replica_duplicator::replica_duplicator(const 
duplication_entry &ent, replica *r)
 
     auto it = ent.progress.find(get_gpid().get_partition_index());
     if (it->second == invalid_decree) {
-        // keep current max committed_decree as start point.
-        // todo(jiashuo1) _start_point_decree hasn't be ready to persist zk, 
so if master restart,
-        // the value will be reset 0
-        _start_point_decree = _progress.last_decree = 
_replica->private_log()->max_commit_on_disk();
+        // Ensure that the checkpoint decree is at least 1. Otherwise, the 
checkpoint could not be
+        // created in time for empty replica; in consequence, the remote 
cluster would inevitably
+        // fail to pull the checkpoint files.
+        //
+        // The max decree in rocksdb memtable (the last applied decree) is 
considered as the min
+        // decree that should be covered by the checkpoint, which means 
currently all of the data
+        // in current rocksdb should be included into the created checkpoint.
+        //
+        // TODO(jiashuo1): _min_checkpoint_decree hasn't be ready to persist 
zk, so if master
+        // restart, the value will be reset to 0.
+        const auto last_applied_decree = _replica->last_applied_decree();
+        _min_checkpoint_decree = std::max(last_applied_decree, 
static_cast<decree>(1));
+        _progress.last_decree = last_applied_decree;
+        LOG_INFO_PREFIX("initialize checkpoint decree: 
min_checkpoint_decree={}, "
+                        "last_committed_decree={}, last_applied_decree={}, "
+                        "last_flushed_decree={}, last_durable_decree={}, "
+                        "plog_max_decree_on_disk={}, 
plog_max_commit_on_disk={}",
+                        _min_checkpoint_decree,
+                        _replica->last_committed_decree(),
+                        last_applied_decree,
+                        _replica->last_flushed_decree(),
+                        _replica->last_durable_decree(),
+                        _replica->private_log()->max_decree_on_disk(),
+                        _replica->private_log()->max_commit_on_disk());
+
     } else {
         _progress.last_decree = _progress.confirmed_decree = it->second;
     }
@@ -86,17 +106,19 @@ replica_duplicator::replica_duplicator(const 
duplication_entry &ent, replica *r)
 
 void replica_duplicator::prepare_dup()
 {
-    LOG_INFO_PREFIX("start prepare checkpoint to catch up with latest durable 
decree: "
-                    "start_point_decree({}) < last_durable_decree({}) = {}",
-                    _start_point_decree,
+    LOG_INFO_PREFIX("start to trigger checkpoint: min_checkpoint_decree={}, "
+                    "last_committed_decree={}, last_applied_decree={}, "
+                    "last_flushed_decree={}, last_durable_decree={}, "
+                    "plog_max_decree_on_disk={}, plog_max_commit_on_disk={}",
+                    _min_checkpoint_decree,
+                    _replica->last_committed_decree(),
+                    _replica->last_applied_decree(),
+                    _replica->last_flushed_decree(),
                     _replica->last_durable_decree(),
-                    _start_point_decree < _replica->last_durable_decree());
+                    _replica->private_log()->max_decree_on_disk(),
+                    _replica->private_log()->max_commit_on_disk());
 
-    tasking::enqueue(
-        LPC_REPLICATION_COMMON,
-        &_tracker,
-        [this]() { 
_replica->trigger_manual_emergency_checkpoint(_start_point_decree); },
-        get_gpid().thread_hash());
+    
_replica->async_trigger_manual_emergency_checkpoint(_min_checkpoint_decree, 0);
 }
 
 void replica_duplicator::start_dup_log()
@@ -162,19 +184,19 @@ void 
replica_duplicator::update_status_if_needed(duplication_status::type next_s
         return;
     }
 
-    // DS_PREPARE means replica is checkpointing, it may need trigger multi 
time to catch
-    // _start_point_decree of the plog
+    // DS_PREPARE means this replica is making checkpoint, which might need to 
be triggered
+    // multiple times to catch up with _min_checkpoint_decree.
     if (_status == next_status && next_status != 
duplication_status::DS_PREPARE) {
         return;
     }
 
-    LOG_INFO_PREFIX(
-        "update duplication status: {}=>{}[start_point={}, last_commit={}, 
last_durable={}]",
-        duplication_status_to_string(_status),
-        duplication_status_to_string(next_status),
-        _start_point_decree,
-        _replica->last_committed_decree(),
-        _replica->last_durable_decree());
+    LOG_INFO_PREFIX("update duplication status: {}=>{} 
[min_checkpoint_decree={}, "
+                    "last_committed_decree={}, last_durable_decree={}]",
+                    duplication_status_to_string(_status),
+                    duplication_status_to_string(next_status),
+                    _min_checkpoint_decree,
+                    _replica->last_committed_decree(),
+                    _replica->last_durable_decree());
 
     _status = next_status;
     if (_status == duplication_status::DS_PREPARE) {
@@ -220,7 +242,7 @@ error_s replica_duplicator::update_progress(const 
duplication_progress &p)
     decree last_confirmed_decree = _progress.confirmed_decree;
     _progress.confirmed_decree = std::max(_progress.confirmed_decree, 
p.confirmed_decree);
     _progress.last_decree = std::max(_progress.last_decree, p.last_decree);
-    _progress.checkpoint_has_prepared = _start_point_decree <= 
_replica->last_durable_decree();
+    _progress.checkpoint_has_prepared = _min_checkpoint_decree <= 
_replica->last_durable_decree();
 
     if (_progress.confirmed_decree > _progress.last_decree) {
         return FMT_ERR(ERR_INVALID_STATE,
diff --git a/src/replica/duplication/replica_duplicator.h 
b/src/replica/duplication/replica_duplicator.h
index 66f7ac7ce..9a8deed0d 100644
--- a/src/replica/duplication/replica_duplicator.h
+++ b/src/replica/duplication/replica_duplicator.h
@@ -39,12 +39,13 @@ namespace replication {
 class duplication_progress
 {
 public:
-    // check if checkpoint has catch up with `_start_point_decree`
+    // Check if checkpoint has covered `_min_checkpoint_decree`.
     bool checkpoint_has_prepared{false};
-    // the maximum decree that's been persisted in meta server
+
+    // The max decree that has been persisted in the meta server.
     decree confirmed_decree{invalid_decree};
 
-    // the maximum decree that's been duplicated to remote.
+    // The max decree that has been duplicated to the remote cluster.
     decree last_decree{invalid_decree};
 
     duplication_progress &set_last_decree(decree d)
@@ -184,7 +185,10 @@ private:
     replica_stub *_stub;
     dsn::task_tracker _tracker;
 
-    decree _start_point_decree = invalid_decree;
+    // The min decree that should be covered by the checkpoint which is 
triggered by the
+    // newly added duplication.
+    decree _min_checkpoint_decree{invalid_decree};
+
     duplication_status::type _status{duplication_status::DS_INIT};
     std::atomic<duplication_fail_mode::type> 
_fail_mode{duplication_fail_mode::FAIL_SLOW};
 
diff --git a/src/replica/duplication/test/duplication_test_base.h 
b/src/replica/duplication/test/duplication_test_base.h
index 69d935cc1..cd54fe9d8 100644
--- a/src/replica/duplication/test/duplication_test_base.h
+++ b/src/replica/duplication/test/duplication_test_base.h
@@ -54,17 +54,16 @@ public:
         return dup_entities[dupid].get();
     }
 
-    std::unique_ptr<replica_duplicator> create_test_duplicator(decree 
confirmed = invalid_decree,
-                                                               decree start = 
invalid_decree)
+    std::unique_ptr<replica_duplicator>
+    create_test_duplicator(decree confirmed_decree = invalid_decree)
     {
         duplication_entry dup_ent;
         dup_ent.dupid = 1;
         dup_ent.remote = "remote_address";
         dup_ent.status = duplication_status::DS_PAUSE;
-        dup_ent.progress[_replica->get_gpid().get_partition_index()] = 
confirmed;
+        dup_ent.progress[_replica->get_gpid().get_partition_index()] = 
confirmed_decree;
 
         auto duplicator = std::make_unique<replica_duplicator>(dup_ent, 
_replica.get());
-        duplicator->_start_point_decree = start;
         return duplicator;
     }
 
diff --git a/src/replica/duplication/test/replica_duplicator_test.cpp 
b/src/replica/duplication/test/replica_duplicator_test.cpp
index 817e3090f..78e1aabfb 100644
--- a/src/replica/duplication/test/replica_duplicator_test.cpp
+++ b/src/replica/duplication/test/replica_duplicator_test.cpp
@@ -64,9 +64,9 @@ public:
 
     decree last_durable_decree() const { return 
_replica->last_durable_decree(); }
 
-    decree log_dup_start_decree(const std::unique_ptr<replica_duplicator> 
&dup) const
+    decree min_checkpoint_decree(const std::unique_ptr<replica_duplicator> 
&dup) const
     {
-        return dup->_start_point_decree;
+        return dup->_min_checkpoint_decree;
     }
 
     void test_new_duplicator(const std::string &remote_app_name, bool 
specify_remote_app_name)
@@ -157,39 +157,51 @@ TEST_P(replica_duplicator_test, pause_start_duplication) 
{ test_pause_start_dupl
 TEST_P(replica_duplicator_test, duplication_progress)
 {
     auto duplicator = create_test_duplicator();
-    ASSERT_EQ(duplicator->progress().last_decree, 0); // start duplication 
from empty plog
-    ASSERT_EQ(duplicator->progress().confirmed_decree, invalid_decree);
 
+    // Start duplication from empty replica.
+    ASSERT_EQ(1, min_checkpoint_decree(duplicator));
+    ASSERT_EQ(0, duplicator->progress().last_decree);
+    ASSERT_EQ(invalid_decree, duplicator->progress().confirmed_decree);
+
+    // Update the max decree that has been duplicated to the remote cluster.
     duplicator->update_progress(duplicator->progress().set_last_decree(10));
-    ASSERT_EQ(duplicator->progress().last_decree, 10);
-    ASSERT_EQ(duplicator->progress().confirmed_decree, invalid_decree);
+    ASSERT_EQ(10, duplicator->progress().last_decree);
+    ASSERT_EQ(invalid_decree, duplicator->progress().confirmed_decree);
 
+    // Update the max decree that has been persisted in the meta server.
     
duplicator->update_progress(duplicator->progress().set_confirmed_decree(10));
-    ASSERT_EQ(duplicator->progress().confirmed_decree, 10);
-    ASSERT_EQ(duplicator->progress().last_decree, 10);
+    ASSERT_EQ(10, duplicator->progress().last_decree);
+    ASSERT_EQ(10, duplicator->progress().confirmed_decree);
 
-    
ASSERT_EQ(duplicator->update_progress(duplicator->progress().set_confirmed_decree(1)),
-              error_s::make(ERR_INVALID_STATE, "never decrease 
confirmed_decree: new(1) old(10)"));
+    ASSERT_EQ(error_s::make(ERR_INVALID_STATE, "never decrease 
confirmed_decree: new(1) old(10)"),
+              
duplicator->update_progress(duplicator->progress().set_confirmed_decree(1)));
 
-    
ASSERT_EQ(duplicator->update_progress(duplicator->progress().set_confirmed_decree(12)),
-              error_s::make(ERR_INVALID_STATE,
-                            "last_decree(10) should always larger than 
confirmed_decree(12)"));
+    ASSERT_EQ(error_s::make(ERR_INVALID_STATE,
+                            "last_decree(10) should always larger than 
confirmed_decree(12)"),
+              
duplicator->update_progress(duplicator->progress().set_confirmed_decree(12)));
 
-    auto duplicator_for_checkpoint = create_test_duplicator(invalid_decree, 
100);
+    // Test that the checkpoint has not been created.
+    replica()->update_last_applied_decree(100);
+    auto duplicator_for_checkpoint = create_test_duplicator();
     
ASSERT_FALSE(duplicator_for_checkpoint->progress().checkpoint_has_prepared);
 
-    replica()->update_last_durable_decree(101);
+    // Test that the checkpoint has been created.
+    replica()->update_last_durable_decree(100);
     duplicator_for_checkpoint->update_progress(duplicator->progress());
     ASSERT_TRUE(duplicator_for_checkpoint->progress().checkpoint_has_prepared);
 }
 
-TEST_P(replica_duplicator_test, prapre_dup)
+TEST_P(replica_duplicator_test, prepare_dup)
 {
-    auto duplicator = create_test_duplicator(invalid_decree, 100);
+    replica()->update_last_applied_decree(100);
     replica()->update_expect_last_durable_decree(100);
+
+    auto duplicator = create_test_duplicator();
     duplicator->prepare_dup();
     wait_all(duplicator);
-    ASSERT_EQ(last_durable_decree(), log_dup_start_decree(duplicator));
+
+    ASSERT_EQ(100, min_checkpoint_decree(duplicator));
+    ASSERT_EQ(100, last_durable_decree());
 }
 
 } // namespace replication
diff --git a/src/replica/replica.h b/src/replica/replica.h
index 12e505dcd..802d07ef3 100644
--- a/src/replica/replica.h
+++ b/src/replica/replica.h
@@ -30,6 +30,7 @@
 #include <stddef.h>
 #include <stdint.h>
 #include <atomic>
+#include <functional>
 #include <map>
 #include <memory>
 #include <string>
@@ -268,7 +269,24 @@ public:
     //
     // Duplication
     //
-    error_code trigger_manual_emergency_checkpoint(decree old_decree);
+
+    using trigger_checkpoint_callback = std::function<void(error_code)>;
+
+    // Choose a fixed thread from pool to trigger an emergency checkpoint 
asynchronously.
+    // A new checkpoint would still be created even if the replica is empty 
(hasn't received
+    // any write operation).
+    //
+    // Parameters:
+    // - `min_checkpoint_decree`: the min decree that should be covered by the 
triggered
+    // checkpoint. Should be a number greater than 0 which means a new 
checkpoint must be
+    // created.
+    // - `delay_ms`: the delayed time in milliseconds that the triggering task 
is put into
+    // the thread pool.
+    // - `callback`: the callback processor handling the error code of 
triggering checkpoint.
+    void async_trigger_manual_emergency_checkpoint(decree 
min_checkpoint_decree,
+                                                   uint32_t delay_ms,
+                                                   trigger_checkpoint_callback 
callback = {});
+
     void on_query_last_checkpoint(learn_response &response);
     std::shared_ptr<replica_duplicator_manager> get_duplication_manager() const
     {
@@ -471,6 +489,10 @@ private:
     bool is_plog_gc_enabled() const;
     std::string get_plog_gc_enabled_message() const;
 
+    // Trigger an emergency checkpoint for duplication. Once the replica is 
empty (hasn't
+    // received any write operation), there would be no checkpoint created.
+    error_code trigger_manual_emergency_checkpoint(decree 
min_checkpoint_decree);
+
     /////////////////////////////////////////////////////////////////
     // cold backup
     virtual void generate_backup_checkpoint(cold_backup_context_ptr 
backup_context);
diff --git a/src/replica/replica_chkpt.cpp b/src/replica/replica_chkpt.cpp
index 0145dcab0..ea8ff41c3 100644
--- a/src/replica/replica_chkpt.cpp
+++ b/src/replica/replica_chkpt.cpp
@@ -42,6 +42,7 @@
 #include "metadata_types.h"
 #include "mutation_log.h"
 #include "replica.h"
+#include "replica/mutation.h"
 #include "replica/prepare_list.h"
 #include "replica/replica_context.h"
 #include "replica/replication_app_base.h"
@@ -69,12 +70,14 @@ DSN_DEFINE_int32(replication,
                  checkpoint_max_interval_hours,
                  2,
                  "The maximum time interval in hours of replica checkpoints 
must be generated");
+
 DSN_DEFINE_int32(replication,
                  log_private_reserve_max_size_mb,
                  1000,
                  "The maximum size of useless private log to be reserved. 
NOTE: only when "
                  "'log_private_reserve_max_size_mb' and 
'log_private_reserve_max_time_seconds' are "
                  "both satisfied, the useless logs can be reserved");
+
 DSN_DEFINE_int32(
     replication,
     log_private_reserve_max_time_seconds,
@@ -83,6 +86,11 @@ DSN_DEFINE_int32(
     "when 'log_private_reserve_max_size_mb' and 
'log_private_reserve_max_time_seconds' "
     "are both satisfied, the useless logs can be reserved");
 
+DSN_DEFINE_uint32(replication,
+                  trigger_checkpoint_retry_interval_ms,
+                  100,
+                  "The wait interval before next attempt for empty write.");
+
 namespace dsn {
 namespace replication {
 
@@ -186,8 +194,59 @@ void replica::on_checkpoint_timer()
                      });
 }
 
+void replica::async_trigger_manual_emergency_checkpoint(decree 
min_checkpoint_decree,
+                                                        uint32_t delay_ms,
+                                                        
trigger_checkpoint_callback callback)
+{
+    CHECK_GT_PREFIX_MSG(min_checkpoint_decree,
+                        0,
+                        "min_checkpoint_decree should be a number greater than 
0 "
+                        "which means a new checkpoint must be created");
+
+    tasking::enqueue(
+        LPC_REPLICATION_COMMON,
+        &_tracker,
+        [min_checkpoint_decree, callback, this]() {
+            _checker.only_one_thread_access();
+
+            if (_app == nullptr) {
+                LOG_ERROR_PREFIX("app hasn't been initialized or has been 
released");
+                return;
+            }
+
+            const auto last_applied_decree = this->last_applied_decree();
+            if (last_applied_decree == 0) {
+                LOG_INFO_PREFIX("ready to commit an empty write to trigger 
checkpoint: "
+                                "min_checkpoint_decree={}, 
last_applied_decree={}, "
+                                "last_durable_decree={}",
+                                min_checkpoint_decree,
+                                last_applied_decree,
+                                last_durable_decree());
+
+                // For the empty replica, here we commit an empty write would 
be to increase
+                // the decree to at least 1, to ensure that the checkpoint 
would inevitably
+                // be created even if the replica is empty.
+                mutation_ptr mu = new_mutation(invalid_decree);
+                mu->add_client_request(RPC_REPLICATION_WRITE_EMPTY, nullptr);
+                init_prepare(mu, false);
+
+                async_trigger_manual_emergency_checkpoint(
+                    min_checkpoint_decree, 
FLAGS_trigger_checkpoint_retry_interval_ms, callback);
+
+                return;
+            }
+
+            const auto err = 
trigger_manual_emergency_checkpoint(min_checkpoint_decree);
+            if (callback) {
+                callback(err);
+            }
+        },
+        get_gpid().thread_hash(),
+        std::chrono::milliseconds(delay_ms));
+}
+
 // ThreadPool: THREAD_POOL_REPLICATION
-error_code replica::trigger_manual_emergency_checkpoint(decree old_decree)
+error_code replica::trigger_manual_emergency_checkpoint(decree 
min_checkpoint_decree)
 {
     _checker.only_one_thread_access();
 
@@ -196,20 +255,18 @@ error_code 
replica::trigger_manual_emergency_checkpoint(decree old_decree)
         return ERR_LOCAL_APP_FAILURE;
     }
 
-    if (old_decree <= _app->last_durable_decree()) {
-        LOG_INFO_PREFIX("checkpoint has been completed: old = {} vs latest = 
{}",
-                        old_decree,
-                        _app->last_durable_decree());
+    const auto last_durable_decree = this->last_durable_decree();
+    if (min_checkpoint_decree <= last_durable_decree) {
+        LOG_INFO_PREFIX(
+            "checkpoint has been completed: min_checkpoint_decree={}, 
last_durable_decree={}",
+            min_checkpoint_decree,
+            last_durable_decree);
         _is_manual_emergency_checkpointing = false;
-        _stub->_manual_emergency_checkpointing_count == 0
-            ? 0
-            : (--_stub->_manual_emergency_checkpointing_count);
         return ERR_OK;
     }
 
     if (_is_manual_emergency_checkpointing) {
-        LOG_WARNING_PREFIX("replica is checkpointing, last_durable_decree = 
{}",
-                           _app->last_durable_decree());
+        LOG_WARNING_PREFIX("replica is checkpointing, last_durable_decree={}", 
last_durable_decree);
         return ERR_BUSY;
     }
 
@@ -307,9 +364,9 @@ error_code replica::background_async_checkpoint(bool 
is_emergency)
 
         if (_is_manual_emergency_checkpointing) {
             _is_manual_emergency_checkpointing = false;
-            _stub->_manual_emergency_checkpointing_count == 0
-                ? 0
-                : (--_stub->_manual_emergency_checkpointing_count);
+            if (_stub->_manual_emergency_checkpointing_count > 0) {
+                --_stub->_manual_emergency_checkpointing_count;
+            }
         }
 
         return err;
@@ -330,9 +387,9 @@ error_code replica::background_async_checkpoint(bool 
is_emergency)
 
     if (_is_manual_emergency_checkpointing) {
         _is_manual_emergency_checkpointing = false;
-        _stub->_manual_emergency_checkpointing_count == 0
-            ? 0
-            : (--_stub->_manual_emergency_checkpointing_count);
+        if (_stub->_manual_emergency_checkpointing_count > 0) {
+            --_stub->_manual_emergency_checkpointing_count;
+        }
     }
     if (err == ERR_WRONG_TIMING) {
         // do nothing
diff --git a/src/replica/test/mock_utils.h b/src/replica/test/mock_utils.h
index cc631143b..9debd41ed 100644
--- a/src/replica/test/mock_utils.h
+++ b/src/replica/test/mock_utils.h
@@ -100,6 +100,8 @@ public:
         return manual_compaction_status::IDLE;
     }
 
+    void set_last_applied_decree(decree d) { _last_committed_decree.store(d); }
+
     void set_last_durable_decree(decree d) { _last_durable_decree = d; }
 
     void set_expect_last_durable_decree(decree d) { 
_expect_last_durable_decree = d; }
@@ -218,6 +220,11 @@ public:
         backup_context->complete_checkpoint();
     }
 
+    void update_last_applied_decree(decree decree)
+    {
+        dynamic_cast<mock_replication_app_base 
*>(_app.get())->set_last_applied_decree(decree);
+    }
+
     void update_last_durable_decree(decree decree)
     {
         dynamic_cast<mock_replication_app_base 
*>(_app.get())->set_last_durable_decree(decree);
diff --git a/src/replica/test/replica_test.cpp 
b/src/replica/test/replica_test.cpp
index 7123dd85b..a6b97b7f1 100644
--- a/src/replica/test/replica_test.cpp
+++ b/src/replica/test/replica_test.cpp
@@ -18,6 +18,7 @@
 #include <stddef.h>
 #include <stdint.h>
 #include <atomic>
+#include <functional>
 #include <iostream>
 #include <map>
 #include <memory>
@@ -56,6 +57,7 @@
 #include "runtime/rpc/rpc_message.h"
 #include "runtime/task/task_code.h"
 #include "runtime/task/task_tracker.h"
+#include "test_util/test_util.h"
 #include "utils/autoref_ptr.h"
 #include "utils/defer.h"
 #include "utils/env.h"
@@ -65,10 +67,14 @@
 #include "utils/fmt_logging.h"
 #include "utils/metrics.h"
 #include "utils/string_conv.h"
+#include "utils/synchronize.h"
 #include "utils/test_macros.h"
 
 DSN_DECLARE_bool(fd_disabled);
 DSN_DECLARE_string(cold_backup_root);
+DSN_DECLARE_uint32(mutation_2pc_min_replica_count);
+
+using pegasus::AssertEventually;
 
 namespace dsn {
 namespace replication {
@@ -90,6 +96,7 @@ public:
         mock_app_info();
         _mock_replica =
             stub->generate_replica_ptr(_app_info, _pid, 
partition_status::PS_PRIMARY, 1);
+        _mock_replica->init_private_log(_log_dir);
 
         // set FLAGS_cold_backup_root manually.
         // FLAGS_cold_backup_root is set by configuration 
"replication.cold_backup_root",
@@ -204,6 +211,25 @@ public:
 
     bool is_checkpointing() { return 
_mock_replica->_is_manual_emergency_checkpointing; }
 
+    void test_trigger_manual_emergency_checkpoint(const decree 
min_checkpoint_decree,
+                                                  const error_code 
expected_err,
+                                                  std::function<void()> 
callback = {})
+    {
+        dsn::utils::notify_event op_completed;
+        _mock_replica->async_trigger_manual_emergency_checkpoint(
+            min_checkpoint_decree, 0, [&](error_code actual_err) {
+                ASSERT_EQ(expected_err, actual_err);
+
+                if (callback) {
+                    callback();
+                }
+
+                op_completed.notify();
+            });
+
+        op_completed.wait();
+    }
+
     bool has_gpid(gpid &pid) const
     {
         for (const auto &node : stub->_fs_manager.get_dir_nodes()) {
@@ -426,28 +452,50 @@ TEST_P(replica_test, 
test_replica_backup_and_restore_with_specific_path)
 
 TEST_P(replica_test, test_trigger_manual_emergency_checkpoint)
 {
-    ASSERT_EQ(_mock_replica->trigger_manual_emergency_checkpoint(100), ERR_OK);
-    ASSERT_TRUE(is_checkpointing());
+    // There is only one replica for the unit test.
+    PRESERVE_FLAG(mutation_2pc_min_replica_count);
+    FLAGS_mutation_2pc_min_replica_count = 1;
+
+    // Initially the mutation log is empty.
+    ASSERT_EQ(0, _mock_replica->last_applied_decree());
+    ASSERT_EQ(0, _mock_replica->last_durable_decree());
+
+    // Commit at least an empty write to make the replica become non-empty.
+    _mock_replica->update_expect_last_durable_decree(1);
+    test_trigger_manual_emergency_checkpoint(1, ERR_OK);
+    _mock_replica->tracker()->wait_outstanding_tasks();
+
+    // Committing multiple empty writes (retry multiple times) might make the 
last
+    // applied decree greater than 1.
+    ASSERT_LE(1, _mock_replica->last_applied_decree());
+    ASSERT_EQ(1, _mock_replica->last_durable_decree());
+
+    test_trigger_manual_emergency_checkpoint(
+        100, ERR_OK, [this]() { ASSERT_TRUE(is_checkpointing()); });
     _mock_replica->update_last_durable_decree(100);
 
-    // test no need start checkpoint because `old_decree` < `last_durable`
-    ASSERT_EQ(_mock_replica->trigger_manual_emergency_checkpoint(100), ERR_OK);
-    ASSERT_FALSE(is_checkpointing());
+    // There's no need to trigger checkpoint since min_checkpoint_decree <= 
last_durable_decree.
+    test_trigger_manual_emergency_checkpoint(
+        100, ERR_OK, [this]() { ASSERT_FALSE(is_checkpointing()); });
 
-    // test has existed running task
+    // There's already an existing running manual emergency checkpoint task.
     force_update_checkpointing(true);
-    ASSERT_EQ(_mock_replica->trigger_manual_emergency_checkpoint(101), 
ERR_BUSY);
-    ASSERT_TRUE(is_checkpointing());
-    // test running task completed
+    test_trigger_manual_emergency_checkpoint(
+        101, ERR_BUSY, [this]() { ASSERT_TRUE(is_checkpointing()); });
+
+    // Wait until the running task is completed.
     _mock_replica->tracker()->wait_outstanding_tasks();
     ASSERT_FALSE(is_checkpointing());
 
-    // test exceed max concurrent count
-    ASSERT_EQ(_mock_replica->trigger_manual_emergency_checkpoint(101), ERR_OK);
+    // The number of concurrent tasks exceeds the limit.
+    test_trigger_manual_emergency_checkpoint(101, ERR_OK);
     force_update_checkpointing(false);
+
+    PRESERVE_FLAG(max_concurrent_manual_emergency_checkpointing_count);
     FLAGS_max_concurrent_manual_emergency_checkpointing_count = 1;
-    ASSERT_EQ(_mock_replica->trigger_manual_emergency_checkpoint(101), 
ERR_TRY_AGAIN);
-    ASSERT_FALSE(is_checkpointing());
+
+    test_trigger_manual_emergency_checkpoint(
+        101, ERR_TRY_AGAIN, [this]() { ASSERT_FALSE(is_checkpointing()); });
     _mock_replica->tracker()->wait_outstanding_tasks();
 }
 
diff --git a/src/utils/errors.h b/src/utils/errors.h
index c611e1bef..8d5806efa 100644
--- a/src/utils/errors.h
+++ b/src/utils/errors.h
@@ -136,7 +136,7 @@ public:
         return os << s.description();
     }
 
-    friend bool operator==(const error_s lhs, const error_s &rhs)
+    friend bool operator==(const error_s &lhs, const error_s &rhs)
     {
         if (lhs._info && rhs._info) {
             return lhs._info->code == rhs._info->code && lhs._info->msg == 
rhs._info->msg;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to