This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new d711b08d1 perf: enhance the loading process of replicas particularly
when a significant number of replicas are spread across multiple disks (#2078)
d711b08d1 is described below
commit d711b08d1a15f7823fcea0c82bdbc6d45e4bf3ce
Author: Dan Wang <[email protected]>
AuthorDate: Thu Dec 26 14:35:30 2024 +0800
perf: enhance the loading process of replicas particularly when a
significant number of replicas are spread across multiple disks (#2078)
Immediately after the replica server is started, all of the replicas under
the data
directory would be loaded. Currently, a loading task is launched for each
replica
directory. The tasks for loading replica directories are pushed into a
partitioned
thread pool (namely `THREAD_POOL_REPLICATION`) disk by disk: only after all
of the replica directories on the current disk have been pushed into the
thread
pool would the process move on to the next disk. Since the thread pool is
partitioned, and the hash for each task is the number of tasks that were
pushed into the pool before it, all of the loading tasks for the replica
directories on one disk would be executed concurrently. This would lead to
two problems once there is a great number
of replica directories on each disk:
- I/O usage for each disk might become saturated: its `%util` might become
100%;
- The entire loading process is blocked on each single disk: during a long
period only
one disk is keeping busy while others are idle.
The replica server seems to get stuck in loading replicas after it is
started. This is
unacceptable and should be changed.
The improved version allows the replica directories on different disks to
be loaded
simultaneously: every disk would be busy loading replicas. Also, loading
tasks would
be pushed into a non-partitioned thread pool (i.e. `THREAD_POOL_LOCAL_APP`)
instead of the partitioned one, auto-balancing tasks across multiple
threads to
prevent some threads from being starved while others are overloaded.
A new parameter is added to restrict the max number of replicas allowed
to be loaded
simultaneously for each disk, in case I/O usage for each disk becomes
saturated.
Another parameter is added to ensure that the main thread, while waiting
for all loading tasks to
finish, would not be blocked on one task for too long once the number of
tasks simultaneously loading
replica directories on a single disk has reached its limit.
Parameters are added as follows:
```diff
[replication]
+ max_replicas_on_load_for_each_disk = 256
+ load_replica_max_wait_time_ms = 10
```
---
.clang-tidy | 2 +-
build_tools/clang_tidy.py | 4 +-
src/common/replication.codes.h | 2 +-
src/meta/meta_service.cpp | 1 +
src/replica/replica.h | 1 +
src/replica/replica_stub.cpp | 681 +++++++++++++++++++++++---------
src/replica/replica_stub.h | 105 +++--
src/replica/test/config-test.ini | 8 +-
src/replica/test/load_replicas_test.cpp | 339 ++++++++++++++++
src/replica/test/replica_dir_test.cpp | 127 ++++++
src/server/result_writer.cpp | 1 -
src/test_util/test_util.h | 9 +-
src/utils/autoref_ptr.h | 9 +
src/utils/command_manager.h | 42 +-
src/utils/time_utils.h | 4 +-
15 files changed, 1100 insertions(+), 235 deletions(-)
diff --git a/.clang-tidy b/.clang-tidy
index 914dee119..95dd7616d 100644
--- a/.clang-tidy
+++ b/.clang-tidy
@@ -20,7 +20,7 @@
CheckOptions: []
# Disable some checks that are not useful for us now.
# They are sorted by names, and should be consistent to
build_tools/clang_tidy.py.
-Checks:
'abseil-*,boost-*,bugprone-*,cert-*,clang-analyzer-*,concurrency-*,cppcoreguidelines-*,darwin-*,fuchsia-*,google-*,hicpp-*,linuxkernel-*,llvm-*,misc-*,modernize-*,performance-*,portability-*,readability-*,-bugprone-easily-swappable-parameters,-bugprone-lambda-function-name,-bugprone-macro-parentheses,-cert-err58-cpp,-concurrency-mt-unsafe,-cppcoreguidelines-avoid-c-arrays,-cppcoreguidelines-avoid-magic-numbers,-cppcoreguidelines-avoid-non-const-global-variables,-cppcoreguidelines
[...]
+Checks:
'abseil-*,boost-*,bugprone-*,cert-*,clang-analyzer-*,concurrency-*,cppcoreguidelines-*,darwin-*,fuchsia-*,google-*,hicpp-*,linuxkernel-*,llvm-*,misc-*,modernize-*,performance-*,portability-*,readability-*,-bugprone-easily-swappable-parameters,-bugprone-lambda-function-name,-bugprone-macro-parentheses,-cert-err58-cpp,-concurrency-mt-unsafe,-cppcoreguidelines-avoid-c-arrays,-cppcoreguidelines-avoid-magic-numbers,-cppcoreguidelines-avoid-non-const-global-variables,-cppcoreguidelines
[...]
ExtraArgs:
ExtraArgsBefore: []
FormatStyle: none
diff --git a/build_tools/clang_tidy.py b/build_tools/clang_tidy.py
index 09ea434b1..8a35e1a5d 100755
--- a/build_tools/clang_tidy.py
+++ b/build_tools/clang_tidy.py
@@ -88,6 +88,7 @@ def run_tidy(sha="HEAD", is_rev_range=False):
"-misc-definitions-in-headers,"
"-misc-non-private-member-variables-in-classes,"
"-misc-unused-parameters,"
+ "-modernize-avoid-bind,"
"-modernize-avoid-c-arrays,"
"-modernize-replace-disallow-copy-and-assign-macro,"
"-modernize-use-trailing-return-type,"
@@ -95,7 +96,8 @@ def run_tidy(sha="HEAD", is_rev_range=False):
"-readability-function-cognitive-complexity,"
"-readability-identifier-length,"
"-readability-magic-numbers,"
- "-readability-named-parameter",
+ "-readability-named-parameter,"
+ "-readability-suspicious-call-argument",
"-extra-arg=-language=c++",
"-extra-arg=-std=c++17",
"-extra-arg=-Ithirdparty/output/include"]
diff --git a/src/common/replication.codes.h b/src/common/replication.codes.h
index 7bbb775b1..42ae695f7 100644
--- a/src/common/replication.codes.h
+++ b/src/common/replication.codes.h
@@ -141,7 +141,6 @@ MAKE_EVENT_CODE(LPC_META_STATE_NORMAL, TASK_PRIORITY_COMMON)
// THREAD_POOL_REPLICATION
#define CURRENT_THREAD_POOL THREAD_POOL_REPLICATION
-MAKE_EVENT_CODE(LPC_REPLICATION_INIT_LOAD, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(RPC_REPLICATION_WRITE_EMPTY, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_PER_REPLICA_CHECKPOINT_TIMER, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_PER_REPLICA_COLLECT_INFO_TIMER, TASK_PRIORITY_COMMON)
@@ -186,6 +185,7 @@ MAKE_EVENT_CODE(LPC_REPLICATION_HIGH, TASK_PRIORITY_HIGH)
// THREAD_POOL_LOCAL_APP
#define CURRENT_THREAD_POOL THREAD_POOL_LOCAL_APP
+MAKE_EVENT_CODE(LPC_REPLICATION_INIT_LOAD, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_WRITE, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_read_THROTTLING_DELAY, TASK_PRIORITY_COMMON)
#undef CURRENT_THREAD_POOL
diff --git a/src/meta/meta_service.cpp b/src/meta/meta_service.cpp
index bd9736304..6ec19c007 100644
--- a/src/meta/meta_service.cpp
+++ b/src/meta/meta_service.cpp
@@ -29,6 +29,7 @@
#include <boost/lexical_cast.hpp>
#include <algorithm> // for std::remove_if
#include <chrono>
+#include <cstdint>
#include <functional>
#include <ostream>
#include <string_view>
diff --git a/src/replica/replica.h b/src/replica/replica.h
index 4ee215c3d..b312865c5 100644
--- a/src/replica/replica.h
+++ b/src/replica/replica.h
@@ -610,6 +610,7 @@ private:
friend class replica_disk_test;
friend class replica_disk_migrate_test;
friend class open_replica_test;
+ friend class mock_load_replica;
friend class replica_follower;
friend class ::pegasus::server::pegasus_server_test_base;
friend class ::pegasus::server::rocksdb_wrapper_test;
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index 2e4eec66e..471b8b1f8 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -29,15 +29,15 @@
// IWYU pragma: no_include <ext/alloc_traits.h>
#include <fmt/core.h>
#include <fmt/format.h>
+#include <nlohmann/json.hpp>
#include <rapidjson/ostreamwrapper.h>
-#include <stdio.h>
-#include <stdlib.h>
#include <algorithm>
#include <chrono>
#include <cstdint>
-#include <deque>
+#include <cstdlib>
#include <iterator>
#include <mutex>
+#include <queue>
#include <set>
#include <sstream>
#include <string_view>
@@ -71,6 +71,10 @@
#include "security/access_controller.h"
#include "split/replica_split_manager.h"
#include "task/async_calls.h"
+#include "task/task.h"
+#include "task/task_engine.h"
+#include "task/task_worker.h"
+#include "utils/api_utilities.h"
#include "utils/command_manager.h"
#include "utils/env.h"
#include "utils/errors.h"
@@ -80,8 +84,11 @@
#include "utils/ports.h"
#include "utils/process_utils.h"
#include "utils/rand.h"
+#include "utils/string_conv.h"
#include "utils/strings.h"
#include "utils/synchronize.h"
+#include "utils/threadpool_spec.h"
+#include "utils/timer.h"
#ifdef DSN_ENABLE_GPERF
#include <gperftools/malloc_extension.h>
#elif defined(DSN_USE_JEMALLOC)
@@ -91,14 +98,17 @@
#include "remote_cmd/remote_command.h"
#include "utils/fail_point.h"
-static const char *kMaxConcurrentBulkLoadDownloadingCountDesc =
- "The maximum concurrent bulk load downloading replica count";
-DSN_DEFINE_int32(replication,
- max_concurrent_bulk_load_downloading_count,
- 5,
- kMaxConcurrentBulkLoadDownloadingCountDesc);
-DSN_DEFINE_validator(max_concurrent_bulk_load_downloading_count,
- [](int32_t value) -> bool { return value >= 0; });
+namespace {
+
+const char *kMaxReplicasOnLoadForEachDiskDesc =
+ "The max number of replicas that are allowed to be loaded simultaneously
for each disk dir.";
+
+const char *kLoadReplicaMaxWaitTimeMsDesc = "The max waiting time for replica
loading to complete.";
+
+const char *kMaxConcurrentBulkLoadDownloadingCountDesc =
+ "The maximum concurrent bulk load downloading replica count.";
+
+} // anonymous namespace
METRIC_DEFINE_gauge_int64(server,
total_replicas,
@@ -260,6 +270,26 @@ DSN_DECLARE_string(data_dirs);
DSN_DECLARE_string(encryption_cluster_key_name);
DSN_DECLARE_string(server_key);
+DSN_DEFINE_uint64(replication,
+ max_replicas_on_load_for_each_disk,
+ 256,
+ kMaxReplicasOnLoadForEachDiskDesc);
+DSN_TAG_VARIABLE(max_replicas_on_load_for_each_disk, FT_MUTABLE);
+DSN_DEFINE_validator(max_replicas_on_load_for_each_disk,
+ [](uint64_t value) -> bool { return value > 0; });
+
+DSN_DEFINE_uint64(replication, load_replica_max_wait_time_ms, 10,
kLoadReplicaMaxWaitTimeMsDesc);
+DSN_TAG_VARIABLE(load_replica_max_wait_time_ms, FT_MUTABLE);
+DSN_DEFINE_validator(load_replica_max_wait_time_ms,
+ [](uint64_t value) -> bool { return value > 0; });
+
+DSN_DEFINE_int32(replication,
+ max_concurrent_bulk_load_downloading_count,
+ 5,
+ kMaxConcurrentBulkLoadDownloadingCountDesc);
+DSN_DEFINE_validator(max_concurrent_bulk_load_downloading_count,
+ [](int32_t value) -> bool { return value >= 0; });
+
DSN_DEFINE_bool(replication,
deny_client_on_start,
false,
@@ -376,9 +406,52 @@ namespace dsn {
namespace replication {
bool replica_stub::s_not_exit_on_log_failure = false;
+namespace {
+
+// Register commands that get/set flag configurations.
+void register_flags_ctrl_command()
+{
+ // For the reaonse why using std::call_once please see comments in
+ // replica_stub::register_ctrl_command() for details.
+ static std::once_flag flag;
+ std::call_once(flag, []() mutable {
+ dsn::command_manager::instance().add_global_cmd(
+ dsn::command_manager::instance().register_int_command(
+ FLAGS_max_replicas_on_load_for_each_disk,
+ FLAGS_max_replicas_on_load_for_each_disk,
+ "replica.max-replicas-on-load-for-each-disk",
+ kMaxReplicasOnLoadForEachDiskDesc));
+
+ dsn::command_manager::instance().add_global_cmd(
+ dsn::command_manager::instance().register_int_command(
+ FLAGS_load_replica_max_wait_time_ms,
+ FLAGS_load_replica_max_wait_time_ms,
+ "replica.load-replica-max-wait-time-ms",
+ kLoadReplicaMaxWaitTimeMsDesc));
+
+ dsn::command_manager::instance().add_global_cmd(
+ dsn::command_manager::instance().register_bool_command(
+ FLAGS_empty_write_disabled,
+ "replica.disable-empty-write",
+ "whether to disable empty writes"));
+
+ dsn::command_manager::instance().add_global_cmd(
+ dsn::command_manager::instance().register_int_command(
+ FLAGS_max_concurrent_bulk_load_downloading_count,
+ FLAGS_max_concurrent_bulk_load_downloading_count,
+ "replica.max-concurrent-bulk-load-downloading-count",
+ kMaxConcurrentBulkLoadDownloadingCountDesc));
+ });
+}
+
+} // anonymous namespace
+
replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/,
bool is_long_subscriber /* = true*/)
: serverlet("replica_stub"),
+ _state(NS_Disconnected),
+ _replica_state_subscriber(std::move(subscriber)),
+ _is_long_subscriber(is_long_subscriber),
_deny_client(false),
_verbose_client_log(false),
_verbose_commit_log(false),
@@ -388,6 +461,9 @@ replica_stub::replica_stub(replica_state_subscriber
subscriber /*= nullptr*/,
_bulk_load_downloading_count(0),
_manual_emergency_checkpointing_count(0),
_is_running(false),
+#ifdef DSN_ENABLE_GPERF
+ _is_releasing_memory(false),
+#endif
METRIC_VAR_INIT_server(total_replicas),
METRIC_VAR_INIT_server(opening_replicas),
METRIC_VAR_INIT_server(closing_replicas),
@@ -420,16 +496,13 @@ replica_stub::replica_stub(replica_state_subscriber
subscriber /*= nullptr*/,
METRIC_VAR_INIT_server(splitting_replicas_async_learn_max_duration_ms),
METRIC_VAR_INIT_server(splitting_replicas_max_copy_file_bytes)
{
-#ifdef DSN_ENABLE_GPERF
- _is_releasing_memory = false;
-#endif
- _replica_state_subscriber = subscriber;
- _is_long_subscriber = is_long_subscriber;
- _failure_detector = nullptr;
- _state = NS_Disconnected;
+ // Some flags might need to be tuned on the stage of loading replicas
(during
+ // replica_stub::initialize()), thus register their control command just
in the
+ // constructor.
+ register_flags_ctrl_command();
}
-replica_stub::~replica_stub(void) { close(); }
+replica_stub::~replica_stub() { close(); }
void replica_stub::initialize(bool clear /* = false*/)
{
@@ -439,6 +512,246 @@ void replica_stub::initialize(bool clear /* = false*/)
_access_controller = std::make_unique<dsn::security::access_controller>();
}
+std::vector<replica_stub::disk_replicas_info>
replica_stub::get_all_disk_dirs() const
+{
+ std::vector<disk_replicas_info> disks;
+ for (const auto &disk_node : _fs_manager.get_dir_nodes()) {
+ if (dsn_unlikely(disk_node->status == disk_status::IO_ERROR)) {
+ // Skip disks with IO errors.
+ continue;
+ }
+
+ std::vector<std::string> sub_dirs;
+ CHECK(utils::filesystem::get_subdirectories(disk_node->full_dir,
sub_dirs, false),
+ "failed to get sub_directories in {}",
+ disk_node->full_dir);
+ disks.push_back(disk_replicas_info{disk_node.get(),
std::move(sub_dirs)});
+ }
+
+ return disks;
+}
+
+// TaskCode: LPC_REPLICATION_INIT_LOAD
+// ThreadPool: THREAD_POOL_LOCAL_APP
+void replica_stub::load_replica(dir_node *disk_node,
+ const std::string &replica_dir,
+ size_t total_dir_count,
+ utils::ex_lock &reps_lock,
+ replica_map_by_gpid &reps,
+ std::atomic<size_t> &finished_dir_count)
+{
+ // Measure execution time for loading a replica dir.
+ //
+ // TODO(wangdan): support decimal milliseconds or microseconds, since
loading a small
+ // replica tends to spend less than 1 milliseconds and show "0ms" in
logging.
+ SCOPED_LOG_TIMING(INFO, "on loading replica dir {}:{}", disk_node->tag,
replica_dir);
+
+ LOG_INFO("loading replica: replica_dir={}:{}", disk_node->tag,
replica_dir);
+
+ const auto *const worker = task::get_current_worker2();
+ if (worker != nullptr) {
+ CHECK(!(worker->pool()->spec().partitioned),
+ "The thread pool THREAD_POOL_LOCAL_APP(task code:
LPC_REPLICATION_INIT_LOAD) "
+ "for loading replicas must not be partitioned since load
balancing is required "
+ "among multiple threads");
+ }
+
+ auto rep = load_replica(disk_node, replica_dir);
+ if (rep == nullptr) {
+ LOG_INFO("load replica failed: replica_dir={}:{}, progress={}/{}",
+ disk_node->tag,
+ replica_dir,
+ ++finished_dir_count,
+ total_dir_count);
+ return;
+ }
+
+ LOG_INFO("{}@{}: load replica successfully, replica_dir={}:{},
progress={}/{}, "
+ "last_durable_decree={}, last_committed_decree={},
last_prepared_decree={}",
+ rep->get_gpid(),
+ dsn_primary_host_port(),
+ disk_node->tag,
+ replica_dir,
+ ++finished_dir_count,
+ total_dir_count,
+ rep->last_durable_decree(),
+ rep->last_committed_decree(),
+ rep->last_prepared_decree());
+
+ utils::auto_lock<utils::ex_lock> l(reps_lock);
+ const auto rep_iter = reps.find(rep->get_gpid());
+ CHECK(rep_iter == reps.end(),
+ "{}@{}: newly loaded dir {} conflicts with existing {} while loading
replica",
+ rep->get_gpid(),
+ dsn_primary_host_port(),
+ rep->dir(),
+ rep_iter->second->dir());
+
+ reps.emplace(rep->get_gpid(), rep);
+}
+
+void replica_stub::load_replicas(replica_map_by_gpid &reps)
+{
+ // Measure execution time for loading all replicas from all healthy disks
without IO errors.
+ //
+ // TODO(wangdan): show both the size of output replicas and execution time
on just one
+ // logging line.
+ SCOPED_LOG_TIMING(INFO, "on loading replicas");
+
+ const auto &disks = get_all_disk_dirs();
+
+ // The max index of dirs that are currently being loaded for each disk,
which means the dirs
+ // with higher indexes have not begun to be loaded (namely pushed into the
queue).
+ std::vector<size_t> replica_dir_indexes(disks.size(), 0);
+
+ // Each loader is for a replica dir, including its path and loading task.
+ struct replica_dir_loader
+ {
+ size_t replica_dir_index;
+ std::string replica_dir_path;
+ task_ptr load_replica_task;
+ };
+
+ // Each queue would cache the tasks that loading dirs for each disk. Once
the task is
+ // found finished (namely a dir has been loaded successfully), it would be
popped from
+ // the queue.
+ std::vector<std::queue<replica_dir_loader>> load_disk_queues(disks.size());
+
+ // The number of loading replica dirs that have been finished for each
disk, used to show
+ // current progress.
+ //
+ // TODO(wangdan): calculate the number of successful or failed loading of
replica dirs,
+ // and the number for each reason if failed.
+ std::vector<std::atomic<size_t>> finished_replica_dirs(disks.size());
+ for (auto &count : finished_replica_dirs) {
+ count.store(0);
+ }
+
+ // The lock for operations on the loaded replicas as output.
+ utils::ex_lock reps_lock;
+
+ while (true) {
+ size_t finished_disks = 0;
+
+ // For each round, start loading one replica for each disk in case
there are too many
+ // replicas in a disk, except that all of the replicas of this disk
are being loaded.
+ for (size_t disk_index = 0; disk_index < disks.size(); ++disk_index) {
+ // TODO(wangdan): Structured bindings can be captured by closures
in g++, while
+ // not supported well by clang. Thus we do not use following
statement to bind
+ // both variables until clang has been upgraded to version 16
which could support
+ // that well:
+ //
+ // const auto &[disk_node, replica_dirs] = disks[disk_index];
+ //
+ // For the docs of clang 16 please see:
+ //
+ //
https://releases.llvm.org/16.0.0/tools/clang/docs/ReleaseNotes.html#c-20-feature-support.
+ const auto &replica_dirs = disks[disk_index].replica_dirs;
+
+ auto &replica_dir_index = replica_dir_indexes[disk_index];
+ if (replica_dir_index >= replica_dirs.size()) {
+ // All of the replicas for the disk `disks[disk_index]` have
begun to be loaded,
+ // thus just skip to next disk.
+ ++finished_disks;
+ continue;
+ }
+
+ const auto &disk_node = disks[disk_index].disk_node;
+ auto &load_disk_queue = load_disk_queues[disk_index];
+ if (load_disk_queue.size() >=
FLAGS_max_replicas_on_load_for_each_disk) {
+ // Loading replicas should be throttled in case that disk IO
is saturated.
+ if (!load_disk_queue.front().load_replica_task->wait(
+
static_cast<int>(FLAGS_load_replica_max_wait_time_ms))) {
+ // There might be too many replicas that are being loaded
which lead to
+ // slow disk IO, thus turn to load replicas of next disk,
and try to load
+ // dir `replica_dir_index` of this disk in the next round.
+ LOG_WARNING("after {} ms, loading dir({}, {}/{}) is still
not finished, "
+ "there are {} replicas being loaded for
disk({}:{}, {}/{}), "
+ "now turn to next disk, and will begin to load
dir({}, {}/{}) "
+ "soon",
+ FLAGS_load_replica_max_wait_time_ms,
+ load_disk_queue.front().replica_dir_path,
+ load_disk_queue.front().replica_dir_index,
+ replica_dirs.size(),
+ load_disk_queue.size(),
+ disk_node->tag,
+ disk_node->full_dir,
+ disk_index,
+ disks.size(),
+ replica_dirs[replica_dir_index],
+ replica_dir_index,
+ replica_dirs.size());
+ continue;
+ }
+
+ // Now the queue size is within the limit again, continue to
load a new replica dir.
+ load_disk_queue.pop();
+ }
+
+ if
(dsn::replication::is_data_dir_invalid(replica_dirs[replica_dir_index])) {
+ LOG_WARNING("ignore dir({}, {}/{}) for disk({}:{}, {}/{})",
+ replica_dirs[replica_dir_index],
+ replica_dir_index,
+ replica_dirs.size(),
+ disk_node->tag,
+ disk_node->full_dir,
+ disk_index,
+ disks.size());
+ ++replica_dir_index;
+ continue;
+ }
+
+ LOG_DEBUG("ready to load dir({}, {}/{}) for disk({}:{}, {}/{})",
+ replica_dirs[replica_dir_index],
+ replica_dir_index,
+ replica_dirs.size(),
+ disk_node->tag,
+ disk_node->full_dir,
+ disk_index,
+ disks.size());
+
+ load_disk_queue.push(replica_dir_loader{
+ replica_dir_index,
+ replica_dirs[replica_dir_index],
+ tasking::create_task(
+ // Ensure that the thread pool is non-partitioned.
+ LPC_REPLICATION_INIT_LOAD,
+ &_tracker,
+ std::bind(static_cast<void (replica_stub::*)(dir_node *,
+ const
std::string &,
+ size_t,
+
utils::ex_lock &,
+
replica_map_by_gpid &,
+
std::atomic<size_t> &)>(
+ &replica_stub::load_replica),
+ this,
+ disk_node,
+ replica_dirs[replica_dir_index],
+ replica_dirs.size(),
+ std::ref(reps_lock),
+ std::ref(reps),
+ std::ref(finished_replica_dirs[disk_index])))});
+
+ load_disk_queue.back().load_replica_task->enqueue();
+
+ ++replica_dir_index;
+ }
+
+ if (finished_disks >= disks.size()) {
+ // All replicas of all disks have begun to be loaded.
+ break;
+ }
+ }
+
+ // All loading tasks have been in the queue. Just wait all tasks to be
finished.
+ for (auto &load_disk_queue : load_disk_queues) {
+ while (!load_disk_queue.empty()) {
+ CHECK_TRUE(load_disk_queue.front().load_replica_task->wait());
+ load_disk_queue.pop();
+ }
+ }
+}
+
void replica_stub::initialize(const replication_options &opts, bool clear /* =
false*/)
{
_primary_host_port = dsn_primary_host_port();
@@ -523,75 +836,14 @@ void replica_stub::initialize(const replication_options
&opts, bool clear /* = f
// Start to load replicas in available data directories.
LOG_INFO("start to load replicas");
- std::map<dir_node *, std::vector<std::string>> dirs_by_dn;
- for (const auto &dn : _fs_manager.get_dir_nodes()) {
- // Skip IO error dir_node.
- if (dsn_unlikely(dn->status == disk_status::IO_ERROR)) {
- continue;
- }
- std::vector<std::string> sub_directories;
- CHECK(dsn::utils::filesystem::get_subdirectories(dn->full_dir,
sub_directories, false),
- "fail to get sub_directories in {}",
- dn->full_dir);
- dirs_by_dn.emplace(dn.get(), sub_directories);
- }
-
- replicas rps;
- utils::ex_lock rps_lock;
- std::deque<task_ptr> load_tasks;
- uint64_t start_time = dsn_now_ms();
- for (const auto &dn_dirs : dirs_by_dn) {
- const auto dn = dn_dirs.first;
- for (const auto &dir : dn_dirs.second) {
- if (dsn::replication::is_data_dir_invalid(dir)) {
- LOG_WARNING("ignore dir {}", dir);
- continue;
- }
- load_tasks.push_back(tasking::create_task(
- LPC_REPLICATION_INIT_LOAD,
- &_tracker,
- [this, dn, dir, &rps, &rps_lock] {
- LOG_INFO("process dir {}", dir);
-
- auto r = load_replica(dn, dir.c_str());
- if (r == nullptr) {
- return;
- }
- LOG_INFO("{}@{}: load replica '{}' success, <durable, "
- "commit> = <{}, {}>, last_prepared_decree = {}",
- r->get_gpid(),
- dsn_primary_host_port(),
- dir,
- r->last_durable_decree(),
- r->last_committed_decree(),
- r->last_prepared_decree());
-
- utils::auto_lock<utils::ex_lock> l(rps_lock);
- CHECK(rps.find(r->get_gpid()) == rps.end(),
- "conflict replica dir: {} <--> {}",
- r->dir(),
- rps[r->get_gpid()]->dir());
-
- rps[r->get_gpid()] = r;
- },
- load_tasks.size()));
- load_tasks.back()->enqueue();
- }
- }
- for (auto &tsk : load_tasks) {
- tsk->wait();
- }
- uint64_t finish_time = dsn_now_ms();
+ replica_map_by_gpid reps;
+ load_replicas(reps);
- dirs_by_dn.clear();
- load_tasks.clear();
- LOG_INFO("load replicas succeed, replica_count = {}, time_used = {} ms",
- rps.size(),
- finish_time - start_time);
+ LOG_INFO("load replicas succeed, replica_count = {}", reps.size());
bool is_log_complete = true;
- for (auto it = rps.begin(); it != rps.end(); ++it) {
+ for (auto it = reps.begin(); it != reps.end(); ++it) {
CHECK_EQ_MSG(it->second->background_sync_checkpoint(), ERR_OK, "sync
checkpoint failed");
it->second->reset_prepare_list_after_replay();
@@ -621,8 +873,8 @@ void replica_stub::initialize(const replication_options
&opts, bool clear /* = f
if (!is_log_complete) {
LOG_ERROR("logs are not complete for some replicas, which means that
shared log is "
"truncated, mark all replicas as inactive");
- for (auto it = rps.begin(); it != rps.end(); ++it) {
- it->second->set_inactive_state_transient(false);
+ for (auto &[_, rep] : reps) {
+ rep->set_inactive_state_transient(false);
}
}
@@ -648,11 +900,11 @@ void replica_stub::initialize(const replication_options
&opts, bool clear /* = f
std::chrono::seconds(FLAGS_disk_stat_interval_seconds));
}
- // attach rps
- _replicas = std::move(rps);
+ // Attach `reps`.
+ _replicas = std::move(reps);
METRIC_VAR_INCREMENT_BY(total_replicas, _replicas.size());
- for (const auto &kv : _replicas) {
- _fs_manager.add_replica(kv.first, kv.second->dir());
+ for (const auto &[pid, rep] : _replicas) {
+ _fs_manager.add_replica(pid, rep->dir());
}
_nfs = dsn::nfs_node::create();
@@ -746,7 +998,7 @@ dsn::error_code replica_stub::on_kill_replica(gpid id)
{
LOG_INFO("kill replica: gpid = {}", id);
if (id.get_app_id() == -1 || id.get_partition_index() == -1) {
- replicas rs;
+ replica_map_by_gpid rs;
{
zauto_read_lock l(_replicas_lock);
rs = _replicas;
@@ -1373,27 +1625,28 @@ void replica_stub::on_node_query_reply(error_code err,
resp.partitions.size(),
resp.gc_replicas.size());
- replicas rs;
+ replica_map_by_gpid reps;
{
zauto_read_lock rl(_replicas_lock);
- rs = _replicas;
+ reps = _replicas;
}
- for (auto it = resp.partitions.begin(); it != resp.partitions.end();
++it) {
- rs.erase(it->config.pid);
- tasking::enqueue(LPC_QUERY_NODE_CONFIGURATION_SCATTER,
- &_tracker,
-
std::bind(&replica_stub::on_node_query_reply_scatter, this, this, *it),
- it->config.pid.thread_hash());
+ for (const auto &config_update : resp.partitions) {
+ reps.erase(config_update.config.pid);
+ tasking::enqueue(
+ LPC_QUERY_NODE_CONFIGURATION_SCATTER,
+ &_tracker,
+ std::bind(&replica_stub::on_node_query_reply_scatter, this,
this, config_update),
+ config_update.config.pid.thread_hash());
}
- // for rps not exist on meta_servers
- for (auto it = rs.begin(); it != rs.end(); ++it) {
+ // For the replicas that do not exist on meta_servers.
+ for (const auto &[pid, _] : reps) {
tasking::enqueue(
LPC_QUERY_NODE_CONFIGURATION_SCATTER2,
&_tracker,
- std::bind(&replica_stub::on_node_query_reply_scatter2, this,
this, it->first),
- it->first.thread_hash());
+ std::bind(&replica_stub::on_node_query_reply_scatter2, this,
this, pid),
+ pid.thread_hash());
}
// handle the replicas which need to be gc
@@ -1522,18 +1775,18 @@ void replica_stub::on_meta_server_disconnected()
_state = NS_Disconnected;
- replicas rs;
+ replica_map_by_gpid reps;
{
zauto_read_lock rl(_replicas_lock);
- rs = _replicas;
+ reps = _replicas;
}
- for (auto it = rs.begin(); it != rs.end(); ++it) {
+ for (const auto &[pid, _] : reps) {
tasking::enqueue(
LPC_CM_DISCONNECTED_SCATTER,
&_tracker,
- std::bind(&replica_stub::on_meta_server_disconnected_scatter,
this, this, it->first),
- it->first.thread_hash());
+ std::bind(&replica_stub::on_meta_server_disconnected_scatter,
this, this, pid),
+ pid.thread_hash());
}
}
@@ -1823,7 +2076,7 @@ void replica_stub::open_replica(
_primary_host_port_cache,
group_check ? "with" : "without",
dir);
- rep = load_replica(dn, dir.c_str());
+ rep = load_replica(dn, dir);
// if load data failed, re-open the `*.ori` folder which is the origin
replica dir of disk
// migration
@@ -1848,7 +2101,7 @@ void replica_stub::open_replica(
boost::replace_first(
origin_dir,
replica_disk_migrator::kReplicaDirOriginSuffix, "");
dsn::utils::filesystem::rename_path(origin_tmp_dir,
origin_dir);
- rep = load_replica(origin_dn, origin_dir.c_str());
+ rep = load_replica(origin_dn, origin_dir);
FAIL_POINT_INJECT_F("mock_replica_load", [&](std::string_view)
-> void {});
}
@@ -1915,7 +2168,7 @@ void replica_stub::open_replica(
METRIC_VAR_DECREMENT(opening_replicas);
CHECK(_replicas.find(id) == _replicas.end(), "replica {} is already in
_replicas", id);
- _replicas.insert(replicas::value_type(rep->get_gpid(), rep));
+ _replicas.insert(replica_map_by_gpid::value_type(rep->get_gpid(),
rep));
METRIC_VAR_INCREMENT(total_replicas);
_closed_replicas.erase(id);
@@ -1981,6 +2234,51 @@ replica *replica_stub::new_replica(gpid gpid,
return rep;
}
+replica *replica_stub::new_replica(gpid gpid,
+ const app_info &app,
+ bool restore_if_necessary,
+ bool is_duplication_follower)
+{
+ return new_replica(gpid, app, restore_if_necessary,
is_duplication_follower, {});
+}
+
+/*static*/ std::string replica_stub::get_replica_dir_name(const std::string
&dir)
+{
+ static const char splitters[] = {'\\', '/', 0};
+ return utils::get_last_component(dir, splitters);
+}
+
+/* static */ bool
+replica_stub::parse_replica_dir_name(const std::string &dir_name, gpid &pid,
std::string &app_type)
+{
+ std::vector<uint32_t> ids(2, 0);
+ size_t begin = 0;
+ for (auto &id : ids) {
+ size_t end = dir_name.find('.', begin);
+ if (end == std::string::npos) {
+ return false;
+ }
+
+ if (!buf2uint32(std::string_view(dir_name.data() + begin, end -
begin), id)) {
+ return false;
+ }
+
+ begin = end + 1;
+ }
+
+ if (begin >= dir_name.size()) {
+ return false;
+ }
+
+ pid.set_app_id(static_cast<int32_t>(ids[0]));
+ pid.set_partition_index(static_cast<int32_t>(ids[1]));
+
+ // TODO(wangdan): the 3rd parameter `count` does not support default
argument for CentOS 7
+ // (gcc 7.3.1). After CentOS 7 is deprecated, consider dropping
std::string::npos.
+ app_type.assign(dir_name, begin, std::string::npos);
+ return true;
+}
+
bool replica_stub::validate_replica_dir(const std::string &dir,
app_info &ai,
gpid &pid,
@@ -1991,21 +2289,18 @@ bool replica_stub::validate_replica_dir(const
std::string &dir,
return false;
}
- char splitters[] = {'\\', '/', 0};
- const auto name = utils::get_last_component(dir, splitters);
- if (name.empty()) {
+ const auto &dir_name = get_replica_dir_name(dir);
+ if (dir_name.empty()) {
hint_message = fmt::format("invalid replica dir '{}'", dir);
return false;
}
- char app_type[128] = {0};
- int32_t app_id, pidx;
- if (3 != sscanf(name.c_str(), "%d.%d.%s", &app_id, &pidx, app_type)) {
+ std::string app_type;
+ if (!parse_replica_dir_name(dir_name, pid, app_type)) {
hint_message = fmt::format("invalid replica dir '{}'", dir);
return false;
}
- pid = gpid(app_id, pidx);
replica_app_info rai(&ai);
const auto ai_path = utils::filesystem::path_combine(dir,
replica_app_info::kAppInfo);
const auto err = rai.load(ai_path);
@@ -2019,10 +2314,9 @@ bool replica_stub::validate_replica_dir(const
std::string &dir,
return false;
}
- // When the online partition split function aborted, the garbage
partitions are with pidx in
- // the range of [ai.partition_count, 2 * ai.partition_count), which means
the partitions with
- // pidx >= ai.partition_count are garbage partitions.
- if (ai.partition_count <= pidx) {
+ if (pid.get_partition_index() >= ai.partition_count) {
+ // Once the online partition split aborted, the partitions within the
range of
+ // [ai.partition_count, 2 * ai.partition_count) would become garbage.
hint_message = fmt::format(
"partition[{}], count={}, this replica may be partition split
garbage partition, "
"ignore it",
@@ -2034,7 +2328,7 @@ bool replica_stub::validate_replica_dir(const std::string
&dir,
return true;
}
-replica *replica_stub::load_replica(dir_node *dn, const char *dir)
+replica_ptr replica_stub::load_replica(dir_node *disk_node, const std::string
&replica_dir)
{
FAIL_POINT_INJECT_F("mock_replica_load",
[&](std::string_view) -> replica * { return nullptr;
});
@@ -2042,24 +2336,28 @@ replica *replica_stub::load_replica(dir_node *dn, const
char *dir)
app_info ai;
gpid pid;
std::string hint_message;
- if (!validate_replica_dir(dir, ai, pid, hint_message)) {
- LOG_ERROR("invalid replica dir '{}', hint: {}", dir, hint_message);
+ if (!validate_replica_dir(replica_dir, ai, pid, hint_message)) {
+ LOG_ERROR("invalid replica dir '{}', hint={}", replica_dir,
hint_message);
return nullptr;
}
// The replica's directory must exist when creating a replica.
- CHECK_EQ(dir, dn->replica_dir(ai.app_type, pid));
- auto *rep = new replica(this, pid, ai, dn, false);
+ CHECK_EQ(disk_node->replica_dir(ai.app_type, pid), replica_dir);
+
+ auto *rep = new replica(this, pid, ai, disk_node, false);
const auto err = rep->initialize_on_load();
if (err != ERR_OK) {
- LOG_ERROR("{}: load replica failed, err = {}", rep->name(), err);
- rep->close();
+ LOG_ERROR("{}: load replica failed, tag={}, replica_dir={}, err={}",
+ rep->name(),
+ disk_node->tag,
+ replica_dir,
+ err);
delete rep;
rep = nullptr;
// clear work on failure
- if (dsn::utils::filesystem::directory_exists(dir)) {
- move_to_err_path(dir, "load replica");
+ if (dsn::utils::filesystem::directory_exists(replica_dir)) {
+ move_to_err_path(replica_dir, "load replica");
METRIC_VAR_INCREMENT(moved_error_replicas);
_fs_manager.remove_replica(pid);
}
@@ -2067,7 +2365,10 @@ replica *replica_stub::load_replica(dir_node *dn, const
char *dir)
return nullptr;
}
- LOG_INFO("{}: load replica succeed", rep->name());
+ LOG_INFO("{}: load replica succeed, tag={}, replica_dir={}",
+ rep->name(),
+ disk_node->tag,
+ replica_dir);
return rep;
}
@@ -2394,11 +2695,6 @@ void replica_stub::register_ctrl_command()
});
}));
-
_cmds.emplace_back(::dsn::command_manager::instance().register_bool_command(
- FLAGS_empty_write_disabled,
- "replica.disable-empty-write",
- "whether to disable empty writes"));
-
#ifdef DSN_ENABLE_GPERF
_cmds.emplace_back(::dsn::command_manager::instance().register_bool_command(
_release_tcmalloc_memory,
@@ -2433,61 +2729,64 @@ void replica_stub::register_ctrl_command()
#elif defined(DSN_USE_JEMALLOC)
register_jemalloc_ctrl_command();
#endif
-
_cmds.emplace_back(::dsn::command_manager::instance().register_int_command(
- FLAGS_max_concurrent_bulk_load_downloading_count,
- FLAGS_max_concurrent_bulk_load_downloading_count,
- "replica.max-concurrent-bulk-load-downloading-count",
- kMaxConcurrentBulkLoadDownloadingCountDesc));
});
}
std::string
-replica_stub::exec_command_on_replica(const std::vector<std::string> &args,
+replica_stub::exec_command_on_replica(const std::vector<std::string>
&arg_str_list,
bool allow_empty_args,
- std::function<std::string(const
replica_ptr &rep)> func)
+ std::function<std::string(const
replica_ptr &)> func)
{
- if (args.empty() && !allow_empty_args) {
- return std::string("invalid arguments");
+ static const std::string kInvalidArguments("invalid arguments");
+
+ if (arg_str_list.empty() && !allow_empty_args) {
+ return kInvalidArguments;
}
- replicas rs;
+ replica_map_by_gpid rs;
{
zauto_read_lock l(_replicas_lock);
rs = _replicas;
}
std::set<gpid> required_ids;
- replicas choosed_rs;
- if (!args.empty()) {
- for (int i = 0; i < args.size(); i++) {
- std::vector<std::string> arg_strs;
- utils::split_args(args[i].c_str(), arg_strs, ',');
- if (arg_strs.empty()) {
- return std::string("invalid arguments");
+ replica_map_by_gpid choosed_rs;
+ if (!arg_str_list.empty()) {
+ for (const auto &arg_str : arg_str_list) {
+ std::vector<std::string> args;
+ utils::split_args(arg_str.c_str(), args, ',');
+ if (args.empty()) {
+ return kInvalidArguments;
}
- for (const std::string &arg : arg_strs) {
- if (arg.empty())
+ for (const std::string &arg : args) {
+ if (arg.empty()) {
continue;
+ }
+
gpid id;
- int pid;
if (id.parse_from(arg.c_str())) {
- // app_id.partition_index
+ // Format: app_id.partition_index
required_ids.insert(id);
auto find = rs.find(id);
if (find != rs.end()) {
choosed_rs[id] = find->second;
}
- } else if (sscanf(arg.c_str(), "%d", &pid) == 1) {
- // app_id
- for (auto kv : rs) {
- id = kv.second->get_gpid();
- if (id.get_app_id() == pid) {
- choosed_rs[id] = kv.second;
- }
+
+ continue;
+ }
+
+ // Must be app_id.
+ int32_t app_id = 0;
+ if (!buf2int32(arg, app_id)) {
+ return kInvalidArguments;
+ }
+
+ for (const auto &[_, rep] : rs) {
+ id = rep->get_gpid();
+ if (id.get_app_id() == app_id) {
+ choosed_rs[id] = rep;
}
- } else {
- return std::string("invalid arguments");
}
}
}
@@ -2507,8 +2806,10 @@ replica_stub::exec_command_on_replica(const
std::vector<std::string> &args,
[rep, &func, &results_lock, &results]() {
partition_status::type status = rep->status();
if (status != partition_status::PS_PRIMARY &&
- status != partition_status::PS_SECONDARY)
+ status != partition_status::PS_SECONDARY) {
return;
+ }
+
std::string result = func(rep);
::dsn::zauto_lock l(results_lock);
auto &value = results[rep->get_gpid()];
@@ -2720,33 +3021,39 @@ replica_ptr
replica_stub::create_child_replica_if_not_found(gpid child_pid,
CHECK_NOTNULL(dn, "");
auto *rep = new replica(this, child_pid, *app, dn, false);
rep->_config.status = partition_status::PS_INACTIVE;
- _replicas.insert(replicas::value_type(child_pid, rep));
+ _replicas.insert(replica_map_by_gpid::value_type(child_pid, rep));
LOG_INFO("mock create_child_replica_if_not_found succeed");
return rep;
});
zauto_write_lock l(_replicas_lock);
- auto it = _replicas.find(child_pid);
+
+ const auto it = _replicas.find(child_pid);
if (it != _replicas.end()) {
return it->second;
- } else {
- if (_opening_replicas.find(child_pid) != _opening_replicas.end()) {
- LOG_WARNING("failed create child replica({}) because it is under
open", child_pid);
- return nullptr;
- } else if (_closing_replicas.find(child_pid) !=
_closing_replicas.end()) {
- LOG_WARNING("failed create child replica({}) because it is under
close", child_pid);
- return nullptr;
- } else {
- replica *rep = new_replica(child_pid, *app, false, false,
parent_dir);
- if (rep != nullptr) {
- auto pr = _replicas.insert(replicas::value_type(child_pid,
rep));
- CHECK(pr.second, "child replica {} has been existed",
rep->name());
- METRIC_VAR_INCREMENT(total_replicas);
- _closed_replicas.erase(child_pid);
- }
- return rep;
- }
}
+
+ if (_opening_replicas.find(child_pid) != _opening_replicas.end()) {
+ LOG_WARNING("failed create child replica({}) because it is under
open", child_pid);
+ return nullptr;
+ }
+
+ if (_closing_replicas.find(child_pid) != _closing_replicas.end()) {
+ LOG_WARNING("failed create child replica({}) because it is under
close", child_pid);
+ return nullptr;
+ }
+
+ replica *rep = new_replica(child_pid, *app, false, false, parent_dir);
+ if (rep == nullptr) {
+ return nullptr;
+ }
+
+ const auto pr =
_replicas.insert(replica_map_by_gpid::value_type(child_pid, rep));
+ CHECK(pr.second, "child replica {} has been existed", rep->name());
+ METRIC_VAR_INCREMENT(total_replicas);
+ _closed_replicas.erase(child_pid);
+
+ return rep;
}
// ThreadPool: THREAD_POOL_REPLICATION
diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h
index d3b860076..7d03d06ee 100644
--- a/src/replica/replica_stub.h
+++ b/src/replica/replica_stub.h
@@ -27,8 +27,9 @@
#pragma once
#include <gtest/gtest_prod.h>
-#include <stdint.h>
#include <atomic>
+#include <cstddef>
+#include <cstdint>
#include <functional>
#include <map>
#include <memory>
@@ -70,6 +71,12 @@
#include "utils/metrics.h"
#include "utils/zlocks.h"
+namespace dsn::utils {
+
+class ex_lock;
+
+} // namespace dsn::utils
+
DSN_DECLARE_uint32(max_concurrent_manual_emergency_checkpointing_count);
namespace dsn {
@@ -112,14 +119,14 @@ typedef rpc_holder<add_new_disk_request,
add_new_disk_response> add_new_disk_rpc
namespace test {
class test_checker;
-}
+} // namespace test
+
class cold_backup_context;
class replica_split_manager;
-typedef std::function<void(const ::dsn::host_port & /*from*/,
- const replica_configuration & /*new_config*/,
- bool /*is_closing*/)>
- replica_state_subscriber;
-typedef std::unordered_map<gpid, replica_ptr> replicas;
+
+using replica_state_subscriber = std::function<void(const ::dsn::host_port &
/*from*/,
+ const
replica_configuration & /*new_config*/,
+ bool /*is_closing*/)>;
class replica_stub;
@@ -222,9 +229,9 @@ public:
// - if allow_empty_args = false, you should specify at least one
argument.
// each argument should be in format of:
// id1,id2... (where id is 'app_id' or 'app_id.partition_id')
- std::string exec_command_on_replica(const std::vector<std::string> &args,
+ std::string exec_command_on_replica(const std::vector<std::string>
&arg_str_list,
bool allow_empty_args,
- std::function<std::string(const
replica_ptr &rep)> func);
+ std::function<std::string(const
replica_ptr &)> func);
//
// partition split
@@ -351,15 +358,61 @@ private:
gpid id,
const std::shared_ptr<group_check_request> &req,
const std::shared_ptr<configuration_update_request>
&req2);
- // Create a new replica according to the parameters.
- // 'parent_dir' is used in partition split for create_child_replica_dir().
+
+ // Create a child replica for partition split, with 'parent_dir' specified
as the parent
+ // replica dir used for `create_child_replica_dir()`.
replica *new_replica(gpid gpid,
const app_info &app,
bool restore_if_necessary,
bool is_duplication_follower,
- const std::string &parent_dir = "");
- // Load an existing replica which is located in 'dn' with 'dir' directory.
- replica *load_replica(dir_node *dn, const char *dir);
+ const std::string &parent_dir);
+
+ // Create a new replica, choosing and assigning the best dir for it.
+ replica *new_replica(gpid gpid,
+ const app_info &app,
+ bool restore_if_necessary,
+ bool is_duplication_follower);
+
+ // Each disk with its candidate replica dirs, used to load replicas while
initializing.
+ struct disk_replicas_info
+ {
+ // `dir_node` for each disk.
+ dir_node *disk_node;
+
+ // All replica dirs on each disk.
+ std::vector<std::string> replica_dirs;
+ };
+
+ // Get the absolute dirs of all replicas for all healthy disks without IO
errors.
+ std::vector<disk_replicas_info> get_all_disk_dirs() const;
+
+ // Get the replica dir name from a potentially longer path (`dir` could be
an absolute
+ // or relative path).
+ static std::string get_replica_dir_name(const std::string &dir);
+
+ // Parse app id, partition id and app type from the replica dir name.
+ static bool
+ parse_replica_dir_name(const std::string &dir_name, gpid &pid, std::string
&app_type);
+
+ // Load an existing replica which is located in `dn` with `replica_dir`.
Usually each
+ // different `dn` represents a unique disk. `replica_dir` is the absolute
path of the
+ // directory for a replica.
+ virtual replica_ptr load_replica(dir_node *disk_node, const std::string
&replica_dir);
+
+ using replica_map_by_gpid = std::unordered_map<gpid, replica_ptr>;
+
+ // The same as the above `load_replica` function, except that this
function is to load
+ // each replica to `reps` with protection from `reps_lock`.
+ void load_replica(dir_node *disk_node,
+ const std::string &replica_dir,
+ size_t total_dir_count,
+ utils::ex_lock &reps_lock,
+ replica_map_by_gpid &reps,
+ std::atomic<size_t> &finished_dir_count);
+
+ // Load all replicas simultaneously from all disks to `reps`.
+ void load_replicas(replica_map_by_gpid &reps);
+
// Clean up the memory state and on disk data if creating replica failed.
void clear_on_failure(replica *rep);
task_ptr begin_close_replica(replica_ptr r);
@@ -445,21 +498,26 @@ private:
friend class replica_follower;
friend class replica_follower_test;
friend class replica_http_service_test;
+ friend class mock_load_replica;
+ friend class GetReplicaDirNameTest;
+ friend class ParseReplicaDirNameTest;
FRIEND_TEST(open_replica_test, open_replica_add_decree_and_ballot_check);
FRIEND_TEST(replica_test, test_auto_trash_of_corruption);
FRIEND_TEST(replica_test, test_clear_on_failure);
- typedef std::unordered_map<gpid, ::dsn::task_ptr> opening_replicas;
- typedef std::unordered_map<gpid, std::tuple<task_ptr, replica_ptr,
app_info, replica_info>>
- closing_replicas; // <gpid, <close_task, replica, app_info,
replica_info> >
- typedef std::map<gpid, std::pair<app_info, replica_info>>
- closed_replicas; // <gpid, <app_info, replica_info> >
+ using opening_replica_map_by_gpid = std::unordered_map<gpid, task_ptr>;
+
+ // `task_ptr` is the task closing a replica.
+ using closing_replica_map_by_gpid =
+ std::unordered_map<gpid, std::tuple<task_ptr, replica_ptr, app_info,
replica_info>>;
+
+ using closed_replica_map_by_gpid = std::map<gpid, std::pair<app_info,
replica_info>>;
mutable zrwlock_nr _replicas_lock;
- replicas _replicas;
- opening_replicas _opening_replicas;
- closing_replicas _closing_replicas;
- closed_replicas _closed_replicas;
+ replica_map_by_gpid _replicas;
+ opening_replica_map_by_gpid _opening_replicas;
+ closing_replica_map_by_gpid _closing_replicas;
+ closed_replica_map_by_gpid _closed_replicas;
::dsn::host_port _primary_host_port;
// The stringify of '_primary_host_port', used by logging usually.
@@ -565,5 +623,6 @@ private:
dsn::task_tracker _tracker;
};
+
} // namespace replication
} // namespace dsn
diff --git a/src/replica/test/config-test.ini b/src/replica/test/config-test.ini
index 2d2c6f397..b2e0e86c2 100644
--- a/src/replica/test/config-test.ini
+++ b/src/replica/test/config-test.ini
@@ -34,7 +34,7 @@ type = replica
run = true
count = 1
ports = 54321
-pools =
THREAD_POOL_DEFAULT,THREAD_POOL_REPLICATION_LONG,THREAD_POOL_REPLICATION,THREAD_POOL_PLOG,THREAD_POOL_BLOCK_SERVICE
+pools =
THREAD_POOL_DEFAULT,THREAD_POOL_REPLICATION_LONG,THREAD_POOL_LOCAL_APP,THREAD_POOL_REPLICATION,THREAD_POOL_PLOG,THREAD_POOL_BLOCK_SERVICE
[core]
;tool = simulator
@@ -76,6 +76,12 @@ partitioned = true
worker_priority = THREAD_xPRIORITY_NORMAL
worker_count = 3
+[threadpool.THREAD_POOL_LOCAL_APP]
+name = local_app
+partitioned = false
+worker_priority = THREAD_xPRIORITY_NORMAL
+worker_count = 4
+
[threadpool.THREAD_POOL_REPLICATION_LONG]
name = replica_long
diff --git a/src/replica/test/load_replicas_test.cpp
b/src/replica/test/load_replicas_test.cpp
new file mode 100644
index 000000000..3315629c9
--- /dev/null
+++ b/src/replica/test/load_replicas_test.cpp
@@ -0,0 +1,339 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstddef>
+#include <cstdint>
+#include <fmt/core.h>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <set>
+#include <string>
+#include <type_traits>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "common/fs_manager.h"
+#include "common/gpid.h"
+#include "common/replication.codes.h"
+#include "dsn.layer2_types.h"
+#include "gtest/gtest.h"
+#include "replica/replica.h"
+#include "replica/replica_stub.h"
+#include "replica/replication_app_base.h"
+#include "replica/test/mock_utils.h"
+#include "task/task.h"
+#include "task/task_code.h"
+#include "task/task_spec.h"
+#include "test_util/test_util.h"
+#include "utils/autoref_ptr.h"
+#include "utils/filesystem.h"
+#include "utils/flags.h"
+#include "utils/ports.h"
+
+DSN_DECLARE_uint64(max_replicas_on_load_for_each_disk);
+
+namespace dsn::replication {
+
+class mock_load_replica : public replica_stub
+{
+public:
+ mock_load_replica() = default;
+
+ ~mock_load_replica() override = default;
+
+ void initialize(const std::map<std::string, std::string> &dirs_by_tag,
+ const std::map<std::string, std::vector<gpid>>
&replicas_by_tag)
+ {
+ // Get dirs and tags to initialize fs_manager.
+ std::vector<std::string> dirs;
+ std::vector<std::string> tags;
+ for (const auto &[tag, dir] : dirs_by_tag) {
+ dirs.push_back(dir);
+ tags.push_back(tag);
+ }
+
+ // Initialize fs_manager by the tag and dir of each disk.
+ _fs_manager.initialize(dirs, tags);
+
+ // Generate the replicas which are expected after loading.
+ for (const auto &[tag, reps] : replicas_by_tag) {
+ for (const auto &pid : reps) {
+ ASSERT_TRUE(_expected_loaded_replica_pids.insert(pid).second);
+ }
+ }
+
+ _disk_tags_for_order.clear();
+ _disk_dirs_for_order.clear();
+ _disk_replicas_for_order.clear();
+ _disk_loaded_replicas_for_order.assign(replicas_by_tag.size(), 0);
+
+ // Ensure that the disks are scanned in the order returned by
`get_dir_node()`.
+ for (const auto &dn : _fs_manager.get_dir_nodes()) {
+ for (const auto &pid : replicas_by_tag.at(dn->tag)) {
+ _fs_manager.specify_dir_for_new_replica_for_test(dn.get(),
"pegasus", pid);
+ }
+
+ _disk_tags_for_order.push_back(dn->tag);
+ _disk_dirs_for_order.push_back(dn->full_dir);
+
_disk_replicas_for_order.push_back(replicas_by_tag.at(dn->tag).size());
+ }
+
+ ASSERT_EQ(_disk_tags_for_order.size(), _disk_dirs_for_order.size());
+ }
+
+ void test_load_replicas(bool test_load_order, uint64_t
max_replicas_on_load_for_each_disk)
+ {
+ PRESERVE_VAR(allow_inline,
dsn::task_spec::get(LPC_REPLICATION_INIT_LOAD)->allow_inline);
+ dsn::task_spec::get(LPC_REPLICATION_INIT_LOAD)->allow_inline =
test_load_order;
+
+ PRESERVE_FLAG(max_replicas_on_load_for_each_disk);
+ FLAGS_max_replicas_on_load_for_each_disk =
max_replicas_on_load_for_each_disk;
+
+ // Check if all loaded replicas are matched.
+ replica_stub::replica_map_by_gpid actual_loaded_replicas;
+ load_replicas(actual_loaded_replicas);
+ ASSERT_EQ(_expected_loaded_replicas, actual_loaded_replicas);
+
+ // Check if all replicas have been loaded.
+ std::set<gpid> actual_loaded_replica_pids;
+ for (const auto &[pid, _] : actual_loaded_replicas) {
+ ASSERT_TRUE(actual_loaded_replica_pids.insert(pid).second);
+ }
+ ASSERT_EQ(_expected_loaded_replica_pids, actual_loaded_replica_pids);
+ }
+
+ void remove_disk_dirs()
+ {
+ for (const auto &dn : _fs_manager.get_dir_nodes()) {
+ ASSERT_TRUE(utils::filesystem::remove_path(dn->full_dir));
+ }
+ }
+
+private:
+ void load_replica_for_test(dir_node *dn, const std::string &replica_dir,
replica_ptr &rep)
+ {
+ ASSERT_TRUE(utils::filesystem::directory_exists(replica_dir));
+
+ const auto &dir_name = get_replica_dir_name(replica_dir);
+
+ gpid pid;
+ std::string app_type;
+ ASSERT_TRUE(parse_replica_dir_name(dir_name, pid, app_type));
+ ASSERT_STREQ("pegasus", app_type.c_str());
+
+ ASSERT_EQ(LPC_REPLICATION_INIT_LOAD,
task::get_current_task()->spec().code);
+
+ if (task::get_current_task()->spec().allow_inline) {
+ // Once the task is `allow_inline`, it would be executed in place
immediately rather
+ // than pushed into the queue. Thus we could test the expected
order in which the
+ // tasks are pushed into the queue.
+
+ // Find the first disk where there is still some replica that has
not been loaded.
+ size_t finished_disks = 0;
+ while (_disk_loaded_replicas_for_order[_disk_index_for_order] >=
+ _disk_replicas_for_order[_disk_index_for_order]) {
+ // Since the current task has not been executed yet, it is impossible
for all disks
+ // to be finished.
+ ++finished_disks;
+ ASSERT_GT(_disk_tags_for_order.size(), finished_disks);
+
+ // Skip to next disk since all of the replicas of this disk
have been loaded.
+ _disk_index_for_order = (_disk_index_for_order + 1) %
_disk_tags_for_order.size();
+ }
+
+ // Only check the processing order of the disk this replica
belongs to, rather than
+ // the order of the replica itself, since the order of the dirs
returned
+ // by the underlying call might vary.
+ ASSERT_EQ(_disk_tags_for_order[_disk_index_for_order], dn->tag);
+ ASSERT_EQ(_disk_dirs_for_order[_disk_index_for_order],
dn->full_dir);
+
+ // Current replica has been loaded, move forward to the next
replica of this disk.
+ ++_disk_loaded_replicas_for_order[_disk_index_for_order];
+
+ // Turn to next disks if some of them still have some replicas
that are not loaded.
+ _disk_index_for_order = (_disk_index_for_order + 1) %
_disk_tags_for_order.size();
+ }
+
+ // Check the absolute dir of this replica.
+ ASSERT_EQ(dn->replica_dir("pegasus", pid), replica_dir);
+
+ app_info ai;
+ ai.app_type = "pegasus";
+ rep = new replica(this, pid, ai, dn, false);
+ rep->_app =
std::make_unique<replication::mock_replication_app_base>(rep);
+
+ std::lock_guard<std::mutex> guard(_mtx);
+
+ ASSERT_TRUE(_expected_loaded_replicas.find(pid) ==
_expected_loaded_replicas.end());
+
+ _expected_loaded_replicas[pid] = rep;
+ }
+
+ // Mock the process of loading a replica.
+ replica_ptr load_replica(dir_node *dn, const std::string &replica_dir)
override
+ {
+ replica_ptr rep;
+ load_replica_for_test(dn, replica_dir, rep);
+ return rep;
+ }
+
+ std::set<gpid> _expected_loaded_replica_pids;
+
+ // The variables with postfix `_for_order` are only for testing the order
of the loading
+ // tasks.
+ size_t _disk_index_for_order{0};
+ std::vector<std::string> _disk_tags_for_order;
+ std::vector<std::string> _disk_dirs_for_order;
+ std::vector<size_t> _disk_replicas_for_order;
+ std::vector<size_t> _disk_loaded_replicas_for_order;
+
+ mutable std::mutex _mtx;
+ replica_stub::replica_map_by_gpid _expected_loaded_replicas;
+
+ DISALLOW_COPY_AND_ASSIGN(mock_load_replica);
+ DISALLOW_MOVE_AND_ASSIGN(mock_load_replica);
+};
+
+struct load_replicas_case
+{
+ // Each disk tag => dir of this disk.
+ std::map<std::string, std::string> dirs_by_tag;
+
+ // Each disk tag => replicas (specified by <app_id, partition_id>) on this
disk.
+ std::map<std::string, std::vector<gpid>> replicas_by_tag;
+};
+
+class LoadReplicasTest : public testing::TestWithParam<load_replicas_case>
+{
+public:
+ LoadReplicasTest()
+ {
+ // Remove all dirs of all disks to prevent each test from being
disturbed.
+ _stub.remove_disk_dirs();
+
+ // Use test cases to initialize the replica stub.
+ const auto &load_case = GetParam();
+ _stub.initialize(load_case.dirs_by_tag, load_case.replicas_by_tag);
+ }
+
+ ~LoadReplicasTest() override { _stub.remove_disk_dirs(); }
+
+ void test_load_replicas(bool test_load_order, uint64_t
max_replicas_on_load_for_each_disk)
+ {
+ _stub.test_load_replicas(test_load_order,
max_replicas_on_load_for_each_disk);
+ }
+
+private:
+ mock_load_replica _stub;
+
+ DISALLOW_COPY_AND_ASSIGN(LoadReplicasTest);
+ DISALLOW_MOVE_AND_ASSIGN(LoadReplicasTest);
+};
+
+TEST_P(LoadReplicasTest, LoadReplicas) { test_load_replicas(false, 256); }
+
+TEST_P(LoadReplicasTest, LoadOrder) { test_load_replicas(true, 256); }
+
+TEST_P(LoadReplicasTest, LoadThrottling) { test_load_replicas(false, 5); }
+
+// Generate a test case for loading replicas. Each element in `disk_replicas`
is corresponding
+// to the number of replicas on a disk.
+load_replicas_case generate_load_replicas_case(const std::vector<size_t>
&disk_replicas)
+{
+ std::map<std::string, std::string> dirs_by_tag;
+ for (size_t disk_index = 0; disk_index < disk_replicas.size();
++disk_index) {
+ dirs_by_tag.emplace(fmt::format("data{}", disk_index),
fmt::format("disk{}", disk_index));
+ }
+
+ static const uint32_t kNumBitsPartitions = 3;
+ static const uint32_t kNumPartitions = 1U << kNumBitsPartitions;
+
+ uint32_t app_id = 1;
+ uint32_t partition_id = 0;
+
+ std::map<std::string, std::vector<gpid>> replicas_by_tag;
+
+ while (true) {
+ size_t finished_disks = 0;
+
+ for (size_t disk_index = 0; disk_index < disk_replicas.size();
++disk_index) {
+ auto &replica_list = replicas_by_tag[fmt::format("data{}",
disk_index)];
+ if (replica_list.size() >= disk_replicas[disk_index]) {
+ // All replicas on this disk have been generated, just skip to
next disk.
+ ++finished_disks;
+ continue;
+ }
+
+ // Generate a replica with current app id and partition index.
+ replica_list.emplace_back(static_cast<int32_t>(app_id),
+ static_cast<int32_t>(partition_id));
+
+ // Once the next partition index wraps around to 0, increment the
app id to move on to the next table.
+ app_id += ((partition_id + 1) & kNumPartitions) >>
kNumBitsPartitions;
+
+ // Increment index to turn to next partition.
+ partition_id = (partition_id + 1) & (kNumPartitions - 1);
+ }
+
+ if (finished_disks >= disk_replicas.size()) {
+ // All disks have been done.
+ break;
+ }
+ }
+
+ return {dirs_by_tag, replicas_by_tag};
+}
+
+std::vector<load_replicas_case> generate_load_replicas_cases()
+{
+ // At least 1 disk should be included (otherwise it would lead to a
core dump); thus do
+ // not generate the empty case (i.e. {}).
+ return std::vector<load_replicas_case>({
+ // There is only one disk, which has no replicas.
+ generate_load_replicas_case({0}),
+ // There are two disks, both of which have no replicas.
+ generate_load_replicas_case({0, 0}),
+ // There is only one disk which has one replica.
+ generate_load_replicas_case({1}),
+ // There are two disks, one of which has one replica while the other
has none.
+ generate_load_replicas_case({1, 0}),
+ generate_load_replicas_case({0, 1}),
+ // There is only one disk which has two replicas.
+ generate_load_replicas_case({2}),
+ // There are two disks, one of which has two replicas while the other
has none.
+ generate_load_replicas_case({2, 0}),
+ generate_load_replicas_case({0, 2}),
+ // There are at least three disks.
+ generate_load_replicas_case({1, 0, 2}),
+ generate_load_replicas_case({8, 25, 16}),
+ generate_load_replicas_case({17, 96, 56, 127}),
+ generate_load_replicas_case({22, 38, 0, 16}),
+ generate_load_replicas_case({82, 75, 36, 118, 65}),
+ generate_load_replicas_case({0, 92, 17, 68, 25}),
+ // There are many replicas for some disks.
+ generate_load_replicas_case({156, 367, 309, 58, 404, 298, 512, 82}),
+ generate_load_replicas_case({167, 28, 898, 516, 389, 422, 682, 265,
596}),
+ });
+}
+
+INSTANTIATE_TEST_SUITE_P(ReplicaStubTest,
+ LoadReplicasTest,
+ testing::ValuesIn(generate_load_replicas_cases()));
+
+} // namespace dsn::replication
diff --git a/src/replica/test/replica_dir_test.cpp
b/src/replica/test/replica_dir_test.cpp
new file mode 100644
index 000000000..3bc64d74f
--- /dev/null
+++ b/src/replica/test/replica_dir_test.cpp
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+#include <vector>
+
+#include "common/gpid.h"
+#include "gtest/gtest.h"
+#include "replica/replica_stub.h"
+
+namespace dsn::replication {
+
+struct get_replica_dir_name_case
+{
+ std::string path;
+ std::string expected_replica_dir_name;
+};
+
+class GetReplicaDirNameTest : public
testing::TestWithParam<get_replica_dir_name_case>
+{
+public:
+ static void test_get_replica_dir_name()
+ {
+ const auto &test_case = GetParam();
+ const auto &actual_replica_dir_name =
replica_stub::get_replica_dir_name(test_case.path);
+ EXPECT_EQ(test_case.expected_replica_dir_name,
actual_replica_dir_name);
+ }
+};
+
+TEST_P(GetReplicaDirNameTest, GetReplicaDirName) {
test_get_replica_dir_name(); }
+
+const std::vector<get_replica_dir_name_case> get_replica_dir_name_tests{
+ // Linux absolute path and non-empty dir name.
+ {"/data/pegasus/1.2.pegasus", "1.2.pegasus"},
+ // Linux absolute path and empty dir name.
+ {"/data/pegasus/1.2.pegasus/", ""},
+ // Windows absolute path and non-empty dir name.
+ {R"(D:\data\pegasus\1.2.pegasus)", "1.2.pegasus"},
+ // Windows absolute path and empty dir name.
+ {R"(D:\data\pegasus\1.2.pegasus\)", ""},
+ // Linux relative path and non-empty dir name.
+ {"./1.2.pegasus", "1.2.pegasus"},
+ // Linux relative path and empty dir name.
+ {"./1.2.pegasus/", ""},
+ // Windows relative path and non-empty dir name.
+ {R"(.\1.2.pegasus)", "1.2.pegasus"},
+ // Windows relative path and empty dir name.
+ {R"(.\1.2.pegasus\)", ""},
+};
+
+INSTANTIATE_TEST_SUITE_P(ReplicaDirTest,
+ GetReplicaDirNameTest,
+ testing::ValuesIn(get_replica_dir_name_tests));
+
+struct parse_replica_dir_name_case
+{
+ std::string replica_dir_name;
+ bool ok;
+ gpid expected_pid;
+ std::string expected_app_type;
+};
+
+class ParseReplicaDirNameTest : public
testing::TestWithParam<parse_replica_dir_name_case>
+{
+public:
+ static void test_parse_replica_dir_name()
+ {
+ const auto &test_case = GetParam();
+
+ gpid actual_pid;
+ std::string actual_app_type;
+ ASSERT_EQ(test_case.ok,
+ replica_stub::parse_replica_dir_name(
+ test_case.replica_dir_name, actual_pid,
actual_app_type));
+ if (!test_case.ok) {
+ return;
+ }
+
+ EXPECT_EQ(test_case.expected_pid, actual_pid);
+ EXPECT_EQ(test_case.expected_app_type, actual_app_type);
+ }
+};
+
+TEST_P(ParseReplicaDirNameTest, ParseReplicaDirName) {
test_parse_replica_dir_name(); }
+
+const std::vector<parse_replica_dir_name_case> parse_replica_dir_name_tests{
+ // Empty dir name.
+ {"", false, {}, ""},
+ // Single-digit IDs.
+ {"1.2.pegasus", true, {1, 2}, "pegasus"},
+ // Multi-digit IDs.
+ {"1234.56789.pegasus", true, {1234, 56789}, "pegasus"},
+ // Custom app type other than "pegasus".
+ {"1.2.another", true, {1, 2}, "another"},
+ // Custom app type with dot.
+ {"1.2.another.pegasus", true, {1, 2}, "another.pegasus"},
+ // Custom app type with other specific symbol.
+ {"1.2.another_pegasus", true, {1, 2}, "another_pegasus"},
+ // Missing one ID.
+ {"1.pegasus", false, {}, ""},
+ // Missing both IDs.
+ {"pegasus", false, {}, ""},
+ // ID with letter.
+ {"1.2a.pegasus", false, {}, ""},
+ // ID with minus.
+ {"1.-2.pegasus", false, {}, ""},
+};
+
+INSTANTIATE_TEST_SUITE_P(ReplicaDirTest,
+ ParseReplicaDirNameTest,
+ testing::ValuesIn(parse_replica_dir_name_tests));
+
+} // namespace dsn::replication
diff --git a/src/server/result_writer.cpp b/src/server/result_writer.cpp
index df78172e9..4d3a9160e 100644
--- a/src/server/result_writer.cpp
+++ b/src/server/result_writer.cpp
@@ -21,7 +21,6 @@
#include <pegasus/error.h>
#include <chrono>
-#include <type_traits>
#include <utility>
#include "pegasus/client.h"
diff --git a/src/test_util/test_util.h b/src/test_util/test_util.h
index 2e2b34bb0..549d8fd94 100644
--- a/src/test_util/test_util.h
+++ b/src/test_util/test_util.h
@@ -44,11 +44,12 @@ class file_meta;
} // namespace replication
} // namespace dsn
+#define PRESERVE_VAR(name, expr)
\
+ const auto PRESERVED_##name = expr;
\
+ auto PRESERVED_##name##_cleanup = dsn::defer([PRESERVED_##name]() { expr =
PRESERVED_##name; })
+
// Save the current value of a flag and restore it at the end of the function.
-#define PRESERVE_FLAG(name)
\
- const auto PRESERVED_FLAGS_##name = FLAGS_##name;
\
- auto PRESERVED_FLAGS_##name##_cleanup =
\
- dsn::defer([PRESERVED_FLAGS_##name]() { FLAGS_##name =
PRESERVED_FLAGS_##name; })
+#define PRESERVE_FLAG(name) PRESERVE_VAR(FLAGS_##name, FLAGS_##name)
namespace pegasus {
diff --git a/src/utils/autoref_ptr.h b/src/utils/autoref_ptr.h
index 501698f85..c3ba9d959 100644
--- a/src/utils/autoref_ptr.h
+++ b/src/utils/autoref_ptr.h
@@ -160,6 +160,15 @@ public:
T *operator->() const { return _obj; }
+ bool operator==(const ref_ptr<T> &r) const { return _obj == r._obj; }
+
+ template <typename U,
+ typename = typename std::enable_if<std::is_convertible<U *, T
*>::value>::type>
+ bool operator==(const ref_ptr<U> &r) const
+ {
+ return _obj == r._obj;
+ }
+
bool operator==(T *r) const { return _obj == r; }
bool operator!=(T *r) const { return _obj != r; }
diff --git a/src/utils/command_manager.h b/src/utils/command_manager.h
index b971522d0..9f23d24bb 100644
--- a/src/utils/command_manager.h
+++ b/src/utils/command_manager.h
@@ -36,6 +36,7 @@
#include <map>
#include <memory>
#include <string>
+#include <type_traits>
#include <vector>
#include "utils/fmt_logging.h"
@@ -65,14 +66,12 @@ public:
// 'validator' is used to validate the new value.
// The value is reset to 'default_value' if passing "DEFAULT" argument.
template <typename T>
- WARN_UNUSED_RESULT std::unique_ptr<command_deregister>
register_int_command(
- T &value,
- T default_value,
- const std::string &command,
- const std::string &help,
- std::function<bool(int64_t new_value)> validator = [](int64_t
new_value) -> bool {
- return new_value >= 0;
- })
+ WARN_UNUSED_RESULT std::unique_ptr<command_deregister>
+ register_int_command(T &value,
+ T default_value,
+ const std::string &command,
+ const std::string &help,
+ std::function<bool(typename
std::remove_reference<T>::type)> validator)
{
return register_single_command(
command,
@@ -83,6 +82,19 @@ public:
});
}
+ template <typename T>
+ WARN_UNUSED_RESULT std::unique_ptr<command_deregister>
register_int_command(
+ T &value, T default_value, const std::string &command, const
std::string &help)
+ {
+ return register_int_command(value,
+ default_value,
+ command,
+ help,
+ [](typename std::remove_reference<T>::type
new_value) -> bool {
+ return new_value >= 0;
+ });
+ }
+
// Register a single 'command' with the 'help' description, its arguments
are described in
// 'args'.
std::unique_ptr<command_deregister>
@@ -133,11 +145,12 @@ private:
set_bool(bool &value, const std::string &name, const
std::vector<std::string> &args);
template <typename T>
- static std::string set_int(T &value,
- T default_value,
- const std::string &name,
- const std::vector<std::string> &args,
- const std::function<bool(int64_t value)>
&validator)
+ static std::string
+ set_int(T &value,
+ T default_value,
+ const std::string &name,
+ const std::vector<std::string> &args,
+ const std::function<bool(typename std::remove_reference<T>::type)>
&validator)
{
nlohmann::json msg;
msg["error"] = "ok";
@@ -164,8 +177,7 @@ private:
// Invalid argument.
T new_value = 0;
- if (!internal::buf2signed(args[0], new_value) ||
- !validator(static_cast<int64_t>(new_value))) {
+ if (!buf2numeric(args[0], new_value) || !validator(new_value)) {
msg["error"] =
fmt::format("ERR: invalid argument '{}', the value is not
acceptable", args[0]);
return msg.dump(2);
diff --git a/src/utils/time_utils.h b/src/utils/time_utils.h
index d36aa9864..872ea4fb7 100644
--- a/src/utils/time_utils.h
+++ b/src/utils/time_utils.h
@@ -147,7 +147,7 @@ public:
inline void reset_start_time() { _start_time_ns = dsn_now_ns(); }
- inline uint64_t duration_ns() const
+ [[nodiscard]] inline uint64_t duration_ns() const
{
auto now = dsn_now_ns();
CHECK_GE(now, _start_time_ns);
@@ -155,6 +155,8 @@ public:
return now - _start_time_ns;
}
+ [[nodiscard]] inline uint64_t duration_ms() const { return duration_ns() /
1'000'000; }
+
private:
uint64_t _start_time_ns;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]