This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new d5621be1f fix(duplication): create checkpoint for the replica with 0
or 1 record (#2054)
d5621be1f is described below
commit d5621be1feb472a9901b6c78c19f9d0e53ef8b4b
Author: Dan Wang <[email protected]>
AuthorDate: Thu Jul 11 14:52:58 2024 +0800
fix(duplication): create checkpoint for the replica with 0 or 1 record
(#2054)
https://github.com/apache/incubator-pegasus/issues/2069
To immediately create the checkpoint for a replica with 0 or 1 record:
- set the min decree for the checkpoint to at least 1, which means a
checkpoint would inevitably be created even if the replica is empty.
- for an empty replica, an empty write would be committed to increase the
decree to at least 1, ensuring that the checkpoint would be created.
- the max decree in the rocksdb memtable (the last applied decree) is
considered as the min decree that should be covered by the checkpoint,
which means all of the data currently in rocksdb should be included in
the created checkpoint.
The following configuration is added to control the retry interval for
triggering a checkpoint:
```diff
[replication]
+ trigger_checkpoint_retry_interval_ms = 100
```
---
src/replica/duplication/replica_duplicator.cpp | 70 +++++++++++------
src/replica/duplication/replica_duplicator.h | 12 ++-
.../duplication/test/duplication_test_base.h | 7 +-
.../duplication/test/replica_duplicator_test.cpp | 48 +++++++-----
src/replica/replica.h | 24 +++++-
src/replica/replica_chkpt.cpp | 89 ++++++++++++++++++----
src/replica/test/mock_utils.h | 7 ++
src/replica/test/replica_test.cpp | 74 ++++++++++++++----
src/utils/errors.h | 2 +-
9 files changed, 252 insertions(+), 81 deletions(-)
diff --git a/src/replica/duplication/replica_duplicator.cpp
b/src/replica/duplication/replica_duplicator.cpp
index 810209651..31d7e9d94 100644
--- a/src/replica/duplication/replica_duplicator.cpp
+++ b/src/replica/duplication/replica_duplicator.cpp
@@ -35,7 +35,6 @@
#include "load_from_private_log.h"
#include "replica/mutation_log.h"
#include "replica/replica.h"
-#include "runtime/task/async_calls.h"
#include "utils/autoref_ptr.h"
#include "utils/error_code.h"
#include "utils/fmt_logging.h"
@@ -64,10 +63,31 @@ replica_duplicator::replica_duplicator(const
duplication_entry &ent, replica *r)
auto it = ent.progress.find(get_gpid().get_partition_index());
if (it->second == invalid_decree) {
- // keep current max committed_decree as start point.
- // todo(jiashuo1) _start_point_decree hasn't be ready to persist zk,
so if master restart,
- // the value will be reset 0
- _start_point_decree = _progress.last_decree =
_replica->private_log()->max_commit_on_disk();
+ // Ensure that the checkpoint decree is at least 1. Otherwise, the
checkpoint could not be
+ // created in time for empty replica; in consequence, the remote
cluster would inevitably
+ // fail to pull the checkpoint files.
+ //
+ // The max decree in rocksdb memtable (the last applied decree) is
considered as the min
+ // decree that should be covered by the checkpoint, which means
currently all of the data
+ // in current rocksdb should be included into the created checkpoint.
+ //
+ // TODO(jiashuo1): _min_checkpoint_decree isn't ready to be persisted to zk, so if the master
+ // restarts, the value will be reset to 0.
+ const auto last_applied_decree = _replica->last_applied_decree();
+ _min_checkpoint_decree = std::max(last_applied_decree,
static_cast<decree>(1));
+ _progress.last_decree = last_applied_decree;
+ LOG_INFO_PREFIX("initialize checkpoint decree:
min_checkpoint_decree={}, "
+ "last_committed_decree={}, last_applied_decree={}, "
+ "last_flushed_decree={}, last_durable_decree={}, "
+ "plog_max_decree_on_disk={},
plog_max_commit_on_disk={}",
+ _min_checkpoint_decree,
+ _replica->last_committed_decree(),
+ last_applied_decree,
+ _replica->last_flushed_decree(),
+ _replica->last_durable_decree(),
+ _replica->private_log()->max_decree_on_disk(),
+ _replica->private_log()->max_commit_on_disk());
+
} else {
_progress.last_decree = _progress.confirmed_decree = it->second;
}
@@ -86,17 +106,19 @@ replica_duplicator::replica_duplicator(const
duplication_entry &ent, replica *r)
void replica_duplicator::prepare_dup()
{
- LOG_INFO_PREFIX("start prepare checkpoint to catch up with latest durable
decree: "
- "start_point_decree({}) < last_durable_decree({}) = {}",
- _start_point_decree,
+ LOG_INFO_PREFIX("start to trigger checkpoint: min_checkpoint_decree={}, "
+ "last_committed_decree={}, last_applied_decree={}, "
+ "last_flushed_decree={}, last_durable_decree={}, "
+ "plog_max_decree_on_disk={}, plog_max_commit_on_disk={}",
+ _min_checkpoint_decree,
+ _replica->last_committed_decree(),
+ _replica->last_applied_decree(),
+ _replica->last_flushed_decree(),
_replica->last_durable_decree(),
- _start_point_decree < _replica->last_durable_decree());
+ _replica->private_log()->max_decree_on_disk(),
+ _replica->private_log()->max_commit_on_disk());
- tasking::enqueue(
- LPC_REPLICATION_COMMON,
- &_tracker,
- [this]() {
_replica->trigger_manual_emergency_checkpoint(_start_point_decree); },
- get_gpid().thread_hash());
+
_replica->async_trigger_manual_emergency_checkpoint(_min_checkpoint_decree, 0);
}
void replica_duplicator::start_dup_log()
@@ -162,19 +184,19 @@ void
replica_duplicator::update_status_if_needed(duplication_status::type next_s
return;
}
- // DS_PREPARE means replica is checkpointing, it may need trigger multi
time to catch
- // _start_point_decree of the plog
+ // DS_PREPARE means this replica is making checkpoint, which might need to
be triggered
+ // multiple times to catch up with _min_checkpoint_decree.
if (_status == next_status && next_status !=
duplication_status::DS_PREPARE) {
return;
}
- LOG_INFO_PREFIX(
- "update duplication status: {}=>{}[start_point={}, last_commit={},
last_durable={}]",
- duplication_status_to_string(_status),
- duplication_status_to_string(next_status),
- _start_point_decree,
- _replica->last_committed_decree(),
- _replica->last_durable_decree());
+ LOG_INFO_PREFIX("update duplication status: {}=>{}
[min_checkpoint_decree={}, "
+ "last_committed_decree={}, last_durable_decree={}]",
+ duplication_status_to_string(_status),
+ duplication_status_to_string(next_status),
+ _min_checkpoint_decree,
+ _replica->last_committed_decree(),
+ _replica->last_durable_decree());
_status = next_status;
if (_status == duplication_status::DS_PREPARE) {
@@ -220,7 +242,7 @@ error_s replica_duplicator::update_progress(const
duplication_progress &p)
decree last_confirmed_decree = _progress.confirmed_decree;
_progress.confirmed_decree = std::max(_progress.confirmed_decree,
p.confirmed_decree);
_progress.last_decree = std::max(_progress.last_decree, p.last_decree);
- _progress.checkpoint_has_prepared = _start_point_decree <=
_replica->last_durable_decree();
+ _progress.checkpoint_has_prepared = _min_checkpoint_decree <=
_replica->last_durable_decree();
if (_progress.confirmed_decree > _progress.last_decree) {
return FMT_ERR(ERR_INVALID_STATE,
diff --git a/src/replica/duplication/replica_duplicator.h
b/src/replica/duplication/replica_duplicator.h
index 66f7ac7ce..9a8deed0d 100644
--- a/src/replica/duplication/replica_duplicator.h
+++ b/src/replica/duplication/replica_duplicator.h
@@ -39,12 +39,13 @@ namespace replication {
class duplication_progress
{
public:
- // check if checkpoint has catch up with `_start_point_decree`
+ // Check if checkpoint has covered `_min_checkpoint_decree`.
bool checkpoint_has_prepared{false};
- // the maximum decree that's been persisted in meta server
+
+ // The max decree that has been persisted in the meta server.
decree confirmed_decree{invalid_decree};
- // the maximum decree that's been duplicated to remote.
+ // The max decree that has been duplicated to the remote cluster.
decree last_decree{invalid_decree};
duplication_progress &set_last_decree(decree d)
@@ -184,7 +185,10 @@ private:
replica_stub *_stub;
dsn::task_tracker _tracker;
- decree _start_point_decree = invalid_decree;
+ // The min decree that should be covered by the checkpoint which is
triggered by the
+ // newly added duplication.
+ decree _min_checkpoint_decree{invalid_decree};
+
duplication_status::type _status{duplication_status::DS_INIT};
std::atomic<duplication_fail_mode::type>
_fail_mode{duplication_fail_mode::FAIL_SLOW};
diff --git a/src/replica/duplication/test/duplication_test_base.h
b/src/replica/duplication/test/duplication_test_base.h
index 69d935cc1..cd54fe9d8 100644
--- a/src/replica/duplication/test/duplication_test_base.h
+++ b/src/replica/duplication/test/duplication_test_base.h
@@ -54,17 +54,16 @@ public:
return dup_entities[dupid].get();
}
- std::unique_ptr<replica_duplicator> create_test_duplicator(decree
confirmed = invalid_decree,
- decree start =
invalid_decree)
+ std::unique_ptr<replica_duplicator>
+ create_test_duplicator(decree confirmed_decree = invalid_decree)
{
duplication_entry dup_ent;
dup_ent.dupid = 1;
dup_ent.remote = "remote_address";
dup_ent.status = duplication_status::DS_PAUSE;
- dup_ent.progress[_replica->get_gpid().get_partition_index()] =
confirmed;
+ dup_ent.progress[_replica->get_gpid().get_partition_index()] =
confirmed_decree;
auto duplicator = std::make_unique<replica_duplicator>(dup_ent,
_replica.get());
- duplicator->_start_point_decree = start;
return duplicator;
}
diff --git a/src/replica/duplication/test/replica_duplicator_test.cpp
b/src/replica/duplication/test/replica_duplicator_test.cpp
index 817e3090f..78e1aabfb 100644
--- a/src/replica/duplication/test/replica_duplicator_test.cpp
+++ b/src/replica/duplication/test/replica_duplicator_test.cpp
@@ -64,9 +64,9 @@ public:
decree last_durable_decree() const { return
_replica->last_durable_decree(); }
- decree log_dup_start_decree(const std::unique_ptr<replica_duplicator>
&dup) const
+ decree min_checkpoint_decree(const std::unique_ptr<replica_duplicator>
&dup) const
{
- return dup->_start_point_decree;
+ return dup->_min_checkpoint_decree;
}
void test_new_duplicator(const std::string &remote_app_name, bool
specify_remote_app_name)
@@ -157,39 +157,51 @@ TEST_P(replica_duplicator_test, pause_start_duplication)
{ test_pause_start_dupl
TEST_P(replica_duplicator_test, duplication_progress)
{
auto duplicator = create_test_duplicator();
- ASSERT_EQ(duplicator->progress().last_decree, 0); // start duplication
from empty plog
- ASSERT_EQ(duplicator->progress().confirmed_decree, invalid_decree);
+ // Start duplication from empty replica.
+ ASSERT_EQ(1, min_checkpoint_decree(duplicator));
+ ASSERT_EQ(0, duplicator->progress().last_decree);
+ ASSERT_EQ(invalid_decree, duplicator->progress().confirmed_decree);
+
+ // Update the max decree that has been duplicated to the remote cluster.
duplicator->update_progress(duplicator->progress().set_last_decree(10));
- ASSERT_EQ(duplicator->progress().last_decree, 10);
- ASSERT_EQ(duplicator->progress().confirmed_decree, invalid_decree);
+ ASSERT_EQ(10, duplicator->progress().last_decree);
+ ASSERT_EQ(invalid_decree, duplicator->progress().confirmed_decree);
+ // Update the max decree that has been persisted in the meta server.
duplicator->update_progress(duplicator->progress().set_confirmed_decree(10));
- ASSERT_EQ(duplicator->progress().confirmed_decree, 10);
- ASSERT_EQ(duplicator->progress().last_decree, 10);
+ ASSERT_EQ(10, duplicator->progress().last_decree);
+ ASSERT_EQ(10, duplicator->progress().confirmed_decree);
-
ASSERT_EQ(duplicator->update_progress(duplicator->progress().set_confirmed_decree(1)),
- error_s::make(ERR_INVALID_STATE, "never decrease
confirmed_decree: new(1) old(10)"));
+ ASSERT_EQ(error_s::make(ERR_INVALID_STATE, "never decrease
confirmed_decree: new(1) old(10)"),
+
duplicator->update_progress(duplicator->progress().set_confirmed_decree(1)));
-
ASSERT_EQ(duplicator->update_progress(duplicator->progress().set_confirmed_decree(12)),
- error_s::make(ERR_INVALID_STATE,
- "last_decree(10) should always larger than
confirmed_decree(12)"));
+ ASSERT_EQ(error_s::make(ERR_INVALID_STATE,
+ "last_decree(10) should always larger than
confirmed_decree(12)"),
+
duplicator->update_progress(duplicator->progress().set_confirmed_decree(12)));
- auto duplicator_for_checkpoint = create_test_duplicator(invalid_decree,
100);
+ // Test that the checkpoint has not been created.
+ replica()->update_last_applied_decree(100);
+ auto duplicator_for_checkpoint = create_test_duplicator();
ASSERT_FALSE(duplicator_for_checkpoint->progress().checkpoint_has_prepared);
- replica()->update_last_durable_decree(101);
+ // Test that the checkpoint has been created.
+ replica()->update_last_durable_decree(100);
duplicator_for_checkpoint->update_progress(duplicator->progress());
ASSERT_TRUE(duplicator_for_checkpoint->progress().checkpoint_has_prepared);
}
-TEST_P(replica_duplicator_test, prapre_dup)
+TEST_P(replica_duplicator_test, prepare_dup)
{
- auto duplicator = create_test_duplicator(invalid_decree, 100);
+ replica()->update_last_applied_decree(100);
replica()->update_expect_last_durable_decree(100);
+
+ auto duplicator = create_test_duplicator();
duplicator->prepare_dup();
wait_all(duplicator);
- ASSERT_EQ(last_durable_decree(), log_dup_start_decree(duplicator));
+
+ ASSERT_EQ(100, min_checkpoint_decree(duplicator));
+ ASSERT_EQ(100, last_durable_decree());
}
} // namespace replication
diff --git a/src/replica/replica.h b/src/replica/replica.h
index 12e505dcd..802d07ef3 100644
--- a/src/replica/replica.h
+++ b/src/replica/replica.h
@@ -30,6 +30,7 @@
#include <stddef.h>
#include <stdint.h>
#include <atomic>
+#include <functional>
#include <map>
#include <memory>
#include <string>
@@ -268,7 +269,24 @@ public:
//
// Duplication
//
- error_code trigger_manual_emergency_checkpoint(decree old_decree);
+
+ using trigger_checkpoint_callback = std::function<void(error_code)>;
+
+ // Choose a fixed thread from pool to trigger an emergency checkpoint
asynchronously.
+ // A new checkpoint would still be created even if the replica is empty
(hasn't received
+ // any write operation).
+ //
+ // Parameters:
+ // - `min_checkpoint_decree`: the min decree that should be covered by the
triggered
+ // checkpoint. Should be a number greater than 0 which means a new
checkpoint must be
+ // created.
+ // - `delay_ms`: the delayed time in milliseconds that the triggering task
is put into
+ // the thread pool.
+ // - `callback`: the callback processor handling the error code of
triggering checkpoint.
+ void async_trigger_manual_emergency_checkpoint(decree
min_checkpoint_decree,
+ uint32_t delay_ms,
+ trigger_checkpoint_callback
callback = {});
+
void on_query_last_checkpoint(learn_response &response);
std::shared_ptr<replica_duplicator_manager> get_duplication_manager() const
{
@@ -471,6 +489,10 @@ private:
bool is_plog_gc_enabled() const;
std::string get_plog_gc_enabled_message() const;
+ // Trigger an emergency checkpoint for duplication. If the replica is empty (hasn't
+ // received any write operation), no checkpoint would be created.
+ error_code trigger_manual_emergency_checkpoint(decree
min_checkpoint_decree);
+
/////////////////////////////////////////////////////////////////
// cold backup
virtual void generate_backup_checkpoint(cold_backup_context_ptr
backup_context);
diff --git a/src/replica/replica_chkpt.cpp b/src/replica/replica_chkpt.cpp
index 0145dcab0..ea8ff41c3 100644
--- a/src/replica/replica_chkpt.cpp
+++ b/src/replica/replica_chkpt.cpp
@@ -42,6 +42,7 @@
#include "metadata_types.h"
#include "mutation_log.h"
#include "replica.h"
+#include "replica/mutation.h"
#include "replica/prepare_list.h"
#include "replica/replica_context.h"
#include "replica/replication_app_base.h"
@@ -69,12 +70,14 @@ DSN_DEFINE_int32(replication,
checkpoint_max_interval_hours,
2,
"The maximum time interval in hours of replica checkpoints
must be generated");
+
DSN_DEFINE_int32(replication,
log_private_reserve_max_size_mb,
1000,
"The maximum size of useless private log to be reserved.
NOTE: only when "
"'log_private_reserve_max_size_mb' and
'log_private_reserve_max_time_seconds' are "
"both satisfied, the useless logs can be reserved");
+
DSN_DEFINE_int32(
replication,
log_private_reserve_max_time_seconds,
@@ -83,6 +86,11 @@ DSN_DEFINE_int32(
"when 'log_private_reserve_max_size_mb' and
'log_private_reserve_max_time_seconds' "
"are both satisfied, the useless logs can be reserved");
+DSN_DEFINE_uint32(replication,
+ trigger_checkpoint_retry_interval_ms,
+ 100,
+ "The wait interval before next attempt for empty write.");
+
namespace dsn {
namespace replication {
@@ -186,8 +194,59 @@ void replica::on_checkpoint_timer()
});
}
+void replica::async_trigger_manual_emergency_checkpoint(decree
min_checkpoint_decree,
+ uint32_t delay_ms,
+
trigger_checkpoint_callback callback)
+{
+ CHECK_GT_PREFIX_MSG(min_checkpoint_decree,
+ 0,
+ "min_checkpoint_decree should be a number greater than
0 "
+ "which means a new checkpoint must be created");
+
+ tasking::enqueue(
+ LPC_REPLICATION_COMMON,
+ &_tracker,
+ [min_checkpoint_decree, callback, this]() {
+ _checker.only_one_thread_access();
+
+ if (_app == nullptr) {
+ LOG_ERROR_PREFIX("app hasn't been initialized or has been
released");
+ return;
+ }
+
+ const auto last_applied_decree = this->last_applied_decree();
+ if (last_applied_decree == 0) {
+ LOG_INFO_PREFIX("ready to commit an empty write to trigger
checkpoint: "
+ "min_checkpoint_decree={},
last_applied_decree={}, "
+ "last_durable_decree={}",
+ min_checkpoint_decree,
+ last_applied_decree,
+ last_durable_decree());
+
+ // For the empty replica, here we commit an empty write to increase
+ // the decree to at least 1, to ensure that the checkpoint would inevitably
+ // be created even if the replica is empty.
+ mutation_ptr mu = new_mutation(invalid_decree);
+ mu->add_client_request(RPC_REPLICATION_WRITE_EMPTY, nullptr);
+ init_prepare(mu, false);
+
+ async_trigger_manual_emergency_checkpoint(
+ min_checkpoint_decree,
FLAGS_trigger_checkpoint_retry_interval_ms, callback);
+
+ return;
+ }
+
+ const auto err =
trigger_manual_emergency_checkpoint(min_checkpoint_decree);
+ if (callback) {
+ callback(err);
+ }
+ },
+ get_gpid().thread_hash(),
+ std::chrono::milliseconds(delay_ms));
+}
+
// ThreadPool: THREAD_POOL_REPLICATION
-error_code replica::trigger_manual_emergency_checkpoint(decree old_decree)
+error_code replica::trigger_manual_emergency_checkpoint(decree
min_checkpoint_decree)
{
_checker.only_one_thread_access();
@@ -196,20 +255,18 @@ error_code
replica::trigger_manual_emergency_checkpoint(decree old_decree)
return ERR_LOCAL_APP_FAILURE;
}
- if (old_decree <= _app->last_durable_decree()) {
- LOG_INFO_PREFIX("checkpoint has been completed: old = {} vs latest =
{}",
- old_decree,
- _app->last_durable_decree());
+ const auto last_durable_decree = this->last_durable_decree();
+ if (min_checkpoint_decree <= last_durable_decree) {
+ LOG_INFO_PREFIX(
+ "checkpoint has been completed: min_checkpoint_decree={},
last_durable_decree={}",
+ min_checkpoint_decree,
+ last_durable_decree);
_is_manual_emergency_checkpointing = false;
- _stub->_manual_emergency_checkpointing_count == 0
- ? 0
- : (--_stub->_manual_emergency_checkpointing_count);
return ERR_OK;
}
if (_is_manual_emergency_checkpointing) {
- LOG_WARNING_PREFIX("replica is checkpointing, last_durable_decree =
{}",
- _app->last_durable_decree());
+ LOG_WARNING_PREFIX("replica is checkpointing, last_durable_decree={}",
last_durable_decree);
return ERR_BUSY;
}
@@ -307,9 +364,9 @@ error_code replica::background_async_checkpoint(bool
is_emergency)
if (_is_manual_emergency_checkpointing) {
_is_manual_emergency_checkpointing = false;
- _stub->_manual_emergency_checkpointing_count == 0
- ? 0
- : (--_stub->_manual_emergency_checkpointing_count);
+ if (_stub->_manual_emergency_checkpointing_count > 0) {
+ --_stub->_manual_emergency_checkpointing_count;
+ }
}
return err;
@@ -330,9 +387,9 @@ error_code replica::background_async_checkpoint(bool
is_emergency)
if (_is_manual_emergency_checkpointing) {
_is_manual_emergency_checkpointing = false;
- _stub->_manual_emergency_checkpointing_count == 0
- ? 0
- : (--_stub->_manual_emergency_checkpointing_count);
+ if (_stub->_manual_emergency_checkpointing_count > 0) {
+ --_stub->_manual_emergency_checkpointing_count;
+ }
}
if (err == ERR_WRONG_TIMING) {
// do nothing
diff --git a/src/replica/test/mock_utils.h b/src/replica/test/mock_utils.h
index cc631143b..9debd41ed 100644
--- a/src/replica/test/mock_utils.h
+++ b/src/replica/test/mock_utils.h
@@ -100,6 +100,8 @@ public:
return manual_compaction_status::IDLE;
}
+ void set_last_applied_decree(decree d) { _last_committed_decree.store(d); }
+
void set_last_durable_decree(decree d) { _last_durable_decree = d; }
void set_expect_last_durable_decree(decree d) {
_expect_last_durable_decree = d; }
@@ -218,6 +220,11 @@ public:
backup_context->complete_checkpoint();
}
+ void update_last_applied_decree(decree decree)
+ {
+ dynamic_cast<mock_replication_app_base
*>(_app.get())->set_last_applied_decree(decree);
+ }
+
void update_last_durable_decree(decree decree)
{
dynamic_cast<mock_replication_app_base
*>(_app.get())->set_last_durable_decree(decree);
diff --git a/src/replica/test/replica_test.cpp
b/src/replica/test/replica_test.cpp
index 7123dd85b..a6b97b7f1 100644
--- a/src/replica/test/replica_test.cpp
+++ b/src/replica/test/replica_test.cpp
@@ -18,6 +18,7 @@
#include <stddef.h>
#include <stdint.h>
#include <atomic>
+#include <functional>
#include <iostream>
#include <map>
#include <memory>
@@ -56,6 +57,7 @@
#include "runtime/rpc/rpc_message.h"
#include "runtime/task/task_code.h"
#include "runtime/task/task_tracker.h"
+#include "test_util/test_util.h"
#include "utils/autoref_ptr.h"
#include "utils/defer.h"
#include "utils/env.h"
@@ -65,10 +67,14 @@
#include "utils/fmt_logging.h"
#include "utils/metrics.h"
#include "utils/string_conv.h"
+#include "utils/synchronize.h"
#include "utils/test_macros.h"
DSN_DECLARE_bool(fd_disabled);
DSN_DECLARE_string(cold_backup_root);
+DSN_DECLARE_uint32(mutation_2pc_min_replica_count);
+
+using pegasus::AssertEventually;
namespace dsn {
namespace replication {
@@ -90,6 +96,7 @@ public:
mock_app_info();
_mock_replica =
stub->generate_replica_ptr(_app_info, _pid,
partition_status::PS_PRIMARY, 1);
+ _mock_replica->init_private_log(_log_dir);
// set FLAGS_cold_backup_root manually.
// FLAGS_cold_backup_root is set by configuration
"replication.cold_backup_root",
@@ -204,6 +211,25 @@ public:
bool is_checkpointing() { return
_mock_replica->_is_manual_emergency_checkpointing; }
+ void test_trigger_manual_emergency_checkpoint(const decree
min_checkpoint_decree,
+ const error_code
expected_err,
+ std::function<void()>
callback = {})
+ {
+ dsn::utils::notify_event op_completed;
+ _mock_replica->async_trigger_manual_emergency_checkpoint(
+ min_checkpoint_decree, 0, [&](error_code actual_err) {
+ ASSERT_EQ(expected_err, actual_err);
+
+ if (callback) {
+ callback();
+ }
+
+ op_completed.notify();
+ });
+
+ op_completed.wait();
+ }
+
bool has_gpid(gpid &pid) const
{
for (const auto &node : stub->_fs_manager.get_dir_nodes()) {
@@ -426,28 +452,50 @@ TEST_P(replica_test,
test_replica_backup_and_restore_with_specific_path)
TEST_P(replica_test, test_trigger_manual_emergency_checkpoint)
{
- ASSERT_EQ(_mock_replica->trigger_manual_emergency_checkpoint(100), ERR_OK);
- ASSERT_TRUE(is_checkpointing());
+ // There is only one replica for the unit test.
+ PRESERVE_FLAG(mutation_2pc_min_replica_count);
+ FLAGS_mutation_2pc_min_replica_count = 1;
+
+ // Initially the mutation log is empty.
+ ASSERT_EQ(0, _mock_replica->last_applied_decree());
+ ASSERT_EQ(0, _mock_replica->last_durable_decree());
+
+ // Commit at least an empty write to make the replica become non-empty.
+ _mock_replica->update_expect_last_durable_decree(1);
+ test_trigger_manual_emergency_checkpoint(1, ERR_OK);
+ _mock_replica->tracker()->wait_outstanding_tasks();
+
+ // Committing multiple empty writes (retry multiple times) might make the
last
+ // applied decree greater than 1.
+ ASSERT_LE(1, _mock_replica->last_applied_decree());
+ ASSERT_EQ(1, _mock_replica->last_durable_decree());
+
+ test_trigger_manual_emergency_checkpoint(
+ 100, ERR_OK, [this]() { ASSERT_TRUE(is_checkpointing()); });
_mock_replica->update_last_durable_decree(100);
- // test no need start checkpoint because `old_decree` < `last_durable`
- ASSERT_EQ(_mock_replica->trigger_manual_emergency_checkpoint(100), ERR_OK);
- ASSERT_FALSE(is_checkpointing());
+ // There's no need to trigger checkpoint since min_checkpoint_decree <=
last_durable_decree.
+ test_trigger_manual_emergency_checkpoint(
+ 100, ERR_OK, [this]() { ASSERT_FALSE(is_checkpointing()); });
- // test has existed running task
+ // There's already an existing running manual emergency checkpoint task.
force_update_checkpointing(true);
- ASSERT_EQ(_mock_replica->trigger_manual_emergency_checkpoint(101),
ERR_BUSY);
- ASSERT_TRUE(is_checkpointing());
- // test running task completed
+ test_trigger_manual_emergency_checkpoint(
+ 101, ERR_BUSY, [this]() { ASSERT_TRUE(is_checkpointing()); });
+
+ // Wait until the running task is completed.
_mock_replica->tracker()->wait_outstanding_tasks();
ASSERT_FALSE(is_checkpointing());
- // test exceed max concurrent count
- ASSERT_EQ(_mock_replica->trigger_manual_emergency_checkpoint(101), ERR_OK);
+ // The number of concurrent tasks exceeds the limit.
+ test_trigger_manual_emergency_checkpoint(101, ERR_OK);
force_update_checkpointing(false);
+
+ PRESERVE_FLAG(max_concurrent_manual_emergency_checkpointing_count);
FLAGS_max_concurrent_manual_emergency_checkpointing_count = 1;
- ASSERT_EQ(_mock_replica->trigger_manual_emergency_checkpoint(101),
ERR_TRY_AGAIN);
- ASSERT_FALSE(is_checkpointing());
+
+ test_trigger_manual_emergency_checkpoint(
+ 101, ERR_TRY_AGAIN, [this]() { ASSERT_FALSE(is_checkpointing()); });
_mock_replica->tracker()->wait_outstanding_tasks();
}
diff --git a/src/utils/errors.h b/src/utils/errors.h
index c611e1bef..8d5806efa 100644
--- a/src/utils/errors.h
+++ b/src/utils/errors.h
@@ -136,7 +136,7 @@ public:
return os << s.description();
}
- friend bool operator==(const error_s lhs, const error_s &rhs)
+ friend bool operator==(const error_s &lhs, const error_s &rhs)
{
if (lhs._info && rhs._info) {
return lhs._info->code == rhs._info->code && lhs._info->msg ==
rhs._info->msg;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]