This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new d711b08d1 perf: enhance the loading process of replicas particularly 
when a significant number of replicas are spread across multiple disks (#2078)
d711b08d1 is described below

commit d711b08d1a15f7823fcea0c82bdbc6d45e4bf3ce
Author: Dan Wang <[email protected]>
AuthorDate: Thu Dec 26 14:35:30 2024 +0800

    perf: enhance the loading process of replicas particularly when a 
significant number of replicas are spread across multiple disks (#2078)
    
    Immediately after the replica server is started, all of the replicas under 
the data
    directory would be loaded. Currently, a loading task is launched for each 
replica
    directory. The tasks for loading replica directories are pushed into a 
partitioned
    thread pool (namely `THREAD_POOL_REPLICATION`) disk by disk: only after all
    of the replica directories on the current disk have been pushed into the 
thread
    pool would the process move on to the next disk. Since the thread pool is 
partitioned
    while the hash for each task is the total number of the tasks that have 
been in the
    pool before this task is added, all of the replica directories on one disk 
would be
    executed concurrently. This would lead to two problems once there are a 
great number
    of replica directories on each disk:
    
    - I/O usage for each disk might become saturated: its `%util` might become 
100%;
    - The entire loading process is blocked on each single disk: during a long 
period only
    one disk is keeping busy while others are idle.
    
    The replica server seems getting stuck in loading replicas after it is 
started. This is
    unacceptable and should be changed.
    
    The improved version allows the replica directories on different disks to 
be loaded
    simultaneously: every disk would be busy loading replicas. Also, loading 
tasks would
    be pushed into a non-partitioned thread pool (i.e. `THREAD_POOL_LOCAL_APP`)
    instead of the partitioned, making tasks across multiple threads 
auto-balanced to
    prevent some threads from being starved while others are stuffed.
    
    And new parameter is added to restrict the max number of replicas allowed 
to be loaded
    simultaneously for each disk, in case that I/O usage for each disk becomes 
saturated.
    Another parameter is added to ensure that the main thread waiting all 
loading tasks to
    finished would not be blocked on one task too long while the number of 
tasks for loading
    replica directories simultaneously on a single disk has reached its limit.
    
    Parameters are added as follows:
    
    ```diff
    [replication]
    + max_replicas_on_load_for_each_disk = 256
    + load_replica_max_wait_time_ms = 10
    ```
---
 .clang-tidy                             |   2 +-
 build_tools/clang_tidy.py               |   4 +-
 src/common/replication.codes.h          |   2 +-
 src/meta/meta_service.cpp               |   1 +
 src/replica/replica.h                   |   1 +
 src/replica/replica_stub.cpp            | 681 +++++++++++++++++++++++---------
 src/replica/replica_stub.h              | 105 +++--
 src/replica/test/config-test.ini        |   8 +-
 src/replica/test/load_replicas_test.cpp | 339 ++++++++++++++++
 src/replica/test/replica_dir_test.cpp   | 127 ++++++
 src/server/result_writer.cpp            |   1 -
 src/test_util/test_util.h               |   9 +-
 src/utils/autoref_ptr.h                 |   9 +
 src/utils/command_manager.h             |  42 +-
 src/utils/time_utils.h                  |   4 +-
 15 files changed, 1100 insertions(+), 235 deletions(-)

diff --git a/.clang-tidy b/.clang-tidy
index 914dee119..95dd7616d 100644
--- a/.clang-tidy
+++ b/.clang-tidy
@@ -20,7 +20,7 @@
 CheckOptions: []
 # Disable some checks that are not useful for us now.
 # They are sorted by names, and should be consistent to 
build_tools/clang_tidy.py.
-Checks: 
'abseil-*,boost-*,bugprone-*,cert-*,clang-analyzer-*,concurrency-*,cppcoreguidelines-*,darwin-*,fuchsia-*,google-*,hicpp-*,linuxkernel-*,llvm-*,misc-*,modernize-*,performance-*,portability-*,readability-*,-bugprone-easily-swappable-parameters,-bugprone-lambda-function-name,-bugprone-macro-parentheses,-cert-err58-cpp,-concurrency-mt-unsafe,-cppcoreguidelines-avoid-c-arrays,-cppcoreguidelines-avoid-magic-numbers,-cppcoreguidelines-avoid-non-const-global-variables,-cppcoreguidelines
 [...]
+Checks: 
'abseil-*,boost-*,bugprone-*,cert-*,clang-analyzer-*,concurrency-*,cppcoreguidelines-*,darwin-*,fuchsia-*,google-*,hicpp-*,linuxkernel-*,llvm-*,misc-*,modernize-*,performance-*,portability-*,readability-*,-bugprone-easily-swappable-parameters,-bugprone-lambda-function-name,-bugprone-macro-parentheses,-cert-err58-cpp,-concurrency-mt-unsafe,-cppcoreguidelines-avoid-c-arrays,-cppcoreguidelines-avoid-magic-numbers,-cppcoreguidelines-avoid-non-const-global-variables,-cppcoreguidelines
 [...]
 ExtraArgs:
 ExtraArgsBefore: []
 FormatStyle: none
diff --git a/build_tools/clang_tidy.py b/build_tools/clang_tidy.py
index 09ea434b1..8a35e1a5d 100755
--- a/build_tools/clang_tidy.py
+++ b/build_tools/clang_tidy.py
@@ -88,6 +88,7 @@ def run_tidy(sha="HEAD", is_rev_range=False):
                            "-misc-definitions-in-headers,"
                            "-misc-non-private-member-variables-in-classes,"
                            "-misc-unused-parameters,"
+                           "-modernize-avoid-bind,"
                            "-modernize-avoid-c-arrays,"
                            "-modernize-replace-disallow-copy-and-assign-macro,"
                            "-modernize-use-trailing-return-type,"
@@ -95,7 +96,8 @@ def run_tidy(sha="HEAD", is_rev_range=False):
                            "-readability-function-cognitive-complexity,"
                            "-readability-identifier-length,"
                            "-readability-magic-numbers,"
-                           "-readability-named-parameter",
+                           "-readability-named-parameter,"
+                           "-readability-suspicious-call-argument",
                    "-extra-arg=-language=c++",
                    "-extra-arg=-std=c++17",
                    "-extra-arg=-Ithirdparty/output/include"]
diff --git a/src/common/replication.codes.h b/src/common/replication.codes.h
index 7bbb775b1..42ae695f7 100644
--- a/src/common/replication.codes.h
+++ b/src/common/replication.codes.h
@@ -141,7 +141,6 @@ MAKE_EVENT_CODE(LPC_META_STATE_NORMAL, TASK_PRIORITY_COMMON)
 
 // THREAD_POOL_REPLICATION
 #define CURRENT_THREAD_POOL THREAD_POOL_REPLICATION
-MAKE_EVENT_CODE(LPC_REPLICATION_INIT_LOAD, TASK_PRIORITY_COMMON)
 MAKE_EVENT_CODE(RPC_REPLICATION_WRITE_EMPTY, TASK_PRIORITY_COMMON)
 MAKE_EVENT_CODE(LPC_PER_REPLICA_CHECKPOINT_TIMER, TASK_PRIORITY_COMMON)
 MAKE_EVENT_CODE(LPC_PER_REPLICA_COLLECT_INFO_TIMER, TASK_PRIORITY_COMMON)
@@ -186,6 +185,7 @@ MAKE_EVENT_CODE(LPC_REPLICATION_HIGH, TASK_PRIORITY_HIGH)
 
 // THREAD_POOL_LOCAL_APP
 #define CURRENT_THREAD_POOL THREAD_POOL_LOCAL_APP
+MAKE_EVENT_CODE(LPC_REPLICATION_INIT_LOAD, TASK_PRIORITY_COMMON)
 MAKE_EVENT_CODE(LPC_WRITE, TASK_PRIORITY_COMMON)
 MAKE_EVENT_CODE(LPC_read_THROTTLING_DELAY, TASK_PRIORITY_COMMON)
 #undef CURRENT_THREAD_POOL
diff --git a/src/meta/meta_service.cpp b/src/meta/meta_service.cpp
index bd9736304..6ec19c007 100644
--- a/src/meta/meta_service.cpp
+++ b/src/meta/meta_service.cpp
@@ -29,6 +29,7 @@
 #include <boost/lexical_cast.hpp>
 #include <algorithm> // for std::remove_if
 #include <chrono>
+#include <cstdint>
 #include <functional>
 #include <ostream>
 #include <string_view>
diff --git a/src/replica/replica.h b/src/replica/replica.h
index 4ee215c3d..b312865c5 100644
--- a/src/replica/replica.h
+++ b/src/replica/replica.h
@@ -610,6 +610,7 @@ private:
     friend class replica_disk_test;
     friend class replica_disk_migrate_test;
     friend class open_replica_test;
+    friend class mock_load_replica;
     friend class replica_follower;
     friend class ::pegasus::server::pegasus_server_test_base;
     friend class ::pegasus::server::rocksdb_wrapper_test;
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index 2e4eec66e..471b8b1f8 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -29,15 +29,15 @@
 // IWYU pragma: no_include <ext/alloc_traits.h>
 #include <fmt/core.h>
 #include <fmt/format.h>
+#include <nlohmann/json.hpp>
 #include <rapidjson/ostreamwrapper.h>
-#include <stdio.h>
-#include <stdlib.h>
 #include <algorithm>
 #include <chrono>
 #include <cstdint>
-#include <deque>
+#include <cstdlib>
 #include <iterator>
 #include <mutex>
+#include <queue>
 #include <set>
 #include <sstream>
 #include <string_view>
@@ -71,6 +71,10 @@
 #include "security/access_controller.h"
 #include "split/replica_split_manager.h"
 #include "task/async_calls.h"
+#include "task/task.h"
+#include "task/task_engine.h"
+#include "task/task_worker.h"
+#include "utils/api_utilities.h"
 #include "utils/command_manager.h"
 #include "utils/env.h"
 #include "utils/errors.h"
@@ -80,8 +84,11 @@
 #include "utils/ports.h"
 #include "utils/process_utils.h"
 #include "utils/rand.h"
+#include "utils/string_conv.h"
 #include "utils/strings.h"
 #include "utils/synchronize.h"
+#include "utils/threadpool_spec.h"
+#include "utils/timer.h"
 #ifdef DSN_ENABLE_GPERF
 #include <gperftools/malloc_extension.h>
 #elif defined(DSN_USE_JEMALLOC)
@@ -91,14 +98,17 @@
 #include "remote_cmd/remote_command.h"
 #include "utils/fail_point.h"
 
-static const char *kMaxConcurrentBulkLoadDownloadingCountDesc =
-    "The maximum concurrent bulk load downloading replica count";
-DSN_DEFINE_int32(replication,
-                 max_concurrent_bulk_load_downloading_count,
-                 5,
-                 kMaxConcurrentBulkLoadDownloadingCountDesc);
-DSN_DEFINE_validator(max_concurrent_bulk_load_downloading_count,
-                     [](int32_t value) -> bool { return value >= 0; });
+namespace {
+
+const char *kMaxReplicasOnLoadForEachDiskDesc =
+    "The max number of replicas that are allowed to be loaded simultaneously 
for each disk dir.";
+
+const char *kLoadReplicaMaxWaitTimeMsDesc = "The max waiting time for replica 
loading to complete.";
+
+const char *kMaxConcurrentBulkLoadDownloadingCountDesc =
+    "The maximum concurrent bulk load downloading replica count.";
+
+} // anonymous namespace
 
 METRIC_DEFINE_gauge_int64(server,
                           total_replicas,
@@ -260,6 +270,26 @@ DSN_DECLARE_string(data_dirs);
 DSN_DECLARE_string(encryption_cluster_key_name);
 DSN_DECLARE_string(server_key);
 
+DSN_DEFINE_uint64(replication,
+                  max_replicas_on_load_for_each_disk,
+                  256,
+                  kMaxReplicasOnLoadForEachDiskDesc);
+DSN_TAG_VARIABLE(max_replicas_on_load_for_each_disk, FT_MUTABLE);
+DSN_DEFINE_validator(max_replicas_on_load_for_each_disk,
+                     [](uint64_t value) -> bool { return value > 0; });
+
+DSN_DEFINE_uint64(replication, load_replica_max_wait_time_ms, 10, 
kLoadReplicaMaxWaitTimeMsDesc);
+DSN_TAG_VARIABLE(load_replica_max_wait_time_ms, FT_MUTABLE);
+DSN_DEFINE_validator(load_replica_max_wait_time_ms,
+                     [](uint64_t value) -> bool { return value > 0; });
+
+DSN_DEFINE_int32(replication,
+                 max_concurrent_bulk_load_downloading_count,
+                 5,
+                 kMaxConcurrentBulkLoadDownloadingCountDesc);
+DSN_DEFINE_validator(max_concurrent_bulk_load_downloading_count,
+                     [](int32_t value) -> bool { return value >= 0; });
+
 DSN_DEFINE_bool(replication,
                 deny_client_on_start,
                 false,
@@ -376,9 +406,52 @@ namespace dsn {
 namespace replication {
 bool replica_stub::s_not_exit_on_log_failure = false;
 
+namespace {
+
+// Register commands that get/set flag configurations.
+void register_flags_ctrl_command()
+{
+    // For the reaonse why using std::call_once please see comments in
+    // replica_stub::register_ctrl_command() for details.
+    static std::once_flag flag;
+    std::call_once(flag, []() mutable {
+        dsn::command_manager::instance().add_global_cmd(
+            dsn::command_manager::instance().register_int_command(
+                FLAGS_max_replicas_on_load_for_each_disk,
+                FLAGS_max_replicas_on_load_for_each_disk,
+                "replica.max-replicas-on-load-for-each-disk",
+                kMaxReplicasOnLoadForEachDiskDesc));
+
+        dsn::command_manager::instance().add_global_cmd(
+            dsn::command_manager::instance().register_int_command(
+                FLAGS_load_replica_max_wait_time_ms,
+                FLAGS_load_replica_max_wait_time_ms,
+                "replica.load-replica-max-wait-time-ms",
+                kLoadReplicaMaxWaitTimeMsDesc));
+
+        dsn::command_manager::instance().add_global_cmd(
+            dsn::command_manager::instance().register_bool_command(
+                FLAGS_empty_write_disabled,
+                "replica.disable-empty-write",
+                "whether to disable empty writes"));
+
+        dsn::command_manager::instance().add_global_cmd(
+            dsn::command_manager::instance().register_int_command(
+                FLAGS_max_concurrent_bulk_load_downloading_count,
+                FLAGS_max_concurrent_bulk_load_downloading_count,
+                "replica.max-concurrent-bulk-load-downloading-count",
+                kMaxConcurrentBulkLoadDownloadingCountDesc));
+    });
+}
+
+} // anonymous namespace
+
 replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/,
                            bool is_long_subscriber /* = true*/)
     : serverlet("replica_stub"),
+      _state(NS_Disconnected),
+      _replica_state_subscriber(std::move(subscriber)),
+      _is_long_subscriber(is_long_subscriber),
       _deny_client(false),
       _verbose_client_log(false),
       _verbose_commit_log(false),
@@ -388,6 +461,9 @@ replica_stub::replica_stub(replica_state_subscriber 
subscriber /*= nullptr*/,
       _bulk_load_downloading_count(0),
       _manual_emergency_checkpointing_count(0),
       _is_running(false),
+#ifdef DSN_ENABLE_GPERF
+      _is_releasing_memory(false),
+#endif
       METRIC_VAR_INIT_server(total_replicas),
       METRIC_VAR_INIT_server(opening_replicas),
       METRIC_VAR_INIT_server(closing_replicas),
@@ -420,16 +496,13 @@ replica_stub::replica_stub(replica_state_subscriber 
subscriber /*= nullptr*/,
       METRIC_VAR_INIT_server(splitting_replicas_async_learn_max_duration_ms),
       METRIC_VAR_INIT_server(splitting_replicas_max_copy_file_bytes)
 {
-#ifdef DSN_ENABLE_GPERF
-    _is_releasing_memory = false;
-#endif
-    _replica_state_subscriber = subscriber;
-    _is_long_subscriber = is_long_subscriber;
-    _failure_detector = nullptr;
-    _state = NS_Disconnected;
+    // Some flags might need to be tuned on the stage of loading replicas 
(during
+    // replica_stub::initialize()), thus register their control command just 
in the
+    // constructor.
+    register_flags_ctrl_command();
 }
 
-replica_stub::~replica_stub(void) { close(); }
+replica_stub::~replica_stub() { close(); }
 
 void replica_stub::initialize(bool clear /* = false*/)
 {
@@ -439,6 +512,246 @@ void replica_stub::initialize(bool clear /* = false*/)
     _access_controller = std::make_unique<dsn::security::access_controller>();
 }
 
+std::vector<replica_stub::disk_replicas_info> 
replica_stub::get_all_disk_dirs() const
+{
+    std::vector<disk_replicas_info> disks;
+    for (const auto &disk_node : _fs_manager.get_dir_nodes()) {
+        if (dsn_unlikely(disk_node->status == disk_status::IO_ERROR)) {
+            // Skip disks with IO errors.
+            continue;
+        }
+
+        std::vector<std::string> sub_dirs;
+        CHECK(utils::filesystem::get_subdirectories(disk_node->full_dir, 
sub_dirs, false),
+              "failed to get sub_directories in {}",
+              disk_node->full_dir);
+        disks.push_back(disk_replicas_info{disk_node.get(), 
std::move(sub_dirs)});
+    }
+
+    return disks;
+}
+
+// TaskCode: LPC_REPLICATION_INIT_LOAD
+// ThreadPool: THREAD_POOL_LOCAL_APP
+void replica_stub::load_replica(dir_node *disk_node,
+                                const std::string &replica_dir,
+                                size_t total_dir_count,
+                                utils::ex_lock &reps_lock,
+                                replica_map_by_gpid &reps,
+                                std::atomic<size_t> &finished_dir_count)
+{
+    // Measure execution time for loading a replica dir.
+    //
+    // TODO(wangdan): support decimal milliseconds or microseconds, since 
loading a small
+    // replica tends to spend less than 1 milliseconds and show "0ms" in 
logging.
+    SCOPED_LOG_TIMING(INFO, "on loading replica dir {}:{}", disk_node->tag, 
replica_dir);
+
+    LOG_INFO("loading replica: replica_dir={}:{}", disk_node->tag, 
replica_dir);
+
+    const auto *const worker = task::get_current_worker2();
+    if (worker != nullptr) {
+        CHECK(!(worker->pool()->spec().partitioned),
+              "The thread pool THREAD_POOL_LOCAL_APP(task code: 
LPC_REPLICATION_INIT_LOAD) "
+              "for loading replicas must not be partitioned since load 
balancing is required "
+              "among multiple threads");
+    }
+
+    auto rep = load_replica(disk_node, replica_dir);
+    if (rep == nullptr) {
+        LOG_INFO("load replica failed: replica_dir={}:{}, progress={}/{}",
+                 disk_node->tag,
+                 replica_dir,
+                 ++finished_dir_count,
+                 total_dir_count);
+        return;
+    }
+
+    LOG_INFO("{}@{}: load replica successfully, replica_dir={}:{}, 
progress={}/{}, "
+             "last_durable_decree={}, last_committed_decree={}, 
last_prepared_decree={}",
+             rep->get_gpid(),
+             dsn_primary_host_port(),
+             disk_node->tag,
+             replica_dir,
+             ++finished_dir_count,
+             total_dir_count,
+             rep->last_durable_decree(),
+             rep->last_committed_decree(),
+             rep->last_prepared_decree());
+
+    utils::auto_lock<utils::ex_lock> l(reps_lock);
+    const auto rep_iter = reps.find(rep->get_gpid());
+    CHECK(rep_iter == reps.end(),
+          "{}@{}: newly loaded dir {} conflicts with existing {} while loading 
replica",
+          rep->get_gpid(),
+          dsn_primary_host_port(),
+          rep->dir(),
+          rep_iter->second->dir());
+
+    reps.emplace(rep->get_gpid(), rep);
+}
+
+void replica_stub::load_replicas(replica_map_by_gpid &reps)
+{
+    // Measure execution time for loading all replicas from all healthy disks 
without IO errors.
+    //
+    // TODO(wangdan): show both the size of output replicas and execution time 
on just one
+    // logging line.
+    SCOPED_LOG_TIMING(INFO, "on loading replicas");
+
+    const auto &disks = get_all_disk_dirs();
+
+    // The max index of dirs that are currently being loaded for each disk, 
which means the dirs
+    // with higher indexes have not begun to be loaded (namely pushed into the 
queue).
+    std::vector<size_t> replica_dir_indexes(disks.size(), 0);
+
+    // Each loader is for a replica dir, including its path and loading task.
+    struct replica_dir_loader
+    {
+        size_t replica_dir_index;
+        std::string replica_dir_path;
+        task_ptr load_replica_task;
+    };
+
+    // Each queue would cache the tasks that loading dirs for each disk. Once 
the task is
+    // found finished (namely a dir has been loaded successfully), it would be 
popped from
+    // the queue.
+    std::vector<std::queue<replica_dir_loader>> load_disk_queues(disks.size());
+
+    // The number of loading replica dirs that have been finished for each 
disk, used to show
+    // current progress.
+    //
+    // TODO(wangdan): calculate the number of successful or failed loading of 
replica dirs,
+    // and the number for each reason if failed.
+    std::vector<std::atomic<size_t>> finished_replica_dirs(disks.size());
+    for (auto &count : finished_replica_dirs) {
+        count.store(0);
+    }
+
+    // The lock for operations on the loaded replicas as output.
+    utils::ex_lock reps_lock;
+
+    while (true) {
+        size_t finished_disks = 0;
+
+        // For each round, start loading one replica for each disk in case 
there are too many
+        // replicas in a disk, except that all of the replicas of this disk 
are being loaded.
+        for (size_t disk_index = 0; disk_index < disks.size(); ++disk_index) {
+            // TODO(wangdan): Structured bindings can be captured by closures 
in g++, while
+            // not supported well by clang. Thus we do not use following 
statement to bind
+            // both variables until clang has been upgraded to version 16 
which could support
+            // that well:
+            //
+            //     const auto &[disk_node, replica_dirs] = disks[disk_index];
+            //
+            // For the docs of clang 16 please see:
+            //
+            // 
https://releases.llvm.org/16.0.0/tools/clang/docs/ReleaseNotes.html#c-20-feature-support.
+            const auto &replica_dirs = disks[disk_index].replica_dirs;
+
+            auto &replica_dir_index = replica_dir_indexes[disk_index];
+            if (replica_dir_index >= replica_dirs.size()) {
+                // All of the replicas for the disk `disks[disk_index]` have 
begun to be loaded,
+                // thus just skip to next disk.
+                ++finished_disks;
+                continue;
+            }
+
+            const auto &disk_node = disks[disk_index].disk_node;
+            auto &load_disk_queue = load_disk_queues[disk_index];
+            if (load_disk_queue.size() >= 
FLAGS_max_replicas_on_load_for_each_disk) {
+                // Loading replicas should be throttled in case that disk IO 
is saturated.
+                if (!load_disk_queue.front().load_replica_task->wait(
+                        
static_cast<int>(FLAGS_load_replica_max_wait_time_ms))) {
+                    // There might be too many replicas that are being loaded 
which lead to
+                    // slow disk IO, thus turn to load replicas of next disk, 
and try to load
+                    // dir `replica_dir_index` of this disk in the next round.
+                    LOG_WARNING("after {} ms, loading dir({}, {}/{}) is still 
not finished, "
+                                "there are {} replicas being loaded for 
disk({}:{}, {}/{}), "
+                                "now turn to next disk, and will begin to load 
dir({}, {}/{}) "
+                                "soon",
+                                FLAGS_load_replica_max_wait_time_ms,
+                                load_disk_queue.front().replica_dir_path,
+                                load_disk_queue.front().replica_dir_index,
+                                replica_dirs.size(),
+                                load_disk_queue.size(),
+                                disk_node->tag,
+                                disk_node->full_dir,
+                                disk_index,
+                                disks.size(),
+                                replica_dirs[replica_dir_index],
+                                replica_dir_index,
+                                replica_dirs.size());
+                    continue;
+                }
+
+                // Now the queue size is within the limit again, continue to 
load a new replica dir.
+                load_disk_queue.pop();
+            }
+
+            if 
(dsn::replication::is_data_dir_invalid(replica_dirs[replica_dir_index])) {
+                LOG_WARNING("ignore dir({}, {}/{}) for disk({}:{}, {}/{})",
+                            replica_dirs[replica_dir_index],
+                            replica_dir_index,
+                            replica_dirs.size(),
+                            disk_node->tag,
+                            disk_node->full_dir,
+                            disk_index,
+                            disks.size());
+                ++replica_dir_index;
+                continue;
+            }
+
+            LOG_DEBUG("ready to load dir({}, {}/{}) for disk({}:{}, {}/{})",
+                      replica_dirs[replica_dir_index],
+                      replica_dir_index,
+                      replica_dirs.size(),
+                      disk_node->tag,
+                      disk_node->full_dir,
+                      disk_index,
+                      disks.size());
+
+            load_disk_queue.push(replica_dir_loader{
+                replica_dir_index,
+                replica_dirs[replica_dir_index],
+                tasking::create_task(
+                    // Ensure that the thread pool is non-partitioned.
+                    LPC_REPLICATION_INIT_LOAD,
+                    &_tracker,
+                    std::bind(static_cast<void (replica_stub::*)(dir_node *,
+                                                                 const 
std::string &,
+                                                                 size_t,
+                                                                 
utils::ex_lock &,
+                                                                 
replica_map_by_gpid &,
+                                                                 
std::atomic<size_t> &)>(
+                                  &replica_stub::load_replica),
+                              this,
+                              disk_node,
+                              replica_dirs[replica_dir_index],
+                              replica_dirs.size(),
+                              std::ref(reps_lock),
+                              std::ref(reps),
+                              std::ref(finished_replica_dirs[disk_index])))});
+
+            load_disk_queue.back().load_replica_task->enqueue();
+
+            ++replica_dir_index;
+        }
+
+        if (finished_disks >= disks.size()) {
+            // All replicas of all disks have begun to be loaded.
+            break;
+        }
+    }
+
+    // All loading tasks have been in the queue. Just wait all tasks to be 
finished.
+    for (auto &load_disk_queue : load_disk_queues) {
+        while (!load_disk_queue.empty()) {
+            CHECK_TRUE(load_disk_queue.front().load_replica_task->wait());
+            load_disk_queue.pop();
+        }
+    }
+}
+
 void replica_stub::initialize(const replication_options &opts, bool clear /* = 
false*/)
 {
     _primary_host_port = dsn_primary_host_port();
@@ -523,75 +836,14 @@ void replica_stub::initialize(const replication_options 
&opts, bool clear /* = f
 
     // Start to load replicas in available data directories.
     LOG_INFO("start to load replicas");
-    std::map<dir_node *, std::vector<std::string>> dirs_by_dn;
-    for (const auto &dn : _fs_manager.get_dir_nodes()) {
-        // Skip IO error dir_node.
-        if (dsn_unlikely(dn->status == disk_status::IO_ERROR)) {
-            continue;
-        }
-        std::vector<std::string> sub_directories;
-        CHECK(dsn::utils::filesystem::get_subdirectories(dn->full_dir, 
sub_directories, false),
-              "fail to get sub_directories in {}",
-              dn->full_dir);
-        dirs_by_dn.emplace(dn.get(), sub_directories);
-    }
-
-    replicas rps;
-    utils::ex_lock rps_lock;
-    std::deque<task_ptr> load_tasks;
-    uint64_t start_time = dsn_now_ms();
-    for (const auto &dn_dirs : dirs_by_dn) {
-        const auto dn = dn_dirs.first;
-        for (const auto &dir : dn_dirs.second) {
-            if (dsn::replication::is_data_dir_invalid(dir)) {
-                LOG_WARNING("ignore dir {}", dir);
-                continue;
-            }
 
-            load_tasks.push_back(tasking::create_task(
-                LPC_REPLICATION_INIT_LOAD,
-                &_tracker,
-                [this, dn, dir, &rps, &rps_lock] {
-                    LOG_INFO("process dir {}", dir);
-
-                    auto r = load_replica(dn, dir.c_str());
-                    if (r == nullptr) {
-                        return;
-                    }
-                    LOG_INFO("{}@{}: load replica '{}' success, <durable, "
-                             "commit> = <{}, {}>, last_prepared_decree = {}",
-                             r->get_gpid(),
-                             dsn_primary_host_port(),
-                             dir,
-                             r->last_durable_decree(),
-                             r->last_committed_decree(),
-                             r->last_prepared_decree());
-
-                    utils::auto_lock<utils::ex_lock> l(rps_lock);
-                    CHECK(rps.find(r->get_gpid()) == rps.end(),
-                          "conflict replica dir: {} <--> {}",
-                          r->dir(),
-                          rps[r->get_gpid()]->dir());
-
-                    rps[r->get_gpid()] = r;
-                },
-                load_tasks.size()));
-            load_tasks.back()->enqueue();
-        }
-    }
-    for (auto &tsk : load_tasks) {
-        tsk->wait();
-    }
-    uint64_t finish_time = dsn_now_ms();
+    replica_map_by_gpid reps;
+    load_replicas(reps);
 
-    dirs_by_dn.clear();
-    load_tasks.clear();
-    LOG_INFO("load replicas succeed, replica_count = {}, time_used = {} ms",
-             rps.size(),
-             finish_time - start_time);
+    LOG_INFO("load replicas succeed, replica_count = {}", reps.size());
 
     bool is_log_complete = true;
-    for (auto it = rps.begin(); it != rps.end(); ++it) {
+    for (auto it = reps.begin(); it != reps.end(); ++it) {
         CHECK_EQ_MSG(it->second->background_sync_checkpoint(), ERR_OK, "sync 
checkpoint failed");
 
         it->second->reset_prepare_list_after_replay();
@@ -621,8 +873,8 @@ void replica_stub::initialize(const replication_options 
&opts, bool clear /* = f
     if (!is_log_complete) {
         LOG_ERROR("logs are not complete for some replicas, which means that 
shared log is "
                   "truncated, mark all replicas as inactive");
-        for (auto it = rps.begin(); it != rps.end(); ++it) {
-            it->second->set_inactive_state_transient(false);
+        for (auto &[_, rep] : reps) {
+            rep->set_inactive_state_transient(false);
         }
     }
 
@@ -648,11 +900,11 @@ void replica_stub::initialize(const replication_options 
&opts, bool clear /* = f
             std::chrono::seconds(FLAGS_disk_stat_interval_seconds));
     }
 
-    // attach rps
-    _replicas = std::move(rps);
+    // Attach `reps`.
+    _replicas = std::move(reps);
     METRIC_VAR_INCREMENT_BY(total_replicas, _replicas.size());
-    for (const auto &kv : _replicas) {
-        _fs_manager.add_replica(kv.first, kv.second->dir());
+    for (const auto &[pid, rep] : _replicas) {
+        _fs_manager.add_replica(pid, rep->dir());
     }
 
     _nfs = dsn::nfs_node::create();
@@ -746,7 +998,7 @@ dsn::error_code replica_stub::on_kill_replica(gpid id)
 {
     LOG_INFO("kill replica: gpid = {}", id);
     if (id.get_app_id() == -1 || id.get_partition_index() == -1) {
-        replicas rs;
+        replica_map_by_gpid rs;
         {
             zauto_read_lock l(_replicas_lock);
             rs = _replicas;
@@ -1373,27 +1625,28 @@ void replica_stub::on_node_query_reply(error_code err,
                  resp.partitions.size(),
                  resp.gc_replicas.size());
 
-        replicas rs;
+        replica_map_by_gpid reps;
         {
             zauto_read_lock rl(_replicas_lock);
-            rs = _replicas;
+            reps = _replicas;
         }
 
-        for (auto it = resp.partitions.begin(); it != resp.partitions.end(); 
++it) {
-            rs.erase(it->config.pid);
-            tasking::enqueue(LPC_QUERY_NODE_CONFIGURATION_SCATTER,
-                             &_tracker,
-                             
std::bind(&replica_stub::on_node_query_reply_scatter, this, this, *it),
-                             it->config.pid.thread_hash());
+        for (const auto &config_update : resp.partitions) {
+            reps.erase(config_update.config.pid);
+            tasking::enqueue(
+                LPC_QUERY_NODE_CONFIGURATION_SCATTER,
+                &_tracker,
+                std::bind(&replica_stub::on_node_query_reply_scatter, this, 
this, config_update),
+                config_update.config.pid.thread_hash());
         }
 
-        // for rps not exist on meta_servers
-        for (auto it = rs.begin(); it != rs.end(); ++it) {
+        // For the replicas that do not exist on meta_servers.
+        for (const auto &[pid, _] : reps) {
             tasking::enqueue(
                 LPC_QUERY_NODE_CONFIGURATION_SCATTER2,
                 &_tracker,
-                std::bind(&replica_stub::on_node_query_reply_scatter2, this, 
this, it->first),
-                it->first.thread_hash());
+                std::bind(&replica_stub::on_node_query_reply_scatter2, this, 
this, pid),
+                pid.thread_hash());
         }
 
         // handle the replicas which need to be gc
@@ -1522,18 +1775,18 @@ void replica_stub::on_meta_server_disconnected()
 
     _state = NS_Disconnected;
 
-    replicas rs;
+    replica_map_by_gpid reps;
     {
         zauto_read_lock rl(_replicas_lock);
-        rs = _replicas;
+        reps = _replicas;
     }
 
-    for (auto it = rs.begin(); it != rs.end(); ++it) {
+    for (const auto &[pid, _] : reps) {
         tasking::enqueue(
             LPC_CM_DISCONNECTED_SCATTER,
             &_tracker,
-            std::bind(&replica_stub::on_meta_server_disconnected_scatter, 
this, this, it->first),
-            it->first.thread_hash());
+            std::bind(&replica_stub::on_meta_server_disconnected_scatter, 
this, this, pid),
+            pid.thread_hash());
     }
 }
 
@@ -1823,7 +2076,7 @@ void replica_stub::open_replica(
                  _primary_host_port_cache,
                  group_check ? "with" : "without",
                  dir);
-        rep = load_replica(dn, dir.c_str());
+        rep = load_replica(dn, dir);
 
         // if load data failed, re-open the `*.ori` folder which is the origin 
replica dir of disk
         // migration
@@ -1848,7 +2101,7 @@ void replica_stub::open_replica(
                 boost::replace_first(
                     origin_dir, 
replica_disk_migrator::kReplicaDirOriginSuffix, "");
                 dsn::utils::filesystem::rename_path(origin_tmp_dir, 
origin_dir);
-                rep = load_replica(origin_dn, origin_dir.c_str());
+                rep = load_replica(origin_dn, origin_dir);
 
                 FAIL_POINT_INJECT_F("mock_replica_load", [&](std::string_view) 
-> void {});
             }
@@ -1915,7 +2168,7 @@ void replica_stub::open_replica(
         METRIC_VAR_DECREMENT(opening_replicas);
 
         CHECK(_replicas.find(id) == _replicas.end(), "replica {} is already in 
_replicas", id);
-        _replicas.insert(replicas::value_type(rep->get_gpid(), rep));
+        _replicas.insert(replica_map_by_gpid::value_type(rep->get_gpid(), 
rep));
         METRIC_VAR_INCREMENT(total_replicas);
 
         _closed_replicas.erase(id);
@@ -1981,6 +2234,51 @@ replica *replica_stub::new_replica(gpid gpid,
     return rep;
 }
 
+replica *replica_stub::new_replica(gpid gpid,
+                                   const app_info &app,
+                                   bool restore_if_necessary,
+                                   bool is_duplication_follower)
+{
+    return new_replica(gpid, app, restore_if_necessary, 
is_duplication_follower, {});
+}
+
+/*static*/ std::string replica_stub::get_replica_dir_name(const std::string 
&dir)
+{
+    static const char splitters[] = {'\\', '/', 0};
+    return utils::get_last_component(dir, splitters);
+}
+
+/* static */ bool
+replica_stub::parse_replica_dir_name(const std::string &dir_name, gpid &pid, 
std::string &app_type)
+{
+    std::vector<uint32_t> ids(2, 0);
+    size_t begin = 0;
+    for (auto &id : ids) {
+        size_t end = dir_name.find('.', begin);
+        if (end == std::string::npos) {
+            return false;
+        }
+
+        if (!buf2uint32(std::string_view(dir_name.data() + begin, end - 
begin), id)) {
+            return false;
+        }
+
+        begin = end + 1;
+    }
+
+    if (begin >= dir_name.size()) {
+        return false;
+    }
+
+    pid.set_app_id(static_cast<int32_t>(ids[0]));
+    pid.set_partition_index(static_cast<int32_t>(ids[1]));
+
+    // TODO(wangdan): the 3rd parameter `count` does not support default 
argument for CentOS 7
+    // (gcc 7.3.1). After CentOS 7 is deprecated, consider dropping 
std::string::npos.
+    app_type.assign(dir_name, begin, std::string::npos);
+    return true;
+}
+
 bool replica_stub::validate_replica_dir(const std::string &dir,
                                         app_info &ai,
                                         gpid &pid,
@@ -1991,21 +2289,18 @@ bool replica_stub::validate_replica_dir(const 
std::string &dir,
         return false;
     }
 
-    char splitters[] = {'\\', '/', 0};
-    const auto name = utils::get_last_component(dir, splitters);
-    if (name.empty()) {
+    const auto &dir_name = get_replica_dir_name(dir);
+    if (dir_name.empty()) {
         hint_message = fmt::format("invalid replica dir '{}'", dir);
         return false;
     }
 
-    char app_type[128] = {0};
-    int32_t app_id, pidx;
-    if (3 != sscanf(name.c_str(), "%d.%d.%s", &app_id, &pidx, app_type)) {
+    std::string app_type;
+    if (!parse_replica_dir_name(dir_name, pid, app_type)) {
         hint_message = fmt::format("invalid replica dir '{}'", dir);
         return false;
     }
 
-    pid = gpid(app_id, pidx);
     replica_app_info rai(&ai);
     const auto ai_path = utils::filesystem::path_combine(dir, 
replica_app_info::kAppInfo);
     const auto err = rai.load(ai_path);
@@ -2019,10 +2314,9 @@ bool replica_stub::validate_replica_dir(const 
std::string &dir,
         return false;
     }
 
-    // When the online partition split function aborted, the garbage 
partitions are with pidx in
-    // the range of [ai.partition_count, 2 * ai.partition_count), which means 
the partitions with
-    // pidx >= ai.partition_count are garbage partitions.
-    if (ai.partition_count <= pidx) {
+    if (pid.get_partition_index() >= ai.partition_count) {
+        // Once the online partition split aborted, the partitions within the 
range of
+        // [ai.partition_count, 2 * ai.partition_count) would become garbage.
         hint_message = fmt::format(
             "partition[{}], count={}, this replica may be partition split 
garbage partition, "
             "ignore it",
@@ -2034,7 +2328,7 @@ bool replica_stub::validate_replica_dir(const std::string 
&dir,
     return true;
 }
 
-replica *replica_stub::load_replica(dir_node *dn, const char *dir)
+replica_ptr replica_stub::load_replica(dir_node *disk_node, const std::string 
&replica_dir)
 {
     FAIL_POINT_INJECT_F("mock_replica_load",
                         [&](std::string_view) -> replica * { return nullptr; 
});
@@ -2042,24 +2336,28 @@ replica *replica_stub::load_replica(dir_node *dn, const 
char *dir)
     app_info ai;
     gpid pid;
     std::string hint_message;
-    if (!validate_replica_dir(dir, ai, pid, hint_message)) {
-        LOG_ERROR("invalid replica dir '{}', hint: {}", dir, hint_message);
+    if (!validate_replica_dir(replica_dir, ai, pid, hint_message)) {
+        LOG_ERROR("invalid replica dir '{}', hint={}", replica_dir, 
hint_message);
         return nullptr;
     }
 
     // The replica's directory must exist when creating a replica.
-    CHECK_EQ(dir, dn->replica_dir(ai.app_type, pid));
-    auto *rep = new replica(this, pid, ai, dn, false);
+    CHECK_EQ(disk_node->replica_dir(ai.app_type, pid), replica_dir);
+
+    auto *rep = new replica(this, pid, ai, disk_node, false);
     const auto err = rep->initialize_on_load();
     if (err != ERR_OK) {
-        LOG_ERROR("{}: load replica failed, err = {}", rep->name(), err);
-        rep->close();
+        LOG_ERROR("{}: load replica failed, tag={}, replica_dir={}, err={}",
+                  rep->name(),
+                  disk_node->tag,
+                  replica_dir,
+                  err);
         delete rep;
         rep = nullptr;
 
         // clear work on failure
-        if (dsn::utils::filesystem::directory_exists(dir)) {
-            move_to_err_path(dir, "load replica");
+        if (dsn::utils::filesystem::directory_exists(replica_dir)) {
+            move_to_err_path(replica_dir, "load replica");
             METRIC_VAR_INCREMENT(moved_error_replicas);
             _fs_manager.remove_replica(pid);
         }
@@ -2067,7 +2365,10 @@ replica *replica_stub::load_replica(dir_node *dn, const 
char *dir)
         return nullptr;
     }
 
-    LOG_INFO("{}: load replica succeed", rep->name());
+    LOG_INFO("{}: load replica succeed, tag={}, replica_dir={}",
+             rep->name(),
+             disk_node->tag,
+             replica_dir);
     return rep;
 }
 
@@ -2394,11 +2695,6 @@ void replica_stub::register_ctrl_command()
                 });
             }));
 
-        
_cmds.emplace_back(::dsn::command_manager::instance().register_bool_command(
-            FLAGS_empty_write_disabled,
-            "replica.disable-empty-write",
-            "whether to disable empty writes"));
-
 #ifdef DSN_ENABLE_GPERF
         
_cmds.emplace_back(::dsn::command_manager::instance().register_bool_command(
             _release_tcmalloc_memory,
@@ -2433,61 +2729,64 @@ void replica_stub::register_ctrl_command()
 #elif defined(DSN_USE_JEMALLOC)
         register_jemalloc_ctrl_command();
 #endif
-        
_cmds.emplace_back(::dsn::command_manager::instance().register_int_command(
-            FLAGS_max_concurrent_bulk_load_downloading_count,
-            FLAGS_max_concurrent_bulk_load_downloading_count,
-            "replica.max-concurrent-bulk-load-downloading-count",
-            kMaxConcurrentBulkLoadDownloadingCountDesc));
     });
 }
 
 std::string
-replica_stub::exec_command_on_replica(const std::vector<std::string> &args,
+replica_stub::exec_command_on_replica(const std::vector<std::string> 
&arg_str_list,
                                       bool allow_empty_args,
-                                      std::function<std::string(const 
replica_ptr &rep)> func)
+                                      std::function<std::string(const 
replica_ptr &)> func)
 {
-    if (args.empty() && !allow_empty_args) {
-        return std::string("invalid arguments");
+    static const std::string kInvalidArguments("invalid arguments");
+
+    if (arg_str_list.empty() && !allow_empty_args) {
+        return kInvalidArguments;
     }
 
-    replicas rs;
+    replica_map_by_gpid rs;
     {
         zauto_read_lock l(_replicas_lock);
         rs = _replicas;
     }
 
     std::set<gpid> required_ids;
-    replicas choosed_rs;
-    if (!args.empty()) {
-        for (int i = 0; i < args.size(); i++) {
-            std::vector<std::string> arg_strs;
-            utils::split_args(args[i].c_str(), arg_strs, ',');
-            if (arg_strs.empty()) {
-                return std::string("invalid arguments");
+    replica_map_by_gpid choosed_rs;
+    if (!arg_str_list.empty()) {
+        for (const auto &arg_str : arg_str_list) {
+            std::vector<std::string> args;
+            utils::split_args(arg_str.c_str(), args, ',');
+            if (args.empty()) {
+                return kInvalidArguments;
             }
 
-            for (const std::string &arg : arg_strs) {
-                if (arg.empty())
+            for (const std::string &arg : args) {
+                if (arg.empty()) {
                     continue;
+                }
+
                 gpid id;
-                int pid;
                 if (id.parse_from(arg.c_str())) {
-                    // app_id.partition_index
+                    // Format: app_id.partition_index
                     required_ids.insert(id);
                     auto find = rs.find(id);
                     if (find != rs.end()) {
                         choosed_rs[id] = find->second;
                     }
-                } else if (sscanf(arg.c_str(), "%d", &pid) == 1) {
-                    // app_id
-                    for (auto kv : rs) {
-                        id = kv.second->get_gpid();
-                        if (id.get_app_id() == pid) {
-                            choosed_rs[id] = kv.second;
-                        }
+
+                    continue;
+                }
+
+                // Must be app_id.
+                int32_t app_id = 0;
+                if (!buf2int32(arg, app_id)) {
+                    return kInvalidArguments;
+                }
+
+                for (const auto &[_, rep] : rs) {
+                    id = rep->get_gpid();
+                    if (id.get_app_id() == app_id) {
+                        choosed_rs[id] = rep;
                     }
-                } else {
-                    return std::string("invalid arguments");
                 }
             }
         }
@@ -2507,8 +2806,10 @@ replica_stub::exec_command_on_replica(const 
std::vector<std::string> &args,
             [rep, &func, &results_lock, &results]() {
                 partition_status::type status = rep->status();
                 if (status != partition_status::PS_PRIMARY &&
-                    status != partition_status::PS_SECONDARY)
+                    status != partition_status::PS_SECONDARY) {
                     return;
+                }
+
                 std::string result = func(rep);
                 ::dsn::zauto_lock l(results_lock);
                 auto &value = results[rep->get_gpid()];
@@ -2720,33 +3021,39 @@ replica_ptr 
replica_stub::create_child_replica_if_not_found(gpid child_pid,
             CHECK_NOTNULL(dn, "");
             auto *rep = new replica(this, child_pid, *app, dn, false);
             rep->_config.status = partition_status::PS_INACTIVE;
-            _replicas.insert(replicas::value_type(child_pid, rep));
+            _replicas.insert(replica_map_by_gpid::value_type(child_pid, rep));
             LOG_INFO("mock create_child_replica_if_not_found succeed");
             return rep;
         });
 
     zauto_write_lock l(_replicas_lock);
-    auto it = _replicas.find(child_pid);
+
+    const auto it = _replicas.find(child_pid);
     if (it != _replicas.end()) {
         return it->second;
-    } else {
-        if (_opening_replicas.find(child_pid) != _opening_replicas.end()) {
-            LOG_WARNING("failed create child replica({}) because it is under 
open", child_pid);
-            return nullptr;
-        } else if (_closing_replicas.find(child_pid) != 
_closing_replicas.end()) {
-            LOG_WARNING("failed create child replica({}) because it is under 
close", child_pid);
-            return nullptr;
-        } else {
-            replica *rep = new_replica(child_pid, *app, false, false, 
parent_dir);
-            if (rep != nullptr) {
-                auto pr = _replicas.insert(replicas::value_type(child_pid, 
rep));
-                CHECK(pr.second, "child replica {} has been existed", 
rep->name());
-                METRIC_VAR_INCREMENT(total_replicas);
-                _closed_replicas.erase(child_pid);
-            }
-            return rep;
-        }
     }
+
+    if (_opening_replicas.find(child_pid) != _opening_replicas.end()) {
+        LOG_WARNING("failed create child replica({}) because it is under 
open", child_pid);
+        return nullptr;
+    }
+
+    if (_closing_replicas.find(child_pid) != _closing_replicas.end()) {
+        LOG_WARNING("failed create child replica({}) because it is under 
close", child_pid);
+        return nullptr;
+    }
+
+    replica *rep = new_replica(child_pid, *app, false, false, parent_dir);
+    if (rep == nullptr) {
+        return nullptr;
+    }
+
+    const auto pr = 
_replicas.insert(replica_map_by_gpid::value_type(child_pid, rep));
+    CHECK(pr.second, "child replica {} has been existed", rep->name());
+    METRIC_VAR_INCREMENT(total_replicas);
+    _closed_replicas.erase(child_pid);
+
+    return rep;
 }
 
 // ThreadPool: THREAD_POOL_REPLICATION
diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h
index d3b860076..7d03d06ee 100644
--- a/src/replica/replica_stub.h
+++ b/src/replica/replica_stub.h
@@ -27,8 +27,9 @@
 #pragma once
 
 #include <gtest/gtest_prod.h>
-#include <stdint.h>
 #include <atomic>
+#include <cstddef>
+#include <cstdint>
 #include <functional>
 #include <map>
 #include <memory>
@@ -70,6 +71,12 @@
 #include "utils/metrics.h"
 #include "utils/zlocks.h"
 
+namespace dsn::utils {
+
+class ex_lock;
+
+} // namespace dsn::utils
+
 DSN_DECLARE_uint32(max_concurrent_manual_emergency_checkpointing_count);
 
 namespace dsn {
@@ -112,14 +119,14 @@ typedef rpc_holder<add_new_disk_request, 
add_new_disk_response> add_new_disk_rpc
 
 namespace test {
 class test_checker;
-}
+} // namespace test
+
 class cold_backup_context;
 class replica_split_manager;
-typedef std::function<void(const ::dsn::host_port & /*from*/,
-                           const replica_configuration & /*new_config*/,
-                           bool /*is_closing*/)>
-    replica_state_subscriber;
-typedef std::unordered_map<gpid, replica_ptr> replicas;
+
+using replica_state_subscriber = std::function<void(const ::dsn::host_port & 
/*from*/,
+                                                    const 
replica_configuration & /*new_config*/,
+                                                    bool /*is_closing*/)>;
 
 class replica_stub;
 
@@ -222,9 +229,9 @@ public:
     //   - if allow_empty_args = false, you should specify at least one 
argument.
     // each argument should be in format of:
     //     id1,id2... (where id is 'app_id' or 'app_id.partition_id')
-    std::string exec_command_on_replica(const std::vector<std::string> &args,
+    std::string exec_command_on_replica(const std::vector<std::string> 
&arg_str_list,
                                         bool allow_empty_args,
-                                        std::function<std::string(const 
replica_ptr &rep)> func);
+                                        std::function<std::string(const 
replica_ptr &)> func);
 
     //
     // partition split
@@ -351,15 +358,61 @@ private:
                       gpid id,
                       const std::shared_ptr<group_check_request> &req,
                       const std::shared_ptr<configuration_update_request> 
&req2);
-    // Create a new replica according to the parameters.
-    // 'parent_dir' is used in partition split for create_child_replica_dir().
+
+    // Create a child replica for partition split, with 'parent_dir' specified 
as the parent
+    // replica dir used for `create_child_replica_dir()`.
     replica *new_replica(gpid gpid,
                          const app_info &app,
                          bool restore_if_necessary,
                          bool is_duplication_follower,
-                         const std::string &parent_dir = "");
-    // Load an existing replica which is located in 'dn' with 'dir' directory.
-    replica *load_replica(dir_node *dn, const char *dir);
+                         const std::string &parent_dir);
+
+    // Create a new replica, choosing and assigning the best dir for it.
+    replica *new_replica(gpid gpid,
+                         const app_info &app,
+                         bool restore_if_necessary,
+                         bool is_duplication_follower);
+
+    // Each disk with its candidate replica dirs, used to load replicas while 
initializing.
+    struct disk_replicas_info
+    {
+        // `dir_node` for each disk.
+        dir_node *disk_node;
+
+        // All replica dirs on each disk.
+        std::vector<std::string> replica_dirs;
+    };
+
+    // Get the absolute dirs of all replicas for all healthy disks without IO 
errors.
+    std::vector<disk_replicas_info> get_all_disk_dirs() const;
+
+    // Get the replica dir name from a potentially longer path (`dir` could be 
an absolute
+    // or relative path).
+    static std::string get_replica_dir_name(const std::string &dir);
+
+    // Parse app id, partition id and app type from the replica dir name.
+    static bool
+    parse_replica_dir_name(const std::string &dir_name, gpid &pid, std::string 
&app_type);
+
+    // Load an existing replica which is located in `dn` with `replica_dir`. 
Usually each
+    // different `dn` represents a unique disk. `replica_dir` is the absolute 
path of the
+    // directory for a replica.
+    virtual replica_ptr load_replica(dir_node *disk_node, const std::string 
&replica_dir);
+
+    using replica_map_by_gpid = std::unordered_map<gpid, replica_ptr>;
+
+    // The same as the above `load_replica` function, except that this 
function is to load
+    // each replica to `reps` with protection from `reps_lock`.
+    void load_replica(dir_node *disk_node,
+                      const std::string &replica_dir,
+                      size_t total_dir_count,
+                      utils::ex_lock &reps_lock,
+                      replica_map_by_gpid &reps,
+                      std::atomic<size_t> &finished_dir_count);
+
+    // Load all replicas simultaneously from all disks to `reps`.
+    void load_replicas(replica_map_by_gpid &reps);
+
     // Clean up the memory state and on disk data if creating replica failed.
     void clear_on_failure(replica *rep);
     task_ptr begin_close_replica(replica_ptr r);
@@ -445,21 +498,26 @@ private:
     friend class replica_follower;
     friend class replica_follower_test;
     friend class replica_http_service_test;
+    friend class mock_load_replica;
+    friend class GetReplicaDirNameTest;
+    friend class ParseReplicaDirNameTest;
     FRIEND_TEST(open_replica_test, open_replica_add_decree_and_ballot_check);
     FRIEND_TEST(replica_test, test_auto_trash_of_corruption);
     FRIEND_TEST(replica_test, test_clear_on_failure);
 
-    typedef std::unordered_map<gpid, ::dsn::task_ptr> opening_replicas;
-    typedef std::unordered_map<gpid, std::tuple<task_ptr, replica_ptr, 
app_info, replica_info>>
-        closing_replicas; // <gpid, <close_task, replica, app_info, 
replica_info> >
-    typedef std::map<gpid, std::pair<app_info, replica_info>>
-        closed_replicas; // <gpid, <app_info, replica_info> >
+    using opening_replica_map_by_gpid = std::unordered_map<gpid, task_ptr>;
+
+    // `task_ptr` is the task closing a replica.
+    using closing_replica_map_by_gpid =
+        std::unordered_map<gpid, std::tuple<task_ptr, replica_ptr, app_info, 
replica_info>>;
+
+    using closed_replica_map_by_gpid = std::map<gpid, std::pair<app_info, 
replica_info>>;
 
     mutable zrwlock_nr _replicas_lock;
-    replicas _replicas;
-    opening_replicas _opening_replicas;
-    closing_replicas _closing_replicas;
-    closed_replicas _closed_replicas;
+    replica_map_by_gpid _replicas;
+    opening_replica_map_by_gpid _opening_replicas;
+    closing_replica_map_by_gpid _closing_replicas;
+    closed_replica_map_by_gpid _closed_replicas;
 
     ::dsn::host_port _primary_host_port;
     // The stringify of '_primary_host_port', used by logging usually.
@@ -565,5 +623,6 @@ private:
 
     dsn::task_tracker _tracker;
 };
+
 } // namespace replication
 } // namespace dsn
diff --git a/src/replica/test/config-test.ini b/src/replica/test/config-test.ini
index 2d2c6f397..b2e0e86c2 100644
--- a/src/replica/test/config-test.ini
+++ b/src/replica/test/config-test.ini
@@ -34,7 +34,7 @@ type = replica
 run = true
 count = 1
 ports = 54321
-pools = 
THREAD_POOL_DEFAULT,THREAD_POOL_REPLICATION_LONG,THREAD_POOL_REPLICATION,THREAD_POOL_PLOG,THREAD_POOL_BLOCK_SERVICE
+pools = 
THREAD_POOL_DEFAULT,THREAD_POOL_REPLICATION_LONG,THREAD_POOL_LOCAL_APP,THREAD_POOL_REPLICATION,THREAD_POOL_PLOG,THREAD_POOL_BLOCK_SERVICE
 
 [core]
 ;tool = simulator
@@ -76,6 +76,12 @@ partitioned = true
 worker_priority = THREAD_xPRIORITY_NORMAL
 worker_count = 3
 
+[threadpool.THREAD_POOL_LOCAL_APP]
+name = local_app
+partitioned = false
+worker_priority = THREAD_xPRIORITY_NORMAL
+worker_count = 4
+
 [threadpool.THREAD_POOL_REPLICATION_LONG]
 name = replica_long
 
diff --git a/src/replica/test/load_replicas_test.cpp 
b/src/replica/test/load_replicas_test.cpp
new file mode 100644
index 000000000..3315629c9
--- /dev/null
+++ b/src/replica/test/load_replicas_test.cpp
@@ -0,0 +1,339 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstddef>
+#include <cstdint>
+#include <fmt/core.h>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <set>
+#include <string>
+#include <type_traits>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "common/fs_manager.h"
+#include "common/gpid.h"
+#include "common/replication.codes.h"
+#include "dsn.layer2_types.h"
+#include "gtest/gtest.h"
+#include "replica/replica.h"
+#include "replica/replica_stub.h"
+#include "replica/replication_app_base.h"
+#include "replica/test/mock_utils.h"
+#include "task/task.h"
+#include "task/task_code.h"
+#include "task/task_spec.h"
+#include "test_util/test_util.h"
+#include "utils/autoref_ptr.h"
+#include "utils/filesystem.h"
+#include "utils/flags.h"
+#include "utils/ports.h"
+
+DSN_DECLARE_uint64(max_replicas_on_load_for_each_disk);
+
+namespace dsn::replication {
+
+class mock_load_replica : public replica_stub
+{
+public:
+    mock_load_replica() = default;
+
+    ~mock_load_replica() override = default;
+
+    void initialize(const std::map<std::string, std::string> &dirs_by_tag,
+                    const std::map<std::string, std::vector<gpid>> 
&replicas_by_tag)
+    {
+        // Get dirs and tags to initialize fs_manager.
+        std::vector<std::string> dirs;
+        std::vector<std::string> tags;
+        for (const auto &[tag, dir] : dirs_by_tag) {
+            dirs.push_back(dir);
+            tags.push_back(tag);
+        }
+
+        // Initialize fs_manager by the tag and dir of each disk.
+        _fs_manager.initialize(dirs, tags);
+
+        // Generate the replicas which are expected after loading.
+        for (const auto &[tag, reps] : replicas_by_tag) {
+            for (const auto &pid : reps) {
+                ASSERT_TRUE(_expected_loaded_replica_pids.insert(pid).second);
+            }
+        }
+
+        _disk_tags_for_order.clear();
+        _disk_dirs_for_order.clear();
+        _disk_replicas_for_order.clear();
+        _disk_loaded_replicas_for_order.assign(replicas_by_tag.size(), 0);
+
+        // Ensure that the disks are scanned in the order returned by 
`get_dir_node()`.
+        for (const auto &dn : _fs_manager.get_dir_nodes()) {
+            for (const auto &pid : replicas_by_tag.at(dn->tag)) {
+                _fs_manager.specify_dir_for_new_replica_for_test(dn.get(), 
"pegasus", pid);
+            }
+
+            _disk_tags_for_order.push_back(dn->tag);
+            _disk_dirs_for_order.push_back(dn->full_dir);
+            
_disk_replicas_for_order.push_back(replicas_by_tag.at(dn->tag).size());
+        }
+
+        ASSERT_EQ(_disk_tags_for_order.size(), _disk_dirs_for_order.size());
+    }
+
+    void test_load_replicas(bool test_load_order, uint64_t 
max_replicas_on_load_for_each_disk)
+    {
+        PRESERVE_VAR(allow_inline, 
dsn::task_spec::get(LPC_REPLICATION_INIT_LOAD)->allow_inline);
+        dsn::task_spec::get(LPC_REPLICATION_INIT_LOAD)->allow_inline = 
test_load_order;
+
+        PRESERVE_FLAG(max_replicas_on_load_for_each_disk);
+        FLAGS_max_replicas_on_load_for_each_disk = 
max_replicas_on_load_for_each_disk;
+
+        // Check if all loaded replicas are matched.
+        replica_stub::replica_map_by_gpid actual_loaded_replicas;
+        load_replicas(actual_loaded_replicas);
+        ASSERT_EQ(_expected_loaded_replicas, actual_loaded_replicas);
+
+        // Check if all replicas have been loaded.
+        std::set<gpid> actual_loaded_replica_pids;
+        for (const auto &[pid, _] : actual_loaded_replicas) {
+            ASSERT_TRUE(actual_loaded_replica_pids.insert(pid).second);
+        }
+        ASSERT_EQ(_expected_loaded_replica_pids, actual_loaded_replica_pids);
+    }
+
+    void remove_disk_dirs()
+    {
+        for (const auto &dn : _fs_manager.get_dir_nodes()) {
+            ASSERT_TRUE(utils::filesystem::remove_path(dn->full_dir));
+        }
+    }
+
+private:
+    void load_replica_for_test(dir_node *dn, const std::string &replica_dir, 
replica_ptr &rep)
+    {
+        ASSERT_TRUE(utils::filesystem::directory_exists(replica_dir));
+
+        const auto &dir_name = get_replica_dir_name(replica_dir);
+
+        gpid pid;
+        std::string app_type;
+        ASSERT_TRUE(parse_replica_dir_name(dir_name, pid, app_type));
+        ASSERT_STREQ("pegasus", app_type.c_str());
+
+        ASSERT_EQ(LPC_REPLICATION_INIT_LOAD, 
task::get_current_task()->spec().code);
+
+        if (task::get_current_task()->spec().allow_inline) {
+            // Once the task is `allow_inline`, it would be executed in place 
immediately rather
+            // than pushed into the queue. Thus we could test the expected 
order in which the
+            // tasks are pushed into the queue.
+
+            // Find the first disk where there is still some replica that has 
not been loaded.
+            size_t finished_disks = 0;
+            while (_disk_loaded_replicas_for_order[_disk_index_for_order] >=
+                   _disk_replicas_for_order[_disk_index_for_order]) {
+                // Since current task has not been executed, it is not 
possible that all disks
+                // are finished.
+                ++finished_disks;
+                ASSERT_GT(_disk_tags_for_order.size(), finished_disks);
+
+                // Skip to next disk since all of the replicas of this disk 
have been loaded.
+                _disk_index_for_order = (_disk_index_for_order + 1) % 
_disk_tags_for_order.size();
+            }
+
+            // Only check if the processed order of the disk the replica 
belongs to, rather than
+            // the order of the replica itself, for the reason that the order 
of the dirs returned
+            // by the underlying call might vary.
+            ASSERT_EQ(_disk_tags_for_order[_disk_index_for_order], dn->tag);
+            ASSERT_EQ(_disk_dirs_for_order[_disk_index_for_order], 
dn->full_dir);
+
+            // Current replica has been loaded, move forward to the next 
replica of this disk.
+            ++_disk_loaded_replicas_for_order[_disk_index_for_order];
+
+            // Turn to next disks if some of them still have some replicas 
that are not loaded.
+            _disk_index_for_order = (_disk_index_for_order + 1) % 
_disk_tags_for_order.size();
+        }
+
+        // Check the absolute dir of this replica.
+        ASSERT_EQ(dn->replica_dir("pegasus", pid), replica_dir);
+
+        app_info ai;
+        ai.app_type = "pegasus";
+        rep = new replica(this, pid, ai, dn, false);
+        rep->_app = 
std::make_unique<replication::mock_replication_app_base>(rep);
+
+        std::lock_guard<std::mutex> guard(_mtx);
+
+        ASSERT_TRUE(_expected_loaded_replicas.find(pid) == 
_expected_loaded_replicas.end());
+
+        _expected_loaded_replicas[pid] = rep;
+    }
+
+    // Mock the process of loading a replica.
+    replica_ptr load_replica(dir_node *dn, const std::string &replica_dir) 
override
+    {
+        replica_ptr rep;
+        load_replica_for_test(dn, replica_dir, rep);
+        return rep;
+    }
+
+    std::set<gpid> _expected_loaded_replica_pids;
+
+    // The variables with postfix `_for_order` are only for testing the order 
of the loading
+    // tasks.
+    size_t _disk_index_for_order{0};
+    std::vector<std::string> _disk_tags_for_order;
+    std::vector<std::string> _disk_dirs_for_order;
+    std::vector<size_t> _disk_replicas_for_order;
+    std::vector<size_t> _disk_loaded_replicas_for_order;
+
+    mutable std::mutex _mtx;
+    replica_stub::replica_map_by_gpid _expected_loaded_replicas;
+
+    DISALLOW_COPY_AND_ASSIGN(mock_load_replica);
+    DISALLOW_MOVE_AND_ASSIGN(mock_load_replica);
+};
+
+struct load_replicas_case
+{
+    // Each disk tag => dir of this disk.
+    std::map<std::string, std::string> dirs_by_tag;
+
+    // Each disk tag => replicas (specified by <app_id, partition_id>) on this 
disk.
+    std::map<std::string, std::vector<gpid>> replicas_by_tag;
+};
+
+class LoadReplicasTest : public testing::TestWithParam<load_replicas_case>
+{
+public:
+    LoadReplicasTest()
+    {
+        // Remove all dirs of all disks to prevent each test from being 
disturbed.
+        _stub.remove_disk_dirs();
+
+        // Use test cases to initialize the replica stub.
+        const auto &load_case = GetParam();
+        _stub.initialize(load_case.dirs_by_tag, load_case.replicas_by_tag);
+    }
+
+    ~LoadReplicasTest() override { _stub.remove_disk_dirs(); }
+
+    void test_load_replicas(bool test_load_order, uint64_t 
max_replicas_on_load_for_each_disk)
+    {
+        _stub.test_load_replicas(test_load_order, 
max_replicas_on_load_for_each_disk);
+    }
+
+private:
+    mock_load_replica _stub;
+
+    DISALLOW_COPY_AND_ASSIGN(LoadReplicasTest);
+    DISALLOW_MOVE_AND_ASSIGN(LoadReplicasTest);
+};
+
+TEST_P(LoadReplicasTest, LoadReplicas) { test_load_replicas(false, 256); }
+
+TEST_P(LoadReplicasTest, LoadOrder) { test_load_replicas(true, 256); }
+
+TEST_P(LoadReplicasTest, LoadThrottling) { test_load_replicas(false, 5); }
+
+// Generate a test case for loading replicas. Each element in `disk_replicas` 
is corresponding
+// to the number of replicas on a disk.
+load_replicas_case generate_load_replicas_case(const std::vector<size_t> 
&disk_replicas)
+{
+    std::map<std::string, std::string> dirs_by_tag;
+    for (size_t disk_index = 0; disk_index < disk_replicas.size(); 
++disk_index) {
+        dirs_by_tag.emplace(fmt::format("data{}", disk_index), 
fmt::format("disk{}", disk_index));
+    }
+
+    static const uint32_t kNumBitsPartitions = 3;
+    static const uint32_t kNumPartitions = 1U << kNumBitsPartitions;
+
+    uint32_t app_id = 1;
+    uint32_t partition_id = 0;
+
+    std::map<std::string, std::vector<gpid>> replicas_by_tag;
+
+    while (true) {
+        size_t finished_disks = 0;
+
+        for (size_t disk_index = 0; disk_index < disk_replicas.size(); 
++disk_index) {
+            auto &replica_list = replicas_by_tag[fmt::format("data{}", 
disk_index)];
+            if (replica_list.size() >= disk_replicas[disk_index]) {
+                // All replicas on this disk have been generated, just skip to 
next disk.
+                ++finished_disks;
+                continue;
+            }
+
+            // Generate a replica with current app id and partition index.
+            replica_list.emplace_back(static_cast<int32_t>(app_id),
+                                      static_cast<int32_t>(partition_id));
+
+            // Once next partition index is found 0, increment app id to turn 
to next table.
+            app_id += ((partition_id + 1) & kNumPartitions) >> 
kNumBitsPartitions;
+
+            // Increment index to turn to next partition.
+            partition_id = (partition_id + 1) & (kNumPartitions - 1);
+        }
+
+        if (finished_disks >= disk_replicas.size()) {
+            // All disks have been done.
+            break;
+        }
+    }
+
+    return {dirs_by_tag, replicas_by_tag};
+}
+
+std::vector<load_replicas_case> generate_load_replicas_cases()
+{
+    // At least 1 disk should be included (otherwise it would lead to core 
dump), thus do
+    // not generate the empty case (i.e. {}).
+    return std::vector<load_replicas_case>({
+        // There is only one disk which has none of replica.
+        generate_load_replicas_case({0}),
+        // There are two disks both of which have none of replica.
+        generate_load_replicas_case({0, 0}),
+        // There is only one disk which has one replica.
+        generate_load_replicas_case({1}),
+        // There are two disks one of which has one replica, and another has 
none.
+        generate_load_replicas_case({1, 0}),
+        generate_load_replicas_case({0, 1}),
+        // There is only one disk which has two replicas.
+        generate_load_replicas_case({2}),
+        // There are two disks one of which has two replicas, and another has 
none.
+        generate_load_replicas_case({2, 0}),
+        generate_load_replicas_case({0, 2}),
+        // There are at least three disks.
+        generate_load_replicas_case({1, 0, 2}),
+        generate_load_replicas_case({8, 25, 16}),
+        generate_load_replicas_case({17, 96, 56, 127}),
+        generate_load_replicas_case({22, 38, 0, 16}),
+        generate_load_replicas_case({82, 75, 36, 118, 65}),
+        generate_load_replicas_case({0, 92, 17, 68, 25}),
+        // There are many replicas for some disks.
+        generate_load_replicas_case({156, 367, 309, 58, 404, 298, 512, 82}),
+        generate_load_replicas_case({167, 28, 898, 516, 389, 422, 682, 265, 
596}),
+    });
+}
+
+INSTANTIATE_TEST_SUITE_P(ReplicaStubTest,
+                         LoadReplicasTest,
+                         testing::ValuesIn(generate_load_replicas_cases()));
+
+} // namespace dsn::replication
diff --git a/src/replica/test/replica_dir_test.cpp 
b/src/replica/test/replica_dir_test.cpp
new file mode 100644
index 000000000..3bc64d74f
--- /dev/null
+++ b/src/replica/test/replica_dir_test.cpp
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+#include <vector>
+
+#include "common/gpid.h"
+#include "gtest/gtest.h"
+#include "replica/replica_stub.h"
+
+namespace dsn::replication {
+
+struct get_replica_dir_name_case
+{
+    std::string path;
+    std::string expected_replica_dir_name;
+};
+
+class GetReplicaDirNameTest : public 
testing::TestWithParam<get_replica_dir_name_case>
+{
+public:
+    static void test_get_replica_dir_name()
+    {
+        const auto &test_case = GetParam();
+        const auto &actual_replica_dir_name = 
replica_stub::get_replica_dir_name(test_case.path);
+        EXPECT_EQ(test_case.expected_replica_dir_name, 
actual_replica_dir_name);
+    }
+};
+
+TEST_P(GetReplicaDirNameTest, GetReplicaDirName) { 
test_get_replica_dir_name(); }
+
+const std::vector<get_replica_dir_name_case> get_replica_dir_name_tests{
+    // Linux absolute path and non-empty dir name.
+    {"/data/pegasus/1.2.pegasus", "1.2.pegasus"},
+    // Linux absolute path and empty dir name.
+    {"/data/pegasus/1.2.pegasus/", ""},
+    // Windows absolute path and non-empty dir name.
+    {R"(D:\data\pegasus\1.2.pegasus)", "1.2.pegasus"},
+    // Windows absolute path and empty dir name.
+    {R"(D:\data\pegasus\1.2.pegasus\)", ""},
+    // Linux relative path and non-empty dir name.
+    {"./1.2.pegasus", "1.2.pegasus"},
+    // Linux relative path and empty dir name.
+    {"./1.2.pegasus/", ""},
+    // Windows relative path and non-empty dir name.
+    {R"(.\1.2.pegasus)", "1.2.pegasus"},
+    // Windows relative path and empty dir name.
+    {R"(.\1.2.pegasus\)", ""},
+};
+
+INSTANTIATE_TEST_SUITE_P(ReplicaDirTest,
+                         GetReplicaDirNameTest,
+                         testing::ValuesIn(get_replica_dir_name_tests));
+
+struct parse_replica_dir_name_case
+{
+    std::string replica_dir_name;
+    bool ok;
+    gpid expected_pid;
+    std::string expected_app_type;
+};
+
+class ParseReplicaDirNameTest : public 
testing::TestWithParam<parse_replica_dir_name_case>
+{
+public:
+    static void test_parse_replica_dir_name()
+    {
+        const auto &test_case = GetParam();
+
+        gpid actual_pid;
+        std::string actual_app_type;
+        ASSERT_EQ(test_case.ok,
+                  replica_stub::parse_replica_dir_name(
+                      test_case.replica_dir_name, actual_pid, 
actual_app_type));
+        if (!test_case.ok) {
+            return;
+        }
+
+        EXPECT_EQ(test_case.expected_pid, actual_pid);
+        EXPECT_EQ(test_case.expected_app_type, actual_app_type);
+    }
+};
+
+TEST_P(ParseReplicaDirNameTest, ParseReplicaDirName) { 
test_parse_replica_dir_name(); }
+
+const std::vector<parse_replica_dir_name_case> parse_replica_dir_name_tests{
+    // Empty dir name.
+    {"", false, {}, ""},
+    // Single-digit IDs.
+    {"1.2.pegasus", true, {1, 2}, "pegasus"},
+    // Multi-digit IDs.
+    {"1234.56789.pegasus", true, {1234, 56789}, "pegasus"},
+    // Custom app type other than "pegasus".
+    {"1.2.another", true, {1, 2}, "another"},
+    // Custom app type with dot.
+    {"1.2.another.pegasus", true, {1, 2}, "another.pegasus"},
+    // Custom app type with other specific symbol.
+    {"1.2.another_pegasus", true, {1, 2}, "another_pegasus"},
+    // Missing one ID.
+    {"1.pegasus", false, {}, ""},
+    // Missing both IDs.
+    {"pegasus", false, {}, ""},
+    // ID with letter.
+    {"1.2a.pegasus", false, {}, ""},
+    // ID with minus.
+    {"1.-2.pegasus", false, {}, ""},
+};
+
+INSTANTIATE_TEST_SUITE_P(ReplicaDirTest,
+                         ParseReplicaDirNameTest,
+                         testing::ValuesIn(parse_replica_dir_name_tests));
+
+} // namespace dsn::replication
diff --git a/src/server/result_writer.cpp b/src/server/result_writer.cpp
index df78172e9..4d3a9160e 100644
--- a/src/server/result_writer.cpp
+++ b/src/server/result_writer.cpp
@@ -21,7 +21,6 @@
 
 #include <pegasus/error.h>
 #include <chrono>
-#include <type_traits>
 #include <utility>
 
 #include "pegasus/client.h"
diff --git a/src/test_util/test_util.h b/src/test_util/test_util.h
index 2e2b34bb0..549d8fd94 100644
--- a/src/test_util/test_util.h
+++ b/src/test_util/test_util.h
@@ -44,11 +44,12 @@ class file_meta;
 } // namespace replication
 } // namespace dsn
 
+#define PRESERVE_VAR(name, expr)                                               
                    \
+    const auto PRESERVED_##name = expr;                                        
                    \
+    auto PRESERVED_##name##_cleanup = dsn::defer([PRESERVED_##name]() { expr = 
PRESERVED_##name; })
+
 // Save the current value of a flag and restore it at the end of the function.
-#define PRESERVE_FLAG(name)                                                    
                    \
-    const auto PRESERVED_FLAGS_##name = FLAGS_##name;                          
                    \
-    auto PRESERVED_FLAGS_##name##_cleanup =                                    
                    \
-        dsn::defer([PRESERVED_FLAGS_##name]() { FLAGS_##name = 
PRESERVED_FLAGS_##name; })
+#define PRESERVE_FLAG(name) PRESERVE_VAR(FLAGS_##name, FLAGS_##name)
 
 namespace pegasus {
 
diff --git a/src/utils/autoref_ptr.h b/src/utils/autoref_ptr.h
index 501698f85..c3ba9d959 100644
--- a/src/utils/autoref_ptr.h
+++ b/src/utils/autoref_ptr.h
@@ -160,6 +160,15 @@ public:
 
     T *operator->() const { return _obj; }
 
+    bool operator==(const ref_ptr<T> &r) const { return _obj == r._obj; }
+
+    template <typename U,
+              typename = typename std::enable_if<std::is_convertible<U *, T 
*>::value>::type>
+    bool operator==(const ref_ptr<U> &r) const
+    {
+        return _obj == r._obj;
+    }
+
     bool operator==(T *r) const { return _obj == r; }
 
     bool operator!=(T *r) const { return _obj != r; }
diff --git a/src/utils/command_manager.h b/src/utils/command_manager.h
index b971522d0..9f23d24bb 100644
--- a/src/utils/command_manager.h
+++ b/src/utils/command_manager.h
@@ -36,6 +36,7 @@
 #include <map>
 #include <memory>
 #include <string>
+#include <type_traits>
 #include <vector>
 
 #include "utils/fmt_logging.h"
@@ -65,14 +66,12 @@ public:
     // 'validator' is used to validate the new value.
     // The value is reset to 'default_value' if passing "DEFAULT" argument.
     template <typename T>
-    WARN_UNUSED_RESULT std::unique_ptr<command_deregister> 
register_int_command(
-        T &value,
-        T default_value,
-        const std::string &command,
-        const std::string &help,
-        std::function<bool(int64_t new_value)> validator = [](int64_t 
new_value) -> bool {
-            return new_value >= 0;
-        })
+    WARN_UNUSED_RESULT std::unique_ptr<command_deregister>
+    register_int_command(T &value,
+                         T default_value,
+                         const std::string &command,
+                         const std::string &help,
+                         std::function<bool(typename 
std::remove_reference<T>::type)> validator)
     {
         return register_single_command(
             command,
@@ -83,6 +82,19 @@ public:
             });
     }
 
+    template <typename T>
+    WARN_UNUSED_RESULT std::unique_ptr<command_deregister> 
register_int_command(
+        T &value, T default_value, const std::string &command, const 
std::string &help)
+    {
+        return register_int_command(value,
+                                    default_value,
+                                    command,
+                                    help,
+                                    [](typename std::remove_reference<T>::type 
new_value) -> bool {
+                                        return new_value >= 0;
+                                    });
+    }
+
     // Register a single 'command' with the 'help' description, its arguments 
are described in
     // 'args'.
     std::unique_ptr<command_deregister>
@@ -133,11 +145,12 @@ private:
     set_bool(bool &value, const std::string &name, const 
std::vector<std::string> &args);
 
     template <typename T>
-    static std::string set_int(T &value,
-                               T default_value,
-                               const std::string &name,
-                               const std::vector<std::string> &args,
-                               const std::function<bool(int64_t value)> 
&validator)
+    static std::string
+    set_int(T &value,
+            T default_value,
+            const std::string &name,
+            const std::vector<std::string> &args,
+            const std::function<bool(typename std::remove_reference<T>::type)> 
&validator)
     {
         nlohmann::json msg;
         msg["error"] = "ok";
@@ -164,8 +177,7 @@ private:
 
         // Invalid argument.
         T new_value = 0;
-        if (!internal::buf2signed(args[0], new_value) ||
-            !validator(static_cast<int64_t>(new_value))) {
+        if (!buf2numeric(args[0], new_value) || !validator(new_value)) {
             msg["error"] =
                 fmt::format("ERR: invalid argument '{}', the value is not 
acceptable", args[0]);
             return msg.dump(2);
diff --git a/src/utils/time_utils.h b/src/utils/time_utils.h
index d36aa9864..872ea4fb7 100644
--- a/src/utils/time_utils.h
+++ b/src/utils/time_utils.h
@@ -147,7 +147,7 @@ public:
 
     inline void reset_start_time() { _start_time_ns = dsn_now_ns(); }
 
-    inline uint64_t duration_ns() const
+    [[nodiscard]] inline uint64_t duration_ns() const
     {
         auto now = dsn_now_ns();
         CHECK_GE(now, _start_time_ns);
@@ -155,6 +155,8 @@ public:
         return now - _start_time_ns;
     }
 
+    [[nodiscard]] inline uint64_t duration_ms() const { return duration_ns() / 
1'000'000; }
+
 private:
     uint64_t _start_time_ns;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to