This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 980689707 feat(duplication): support resumable download for checkpoint 
files during full duplication on the server side (#2238)
980689707 is described below

commit 9806897071585ce01cf794368c13d366595562c6
Author: Dan Wang <[email protected]>
AuthorDate: Mon May 12 12:11:22 2025 +0800

    feat(duplication): support resumable download for checkpoint files during full duplication on the server side (#2238)
    
    https://github.com/apache/incubator-pegasus/issues/2242
    
    When a dup follower requests the latest checkpoint information from the dup
    master, the server now supports returning the size and checksum of each file
    under the checkpoint directory. This allows the client to decide which files
    to fetch:
    
    - if it is the first time downloading the checkpoint, all files need to be
      downloaded;
    - if it is the second time or later, only files that are missing or partially
      downloaded locally need to be fetched.
    
    Additionally, the client is allowed to specify the checksum algorithm to use —
    currently, only MD5 is supported.
---
 idl/utils.thrift                                   |  13 +
 .../test/block_service_manager_test.cpp            |  27 +-
 src/common/consensus.thrift                        |  15 ++
 .../bulk_load/test/replica_bulk_loader_test.cpp    |  32 ++-
 src/replica/mutation.h                             |   6 +-
 src/replica/replica.h                              |  38 ++-
 src/replica/replica_chkpt.cpp                      |  82 ++++--
 src/replica/replica_stub.cpp                       |   7 +-
 src/replica/replica_stub.h                         |   7 +
 src/replica/test/replica_test.cpp                  |  29 +--
 src/server/pegasus_server_impl.cpp                 |   5 +-
 src/server/pegasus_server_impl.h                   |   2 +-
 src/server/test/CMakeLists.txt                     |   1 +
 src/server/test/capacity_unit_calculator_test.cpp  |   4 +-
 src/server/test/hotkey_collector_test.cpp          |   4 +-
 src/server/test/manual_compact_service_test.cpp    |   6 +-
 .../test/pegasus_compression_options_test.cpp      |   1 -
 src/server/test/pegasus_server_impl_test.cpp       | 280 ++++++++++++++++++++-
 src/server/test/pegasus_server_test_base.h         |  52 ++--
 src/server/test/pegasus_server_write_test.cpp      |   4 +-
 .../test/pegasus_write_service_impl_test.cpp       |   2 +-
 src/server/test/pegasus_write_service_test.cpp     |   2 +-
 src/server/test/rocksdb_wrapper_test.cpp           |  25 +-
 src/test_util/test_util.cpp                        |  40 ++-
 src/test_util/test_util.h                          |  41 ++-
 src/utils/checksum.cpp                             |  51 ++++
 src/utils/checksum.h                               |  46 ++++
 src/utils/test/CMakeLists.txt                      |   3 +-
 src/utils/test/checksum_test.cpp                   | 106 ++++++++
 src/utils/test_macros.h                            |  17 +-
 30 files changed, 799 insertions(+), 149 deletions(-)

diff --git a/idl/utils.thrift b/idl/utils.thrift
index 659b2c514..f8f5491c3 100644
--- a/idl/utils.thrift
+++ b/idl/utils.thrift
@@ -42,3 +42,16 @@ enum pattern_match_type
     // The string must match the given pattern as a regular expression.
     PMT_MATCH_REGEX,
 }
+
+// Specify which algorithm is used to calculate the checksum of a file.
+enum checksum_type
+{
+    CST_INVALID = 0,
+
+    // Do NOT calculate by any algorithm, which means no content will be
+    // generated.
+    CST_NONE,
+
+    // Use md5sum to calculate.
+    CST_MD5,
+}
diff --git a/src/block_service/test/block_service_manager_test.cpp 
b/src/block_service/test/block_service_manager_test.cpp
index dd1d8dea6..aa8a3fd79 100644
--- a/src/block_service/test/block_service_manager_test.cpp
+++ b/src/block_service/test/block_service_manager_test.cpp
@@ -85,8 +85,11 @@ TEST_P(block_service_manager_test, remote_file_not_exist)
 
 TEST_P(block_service_manager_test, local_file_exist)
 {
-    
NO_FATALS(pegasus::create_local_test_file(utils::filesystem::path_combine(LOCAL_DIR,
 FILE_NAME),
-                                              &_file_meta));
+    std::shared_ptr<pegasus::local_test_file> local_file;
+    NO_FATALS(pegasus::local_test_file::create(
+        utils::filesystem::path_combine(LOCAL_DIR, FILE_NAME), local_file));
+    _file_meta = local_file->get_file_meta();
+
     struct remote_file_info
     {
         int64_t size;
@@ -96,6 +99,7 @@ TEST_P(block_service_manager_test, local_file_exist)
         {2333, _file_meta.md5},      // wrong size
         {_file_meta.size, "bad_md5"} // wrong md5
     };
+
     for (const auto &test : tests) {
         // The remote file will be overwritten when repeatedly created.
         create_remote_file(FILE_NAME, test.size, test.md5);
@@ -107,15 +111,20 @@ TEST_P(block_service_manager_test, local_file_exist)
 
 TEST_P(block_service_manager_test, do_download_succeed)
 {
-    
NO_FATALS(pegasus::create_local_test_file(utils::filesystem::path_combine(LOCAL_DIR,
 FILE_NAME),
-                                              &_file_meta));
+    // Let local_file be deleted automatically once it goes out of scope, to mock
+    // the condition where the local file does not exist.
+    {
+        std::shared_ptr<pegasus::local_test_file> local_file;
+        NO_FATALS(pegasus::local_test_file::create(
+            utils::filesystem::path_combine(LOCAL_DIR, FILE_NAME), 
local_file));
+        _file_meta = local_file->get_file_meta();
+    }
+
     create_remote_file(FILE_NAME, _file_meta.size, _file_meta.md5);
-    // remove local file to mock condition that file not existed
-    std::string file_name = utils::filesystem::path_combine(LOCAL_DIR, 
FILE_NAME);
-    utils::filesystem::remove_path(file_name);
+
     uint64_t download_size = 0;
-    ASSERT_EQ(test_download_file(download_size), ERR_OK);
-    ASSERT_EQ(download_size, _file_meta.size);
+    ASSERT_EQ(ERR_OK, test_download_file(download_size));
+    ASSERT_EQ(_file_meta.size, download_size);
 }
 
 } // namespace block_service
diff --git a/src/common/consensus.thrift b/src/common/consensus.thrift
index 8952c090c..4489b730e 100644
--- a/src/common/consensus.thrift
+++ b/src/common/consensus.thrift
@@ -27,6 +27,7 @@
 include "../../idl/dsn.thrift"
 include "../../idl/dsn.layer2.thrift"
 include "../../idl/metadata.thrift"
+include "../../idl/utils.thrift"
 
 namespace cpp dsn.replication
 
@@ -131,6 +132,13 @@ struct learn_state
 
     // Used by duplication. Holds the start_decree of this round of learn.
     5:optional i64   learn_start_decree;
+
+    // file_sizes and file_checksums are only used to implement resumable checkpoint download
+    // for duplication. Once the dup follower receives file_sizes and file_checksums, it will
+    // decide which files it already has and which files it should fetch from the dup master
+    // (the dup master will reply to the dup follower with learn_response).
+    6:optional list<i64>        file_sizes;
+    7:optional list<string>     file_checksums;
 }
 
 enum learner_status
@@ -157,6 +165,13 @@ struct learn_request
     // learnee will copy the missing logs.
     7:optional i64               max_gced_decree;
     8:optional dsn.host_port     hp_learner;
+
+    // checksum_type is only used to implement resumable checkpoint download for duplication.
+    // It decides which algorithm the dup master will use to calculate the checksum for each
+    // file (learn_request will be sent from the dup follower to the dup master). Since it is
+    // only used for duplication, it defaults to CST_NONE, which means that neither file sizes
+    // nor checksums are calculated.
+    9:optional utils.checksum_type      checksum_type = 
utils.checksum_type.CST_NONE;
 }
 
 struct learn_response
diff --git a/src/replica/bulk_load/test/replica_bulk_loader_test.cpp 
b/src/replica/bulk_load/test/replica_bulk_loader_test.cpp
index b2e7fde0b..3c3139642 100644
--- a/src/replica/bulk_load/test/replica_bulk_loader_test.cpp
+++ b/src/replica/bulk_load/test/replica_bulk_loader_test.cpp
@@ -31,6 +31,7 @@
 #include "rpc/rpc_host_port.h"
 #include "task/task_tracker.h"
 #include "test_util/test_util.h"
+#include "utils/defer.h"
 #include "utils/fail_point.h"
 #include "utils/filesystem.h"
 #include "utils/load_dump_object.h"
@@ -245,14 +246,17 @@ public:
         _replica->set_primary_partition_configuration(pc);
     }
 
-    void create_local_metadata_file()
+    void create_local_metadata_file(std::shared_ptr<pegasus::local_test_file> 
&local_file)
     {
-        NO_FATALS(pegasus::create_local_test_file(
-            utils::filesystem::path_combine(LOCAL_DIR, FILE_NAME), 
&_file_meta));
+        NO_FATALS(pegasus::local_test_file::create(
+            utils::filesystem::path_combine(LOCAL_DIR, FILE_NAME), 
local_file));
+        _file_meta = local_file->get_file_meta();
+
         _metadata.files.emplace_back(_file_meta);
         _metadata.file_total_size = _file_meta.size;
-        std::string whole_name = utils::filesystem::path_combine(LOCAL_DIR, 
METADATA);
-        ASSERT_EQ(ERR_OK, utils::dump_rjobj_to_file(_metadata, whole_name));
+
+        const auto &full_path = utils::filesystem::path_combine(LOCAL_DIR, 
METADATA);
+        ASSERT_EQ(ERR_OK, utils::dump_rjobj_to_file(_metadata, full_path));
     }
 
     bool validate_metadata()
@@ -520,22 +524,30 @@ TEST_P(replica_bulk_loader_test, 
bulk_load_metadata_corrupt)
 {
     // create file can not parse as bulk_load_metadata structure
     utils::filesystem::create_directory(LOCAL_DIR);
-    
NO_FATALS(pegasus::create_local_test_file(utils::filesystem::path_combine(LOCAL_DIR,
 METADATA),
-                                              &_file_meta));
+    const auto cleanup =
+        dsn::defer([this]() { 
ASSERT_TRUE(utils::filesystem::remove_path(LOCAL_DIR)); });
+
+    std::shared_ptr<pegasus::local_test_file> local_file;
+    
NO_FATALS(pegasus::local_test_file::create(utils::filesystem::path_combine(LOCAL_DIR,
 METADATA),
+                                               local_file));
+    _file_meta = local_file->get_file_meta();
+
     std::string metadata_file_name = 
utils::filesystem::path_combine(LOCAL_DIR, METADATA);
     ASSERT_EQ(ERR_CORRUPTION, 
test_parse_bulk_load_metadata(metadata_file_name));
-    utils::filesystem::remove_path(LOCAL_DIR);
 }
 
 TEST_P(replica_bulk_loader_test, bulk_load_metadata_parse_succeed)
 {
     utils::filesystem::create_directory(LOCAL_DIR);
-    NO_FATALS(create_local_metadata_file());
+    const auto cleanup =
+        dsn::defer([this]() { 
ASSERT_TRUE(utils::filesystem::remove_path(LOCAL_DIR)); });
+
+    std::shared_ptr<pegasus::local_test_file> local_file;
+    NO_FATALS(create_local_metadata_file(local_file));
 
     std::string metadata_file_name = 
utils::filesystem::path_combine(LOCAL_DIR, METADATA);
     ASSERT_EQ(ERR_OK, test_parse_bulk_load_metadata(metadata_file_name));
     ASSERT_TRUE(validate_metadata());
-    utils::filesystem::remove_path(LOCAL_DIR);
 }
 
 // finish download test
diff --git a/src/replica/mutation.h b/src/replica/mutation.h
index 62d23d0eb..e88a90dce 100644
--- a/src/replica/mutation.h
+++ b/src/replica/mutation.h
@@ -81,9 +81,6 @@ public:
     mutation();
     ~mutation() override;
 
-    DISALLOW_COPY_AND_ASSIGN(mutation);
-    DISALLOW_MOVE_AND_ASSIGN(mutation);
-
     // copy mutation from an existing mutation, typically used in partition 
split
     // mutation should not reply to client, because parent has already replied
     static mutation_ptr copy_no_reply(const mutation_ptr &old_mu);
@@ -238,6 +235,9 @@ private:
     uint64_t _tid;          // trace id, unique in process
     static std::atomic<uint64_t> s_tid;
     bool _is_sync_to_child; // for partition split
+
+    DISALLOW_COPY_AND_ASSIGN(mutation);
+    DISALLOW_MOVE_AND_ASSIGN(mutation);
 };
 
 class replica;
diff --git a/src/replica/replica.h b/src/replica/replica.h
index b709bc849..50e0105c2 100644
--- a/src/replica/replica.h
+++ b/src/replica/replica.h
@@ -34,6 +34,7 @@
 #include <map>
 #include <memory>
 #include <string>
+#include <type_traits>
 #include <utility>
 
 #include "common/json_helper.h"
@@ -61,13 +62,12 @@
 #include "utils/thread_access_checker.h"
 #include "utils/throttling_controller.h"
 #include "utils/uniq_timestamp_us.h"
+#include "utils_types.h"
 
-namespace pegasus {
-namespace server {
+namespace pegasus::server {
 class pegasus_server_test_base;
 class rocksdb_wrapper_test;
-} // namespace server
-} // namespace pegasus
+} // namespace pegasus::server
 
 namespace dsn {
 class gpid;
@@ -289,9 +289,23 @@ public:
     // - `callback`: the callback processor handling the error code of 
triggering checkpoint.
     void async_trigger_manual_emergency_checkpoint(decree 
min_checkpoint_decree,
                                                    uint32_t delay_ms,
-                                                   trigger_checkpoint_callback 
callback = {});
+                                                   trigger_checkpoint_callback 
callback);
+
+    // The same as the above except that `callback` is empty.
+    void async_trigger_manual_emergency_checkpoint(decree 
min_checkpoint_decree, uint32_t delay_ms);
+
+    // Get the last checkpoint info and put it into the response to reply to the dup follower.
+    //
+    // Parameters:
+    // - `checksum_type`: specify which algorithm the server (namely dup master) will use to
+    // calculate the checksum for each file. This parameter is used to implement resumable
+    // checkpoint download for duplication: the client (namely dup follower) receives the file
+    // sizes and checksums, then decides which files it should fetch from the server accordingly.
+    // CST_NONE means that neither file sizes nor checksums are calculated.
+    // - `response`: the output parameter, used to respond to the dup follower.
+    void on_query_last_checkpoint(utils::checksum_type::type checksum_type,
+                                  learn_response &response);
 
-    void on_query_last_checkpoint(learn_response &response);
     std::shared_ptr<replica_duplicator_manager> get_duplication_manager() const
     {
         return _duplication_mgr;
@@ -703,7 +717,17 @@ private:
     // Currently only used for unit test to get the count of backup requests.
     int64_t get_backup_request_count() const;
 
-private:
+    // Support self-defined `replication_app_base` at runtime which is only 
used for test.
+    template <typename TApp,
+              typename... Args,
+              typename = typename std::enable_if<
+                  std::is_base_of<replication_app_base,
+                                  typename 
std::remove_pointer<TApp>::type>::value>::type>
+    void create_app_for_test(Args &&...args)
+    {
+        _app = std::make_unique<TApp>(std::forward<Args>(args)...);
+    }
+
     friend class ::dsn::replication::test::test_checker;
     friend class ::dsn::replication::mutation_log_tool;
     friend class ::dsn::replication::mutation_queue;
diff --git a/src/replica/replica_chkpt.cpp b/src/replica/replica_chkpt.cpp
index 6ef6f451c..42275b954 100644
--- a/src/replica/replica_chkpt.cpp
+++ b/src/replica/replica_chkpt.cpp
@@ -25,12 +25,13 @@
  */
 
 #include <fmt/core.h>
-#include <stdint.h>
 #include <atomic>
 #include <chrono>
+#include <cstdint>
 #include <functional>
 #include <memory>
 #include <string>
+#include <utility>
 #include <vector>
 
 #include "common/gpid.h"
@@ -56,13 +57,18 @@
 #include "task/task.h"
 #include "utils/autoref_ptr.h"
 #include "utils/blob.h"
+#include "utils/checksum.h"
 #include "utils/chrono_literals.h"
+#include "utils/env.h"
 #include "utils/error_code.h"
+#include "utils/errors.h"
 #include "utils/filesystem.h"
 #include "utils/flags.h"
 #include "utils/fmt_logging.h"
 #include "utils/metrics.h"
+#include "utils/ports.h"
 #include "utils/thread_access_checker.h"
+#include "utils_types.h"
 
 /// The checkpoint of the replicated app part of replica.
 
@@ -245,6 +251,12 @@ void 
replica::async_trigger_manual_emergency_checkpoint(decree min_checkpoint_de
         std::chrono::milliseconds(delay_ms));
 }
 
+void replica::async_trigger_manual_emergency_checkpoint(decree 
min_checkpoint_decree,
+                                                        uint32_t delay_ms)
+{
+    async_trigger_manual_emergency_checkpoint(min_checkpoint_decree, delay_ms, 
{});
+}
+
 // ThreadPool: THREAD_POOL_REPLICATION
 error_code replica::trigger_manual_emergency_checkpoint(decree 
min_checkpoint_decree)
 {
@@ -313,7 +325,8 @@ void replica::init_checkpoint(bool is_emergency)
 }
 
 // ThreadPool: THREAD_POOL_REPLICATION
-void replica::on_query_last_checkpoint(/*out*/ learn_response &response)
+void replica::on_query_last_checkpoint(utils::checksum_type::type 
checksum_type,
+                                       learn_response &response)
 {
     _checker.only_one_thread_access();
 
@@ -322,24 +335,65 @@ void replica::on_query_last_checkpoint(/*out*/ 
learn_response &response)
         return;
     }
 
-    blob placeholder;
-    int err = _app->get_checkpoint(0, placeholder, response.state);
-    if (err != 0) {
+    if (dsn_unlikely(_app->get_checkpoint(0, blob(), response.state) != 
ERR_OK)) {
         response.err = ERR_GET_LEARN_STATE_FAILED;
-    } else {
-        response.err = ERR_OK;
-        response.last_committed_decree = last_committed_decree();
-        // for example: base_local_dir = "./data" + "checkpoint.1024" = 
"./data/checkpoint.1024"
-        response.base_local_dir = utils::filesystem::path_combine(
-            _app->data_dir(), 
checkpoint_folder(response.state.to_decree_included));
-        SET_IP_AND_HOST_PORT(
-            response, learnee, _stub->primary_address(), 
_stub->primary_host_port());
+        return;
+    }
+
+    response.err = ERR_OK;
+    response.last_committed_decree = last_committed_decree();
+
+    // For example: base_local_dir = "./data" + "checkpoint.1024" = 
"./data/checkpoint.1024"
+    response.base_local_dir = utils::filesystem::path_combine(
+        _app->data_dir(), 
checkpoint_folder(response.state.to_decree_included));
+
+    SET_IP_AND_HOST_PORT(response, learnee, _stub->primary_address(), 
_stub->primary_host_port());
+
+    // If the client does not require checksums to be calculated, only respond with the name
+    // of each file.
+    if (checksum_type <= utils::checksum_type::CST_NONE) {
         for (auto &file : response.state.files) {
-            // response.state.files contain file absolute path, for example:
+            // response.state.files contain file absolute path, for example:
             // "./data/checkpoint.1024/1.sst" use `substr` to get the file 
name: 1.sst
             file = file.substr(response.base_local_dir.length() + 1);
         }
+
+        return;
+    }
+
+    std::vector<int64_t> file_sizes;
+    std::vector<std::string> file_checksums;
+    file_sizes.reserve(response.state.files.size());
+    file_checksums.reserve(response.state.files.size());
+    for (auto &file : response.state.files) {
+        int64_t size = 0;
+        if (dsn_unlikely(
+                !utils::filesystem::file_size(file, 
utils::FileDataType::kSensitive, size))) {
+            LOG_ERROR_PREFIX("get size of file failed: file = {}", file);
+            response.err = ERR_GET_LEARN_STATE_FAILED;
+            return;
+        }
+
+        std::string checksum;
+        const auto result = calc_checksum(file, checksum_type, checksum);
+        if (dsn_unlikely(!result)) {
+            LOG_ERROR_PREFIX("calculate checksum failed: file = {}, err = {}", 
file, result);
+            response.err = ERR_GET_LEARN_STATE_FAILED;
+            return;
+        }
+
+        file_sizes.push_back(size);
+        file_checksums.push_back(std::move(checksum));
+
+        // response.state.files contains absolute file paths, for example
+        // "./data/checkpoint.1024/1.sst"; use `substr` to get the file name: 1.sst
+        file = file.substr(response.base_local_dir.length() + 1);
     }
+
+    response.state.__isset.file_sizes = true;
+    response.state.__isset.file_checksums = true;
+    response.state.file_sizes = std::move(file_sizes);
+    response.state.file_checksums = std::move(file_checksums);
 }
 
 // run in background thread
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index 6b1f06031..0003d1e9e 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -1211,11 +1211,12 @@ void 
replica_stub::on_query_last_checkpoint(query_last_checkpoint_info_rpc rpc)
     learn_response &response = rpc.response();
 
     replica_ptr rep = get_replica(request.pid);
-    if (rep != nullptr) {
-        rep->on_query_last_checkpoint(response);
-    } else {
+    if (dsn_unlikely(rep == nullptr)) {
         response.err = ERR_OBJECT_NOT_FOUND;
+        return;
     }
+
+    rep->on_query_last_checkpoint(request.checksum_type, response);
 }
 
 // ThreadPool: THREAD_POOL_DEFAULT
diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h
index 7d03d06ee..8d67dc044 100644
--- a/src/replica/replica_stub.h
+++ b/src/replica/replica_stub.h
@@ -71,6 +71,12 @@
 #include "utils/metrics.h"
 #include "utils/zlocks.h"
 
+namespace pegasus::server {
+
+class pegasus_server_test_base;
+
+} // namespace pegasus::server
+
 namespace dsn::utils {
 
 class ex_lock;
@@ -494,6 +500,7 @@ private:
     friend class replica_disk_test_base;
     friend class replica_disk_migrate_test;
     friend class replica_stub_test_base;
+    friend class pegasus::server::pegasus_server_test_base;
     friend class open_replica_test;
     friend class replica_follower;
     friend class replica_follower_test;
diff --git a/src/replica/test/replica_test.cpp 
b/src/replica/test/replica_test.cpp
index d1fd9ab04..2c9bbb1bd 100644
--- a/src/replica/test/replica_test.cpp
+++ b/src/replica/test/replica_test.cpp
@@ -15,9 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <stddef.h>
-#include <stdint.h>
 #include <atomic>
+#include <cstddef>
+#include <cstdint>
 #include <functional>
 #include <iostream>
 #include <map>
@@ -37,9 +37,7 @@
 #include "common/replication_common.h"
 #include "common/replication_enums.h"
 #include "common/replication_other_types.h"
-#include "consensus_types.h"
 #include "dsn.layer2_types.h"
-#include "gmock/gmock.h"
 #include "gtest/gtest.h"
 #include "http/http_server.h"
 #include "http/http_status_code.h"
@@ -515,29 +513,6 @@ TEST_P(replica_test, 
test_trigger_manual_emergency_checkpoint)
     _mock_replica->tracker()->wait_outstanding_tasks();
 }
 
-TEST_P(replica_test, test_query_last_checkpoint_info)
-{
-    // test no exist gpid
-    auto req = std::make_unique<learn_request>();
-    req->pid = gpid(100, 100);
-    query_last_checkpoint_info_rpc rpc =
-        query_last_checkpoint_info_rpc(std::move(req), 
RPC_QUERY_LAST_CHECKPOINT_INFO);
-    stub->on_query_last_checkpoint(rpc);
-    ASSERT_EQ(rpc.response().err, ERR_OBJECT_NOT_FOUND);
-
-    learn_response resp;
-    // last_checkpoint hasn't exist
-    _mock_replica->on_query_last_checkpoint(resp);
-    ASSERT_EQ(resp.err, ERR_PATH_NOT_FOUND);
-
-    // query ok
-    _mock_replica->update_last_durable_decree(100);
-    _mock_replica->set_last_committed_decree(200);
-    _mock_replica->on_query_last_checkpoint(resp);
-    ASSERT_EQ(resp.last_committed_decree, 200);
-    ASSERT_STR_CONTAINS(resp.base_local_dir, "/data/checkpoint.100");
-}
-
 TEST_P(replica_test, test_clear_on_failure)
 {
     // Clear up the remaining state.
diff --git a/src/server/pegasus_server_impl.cpp 
b/src/server/pegasus_server_impl.cpp
index e745f98fb..de9abc7b0 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -2232,13 +2232,14 @@ private:
 {
     CHECK(_is_open, "");
 
-    int64_t ci = last_durable_decree();
+    const int64_t ci = last_durable_decree();
     if (ci == 0) {
         LOG_ERROR_PREFIX("no checkpoint found");
         return ::dsn::ERR_OBJECT_NOT_FOUND;
     }
 
-    auto chkpt_dir = ::dsn::utils::filesystem::path_combine(data_dir(), 
chkpt_get_dir_name(ci));
+    const auto chkpt_dir =
+        ::dsn::utils::filesystem::path_combine(data_dir(), 
chkpt_get_dir_name(ci));
     state.files.clear();
     if (!::dsn::utils::filesystem::get_subfiles(chkpt_dir, state.files, true)) 
{
         LOG_ERROR_PREFIX("list files in checkpoint dir {} failed", chkpt_dir);
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index deeb75e4d..b385137e0 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -249,6 +249,7 @@ public:
 private:
     friend class manual_compact_service_test;
     friend class pegasus_compression_options_test;
+    friend class pegasus_server_test_base;
     friend class pegasus_server_impl_test;
     friend class hotkey_collector_test;
     FRIEND_TEST(pegasus_server_impl_test, default_data_version);
@@ -478,7 +479,6 @@ private:
     void
     log_expired_data(const char *op, const dsn::rpc_address &addr, const 
rocksdb::Slice &key) const;
 
-private:
     static const std::chrono::seconds kServerStatUpdateTimeSec;
     static const std::string COMPRESSION_HEADER;
 
diff --git a/src/server/test/CMakeLists.txt b/src/server/test/CMakeLists.txt
index 082a98c1e..a3f647092 100644
--- a/src/server/test/CMakeLists.txt
+++ b/src/server/test/CMakeLists.txt
@@ -43,6 +43,7 @@ set(MY_PROJ_LIBS
         dsn.failure_detector
         dsn.replication.zookeeper_provider
         dsn_utils
+        test_utils
         rocksdb
         lz4
         zstd
diff --git a/src/server/test/capacity_unit_calculator_test.cpp 
b/src/server/test/capacity_unit_calculator_test.cpp
index 87309e675..c0486d418 100644
--- a/src/server/test/capacity_unit_calculator_test.cpp
+++ b/src/server/test/capacity_unit_calculator_test.cpp
@@ -104,9 +104,9 @@ protected:
 public:
     dsn::blob key, hash_key;
 
-    capacity_unit_calculator_test() : pegasus_server_test_base()
+    capacity_unit_calculator_test()
     {
-        _cal = std::make_unique<mock_capacity_unit_calculator>(_server.get());
+        _cal = std::make_unique<mock_capacity_unit_calculator>(_server);
         pegasus_generate_key(key, dsn::blob::create_from_bytes("h"), 
dsn::blob());
         hash_key = dsn::blob::create_from_bytes("key");
     }
diff --git a/src/server/test/hotkey_collector_test.cpp 
b/src/server/test/hotkey_collector_test.cpp
index 688efcb02..1c93e0910 100644
--- a/src/server/test/hotkey_collector_test.cpp
+++ b/src/server/test/hotkey_collector_test.cpp
@@ -107,7 +107,7 @@ TEST(hotkey_collector_public_func_test, 
find_outlier_index_test)
 class coarse_collector_test : public pegasus_server_test_base
 {
 public:
-    coarse_collector_test() : coarse_collector(_server.get(), 
FLAGS_hotkey_buckets_num){};
+    coarse_collector_test() : coarse_collector(_server, 
FLAGS_hotkey_buckets_num){};
 
     hotkey_coarse_data_collector coarse_collector;
 
@@ -164,7 +164,7 @@ public:
     int max_queue_size = 1000;
     int target_bucket_index = 0;
     hotkey_fine_data_collector fine_collector;
-    fine_collector_test() : fine_collector(_server.get(), 1, max_queue_size)
+    fine_collector_test() : fine_collector(_server, 1, max_queue_size)
     {
         fine_collector.change_target_bucket(0);
     };
diff --git a/src/server/test/manual_compact_service_test.cpp 
b/src/server/test/manual_compact_service_test.cpp
index 8ccd761b5..95638bfb3 100644
--- a/src/server/test/manual_compact_service_test.cpp
+++ b/src/server/test/manual_compact_service_test.cpp
@@ -48,16 +48,16 @@ public:
     manual_compact_service_test()
     {
         start();
-        manual_compact_svc = 
std::make_unique<pegasus_manual_compact_service>(_server.get());
+        manual_compact_svc = 
std::make_unique<pegasus_manual_compact_service>(_server);
     }
 
-    void set_compact_time(int64_t ts)
+    void set_compact_time(int64_t ts) const
     {
         manual_compact_svc->_manual_compact_last_finish_time_ms.store(
             static_cast<uint64_t>(ts * 1000));
     }
 
-    void set_mock_now(uint64_t mock_now_sec)
+    void set_mock_now(uint64_t mock_now_sec) const
     {
         manual_compact_svc->_mock_now_timestamp = mock_now_sec * 1000;
     }
diff --git a/src/server/test/pegasus_compression_options_test.cpp 
b/src/server/test/pegasus_compression_options_test.cpp
index 4836bced1..4dacb80f7 100644
--- a/src/server/test/pegasus_compression_options_test.cpp
+++ b/src/server/test/pegasus_compression_options_test.cpp
@@ -21,7 +21,6 @@
 #include <rocksdb/db.h>
 #include <rocksdb/options.h>
 #include <map>
-#include <memory>
 #include <string>
 #include <vector>
 
diff --git a/src/server/test/pegasus_server_impl_test.cpp 
b/src/server/test/pegasus_server_impl_test.cpp
index f2a491575..9245a8cc6 100644
--- a/src/server/test/pegasus_server_impl_test.cpp
+++ b/src/server/test/pegasus_server_impl_test.cpp
@@ -17,38 +17,53 @@
  * under the License.
  */
 
+// IWYU pragma: no_include <ext/alloc_traits.h>
 #include <base/pegasus_key_schema.h>
 #include <fmt/core.h>
 #include <rocksdb/db.h>
 #include <rocksdb/options.h>
-#include <stdint.h>
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <initializer_list>
 #include <map>
 #include <memory>
 #include <set>
 #include <string>
 #include <utility>
+#include <vector>
 
+#include "common/gpid.h"
 #include "common/replica_envs.h"
+#include "common/replication.codes.h"
+#include "common/replication_other_types.h"
+#include "consensus_types.h"
 #include "gmock/gmock.h"
 #include "gtest/gtest.h"
 #include "pegasus_server_test_base.h"
+#include "replica/replica_stub.h"
+#include "rpc/rpc_holder.h"
 #include "rrdb/rrdb.code.definition.h"
 #include "rrdb/rrdb_types.h"
 #include "runtime/serverlet.h"
 #include "server/pegasus_read_service.h"
+#include "test_util/test_util.h"
 #include "utils/autoref_ptr.h"
 #include "utils/blob.h"
+#include "utils/defer.h"
 #include "utils/error_code.h"
 #include "utils/filesystem.h"
+#include "utils/fmt_logging.h"
 #include "utils/metrics.h"
+#include "utils/test_macros.h"
+#include "utils_types.h"
 
-namespace pegasus {
-namespace server {
+namespace pegasus::server {
 
 class pegasus_server_impl_test : public pegasus_server_test_base
 {
-public:
-    pegasus_server_impl_test() : pegasus_server_test_base() {}
+protected:
+    pegasus_server_impl_test() = default;
 
     void test_table_level_slow_query()
     {
@@ -140,6 +155,194 @@ public:
             }
         }
     }
+
+    // Test last checkpoint query in `pegasus_server_impl_test` rather than in `replica_test`,
+    // because `pegasus_server_impl::get_checkpoint()` needs to be tested instead of being
+    // mocked.
+    //
+    // Parameters:
+    // - `create_checkpoint_dir`: whether to create `checkpoint.<expected_last_durable_decree>`
+    // under the data dir of the server and fill it with the expected files.
+    // - `pid`: the replica to be queried.
+    // - `checksum_type`: the checksum type carried by the request.
+    // - `expected_err`: the error code expected in the response. Once it is not ERR_OK, nothing
+    // else in the response is checked.
+    // - `expected_last_committed_decree`: compared with the one in the response.
+    // - `expected_last_durable_decree`: decides the name of the checkpoint dir.
+    // - `expected_file_names`: should be in ascending order, since they are compared with the
+    // sorted file names of the response.
+    // - `expected_file_sizes`, `expected_file_checksums`: the i-th element belongs to the i-th
+    // element of `expected_file_names`.
+    void test_query_last_checkpoint(bool create_checkpoint_dir,
+                                    const dsn::gpid &pid,
+                                    dsn::utils::checksum_type::type checksum_type,
+                                    const dsn::error_code &expected_err,
+                                    dsn::replication::decree expected_last_committed_decree,
+                                    dsn::replication::decree expected_last_durable_decree,
+                                    const std::vector<std::string> &expected_file_names,
+                                    const std::vector<int64_t> &expected_file_sizes,
+                                    const std::vector<std::string> &expected_file_checksums)
+    {
+        const std::string checkpoint_dir(dsn::utils::filesystem::path_combine(
+            _server->data_dir(), fmt::format("checkpoint.{}", expected_last_durable_decree)));
+
+        // After the test case is finished, remove the whole checkpoint dir.
+        const auto cleanup = dsn::defer([checkpoint_dir]() {
+            if (!dsn::utils::filesystem::directory_exists(checkpoint_dir)) {
+                return;
+            }
+
+            ASSERT_TRUE(dsn::utils::filesystem::remove_path(checkpoint_dir));
+        });
+
+        // Declared after `cleanup`, thus the files are released before the dir is removed.
+        std::map<std::string, std::shared_ptr<local_test_file>> local_files;
+        if (create_checkpoint_dir) {
+            ASSERT_TRUE(dsn::utils::filesystem::create_directory(checkpoint_dir));
+
+            // Each file must have its own size; otherwise `expected_file_sizes` would be
+            // indexed out of range by the following loop.
+            ASSERT_EQ(expected_file_names.size(), expected_file_sizes.size());
+
+            // Generate all files in the checkpoint dir with respective specified size.
+            for (size_t i = 0; i < expected_file_names.size(); ++i) {
+                const auto file_path =
+                    dsn::utils::filesystem::path_combine(checkpoint_dir, expected_file_names[i]);
+
+                std::shared_ptr<local_test_file> local_file;
+                NO_FATALS(local_test_file::create(
+                    file_path, std::string(expected_file_sizes[i], 'a'), local_file));
+                ASSERT_TRUE(local_files.emplace(file_path, local_file).second);
+            }
+        }
+
+        // Build the request to get the last checkpoint info.
+        auto req = std::make_unique<dsn::replication::learn_request>();
+        req->pid = pid;
+        req->__set_checksum_type(checksum_type);
+
+        // Mock the RPC.
+        auto rpc = dsn::replication::query_last_checkpoint_info_rpc(std::move(req),
+                                                                    RPC_QUERY_LAST_CHECKPOINT_INFO);
+
+        // Execute the last checkpoint query on server side.
+        _replica_stub->on_query_last_checkpoint(rpc);
+
+        // The error code in the response should match the expected one.
+        const auto &resp = rpc.response();
+        ASSERT_EQ(expected_err, resp.err);
+
+        // No need to check others in the response once the error code is not ERR_OK.
+        if (expected_err != dsn::ERR_OK) {
+            return;
+        }
+
+        ASSERT_EQ(expected_last_committed_decree, resp.last_committed_decree);
+        ASSERT_STR_ENDSWITH(resp.base_local_dir,
+                            fmt::format("/data/checkpoint.{}", expected_last_durable_decree));
+
+        // Check whether all file names in the response are matched.
+        check_checkpoint_file_names(resp.state.files, expected_file_names);
+
+        // The file sizes and checksums should be kept empty if checksum is not required.
+        if (checksum_type <= dsn::utils::checksum_type::CST_NONE) {
+            ASSERT_FALSE(resp.state.__isset.file_sizes);
+            ASSERT_FALSE(resp.state.__isset.file_checksums);
+            ASSERT_TRUE(resp.state.file_sizes.empty());
+            ASSERT_TRUE(resp.state.file_checksums.empty());
+            return;
+        }
+
+        // Check whether all file sizes in the response are matched.
+        ASSERT_TRUE(resp.state.__isset.file_sizes);
+        check_checkpoint_file_sizes(resp.state.file_sizes, resp.state.files, expected_file_sizes);
+
+        // Check whether all file checksums in the response are matched.
+        ASSERT_TRUE(resp.state.__isset.file_checksums);
+        check_checkpoint_file_checksums(
+            resp.state.file_checksums, resp.state.files, expected_file_checksums);
+    }
+
+    // Shortcut for the cases where the query is expected to fail: the checksum type is
+    // CST_INVALID, both decrees are 0 (thus the checkpoint dir involved is `checkpoint.0`)
+    // and no file is expected.
+    void test_query_last_checkpoint(bool create_checkpoint_dir,
+                                    const dsn::gpid &pid,
+                                    const dsn::error_code &expected_err)
+    {
+        const dsn::replication::decree zero_decree = 0;
+        const std::vector<std::string> no_file_names;
+        const std::vector<int64_t> no_file_sizes;
+        const std::vector<std::string> no_file_checksums;
+
+        test_query_last_checkpoint(create_checkpoint_dir,
+                                   pid,
+                                   dsn::utils::checksum_type::CST_INVALID,
+                                   expected_err,
+                                   zero_decree,
+                                   zero_decree,
+                                   no_file_names,
+                                   no_file_sizes,
+                                   no_file_checksums);
+    }
+
+    // Shortcut for the failed cases in which the checkpoint dir is always created.
+    void test_query_last_checkpoint(const dsn::gpid &pid, const dsn::error_code &expected_err)
+    {
+        const bool create_checkpoint_dir = true;
+        test_query_last_checkpoint(create_checkpoint_dir, pid, expected_err);
+    }
+
+    // Run a query that is expected to succeed once for each checksum type (CST_INVALID,
+    // CST_NONE and CST_MD5). The checkpoint dir is always created and the replica queried
+    // is `_gpid`.
+    //
+    // Parameters:
+    // - `expected_last_committed_decree`, `expected_last_durable_decree`: the decrees that
+    // are forwarded to each query as the expectations.
+    // - `expected_file_names`, `expected_file_sizes`, `expected_file_checksums`: the files
+    // that are generated in the checkpoint dir and expected in the response.
+    void test_query_last_checkpoint_for_all_checksum_types(
+        dsn::replication::decree expected_last_committed_decree,
+        dsn::replication::decree expected_last_durable_decree,
+        const std::vector<std::string> &expected_file_names,
+        const std::vector<int64_t> &expected_file_sizes,
+        const std::vector<std::string> &expected_file_checksums)
+    {
+        for (const auto checksum_type : {dsn::utils::checksum_type::CST_INVALID,
+                                         dsn::utils::checksum_type::CST_NONE,
+                                         dsn::utils::checksum_type::CST_MD5}) {
+            // Forward the decrees given by the caller rather than hard-coded ones.
+            test_query_last_checkpoint(true,
+                                       _gpid,
+                                       checksum_type,
+                                       dsn::ERR_OK,
+                                       expected_last_committed_decree,
+                                       expected_last_durable_decree,
+                                       expected_file_names,
+                                       expected_file_sizes,
+                                       expected_file_checksums);
+        }
+    }
+
+private:
+    // Assert that `file_names`, once sorted in ascending order, are exactly the same as
+    // `expected_file_names`.
+    static void check_checkpoint_file_names(const std::vector<std::string> &file_names,
+                                            const std::vector<std::string> &expected_file_names)
+    {
+        // Sort a copy, since the input must be kept intact.
+        std::vector<std::string> ordered_file_names(file_names.begin(), file_names.end());
+        std::sort(ordered_file_names.begin(), ordered_file_names.end());
+
+        ASSERT_EQ(expected_file_names, ordered_file_names);
+    }
+
+    // Return a copy of `file_properties` rearranged in the ascending order of the file names
+    // they belong to: the i-th property belongs to the i-th element of `file_names`. Both
+    // vectors must have the same size.
+    template <typename TFileProperty>
+    static std::vector<TFileProperty>
+    sort_checkpoint_file_properties(const std::vector<TFileProperty> &file_properties,
+                                    const std::vector<std::string> &file_names)
+    {
+        CHECK_EQ(file_properties.size(), file_names.size());
+
+        // Start from the positions 0, 1, ..., n - 1.
+        std::vector<size_t> positions;
+        positions.reserve(file_names.size());
+        while (positions.size() < file_names.size()) {
+            positions.push_back(positions.size());
+        }
+
+        // Rearrange the positions according to the file names they refer to.
+        const auto by_file_name = [&file_names](size_t lhs, size_t rhs) {
+            return file_names[lhs] < file_names[rhs];
+        };
+        std::sort(positions.begin(), positions.end(), by_file_name);
+
+        // Pick the properties following the rearranged positions.
+        std::vector<TFileProperty> ordered_file_properties;
+        ordered_file_properties.reserve(positions.size());
+        for (const auto pos : positions) {
+            ordered_file_properties.push_back(file_properties[pos]);
+        }
+
+        return ordered_file_properties;
+    }
+
+    // Assert that `file_sizes`, once rearranged in the ascending order of `file_names`, are
+    // exactly the same as `expected_file_sizes`.
+    static void check_checkpoint_file_sizes(const std::vector<int64_t> &file_sizes,
+                                            const std::vector<std::string> &file_names,
+                                            const std::vector<int64_t> &expected_file_sizes)
+    {
+        const std::vector<int64_t> ordered_file_sizes(
+            sort_checkpoint_file_properties<int64_t>(file_sizes, file_names));
+
+        ASSERT_EQ(expected_file_sizes, ordered_file_sizes);
+    }
+
+    // Assert that `file_checksums`, once rearranged in the ascending order of `file_names`,
+    // are exactly the same as `expected_file_checksums`.
+    static void
+    check_checkpoint_file_checksums(const std::vector<std::string> &file_checksums,
+                                    const std::vector<std::string> &file_names,
+                                    const std::vector<std::string> &expected_file_checksums)
+    {
+        const std::vector<std::string> ordered_file_checksums(
+            sort_checkpoint_file_properties<std::string>(file_checksums, file_names));
+
+        ASSERT_EQ(expected_file_checksums, ordered_file_checksums);
+    }
 };
 
 INSTANTIATE_TEST_SUITE_P(, pegasus_server_impl_test, ::testing::Values(false, 
true));
@@ -251,5 +454,68 @@ TEST_P(pegasus_server_impl_test, 
test_load_from_duplication_data)
     dsn::utils::filesystem::remove_file_name(new_file);
 }
 
-} // namespace server
-} // namespace pegasus
+// The query should fail with ERR_OBJECT_NOT_FOUND once the replica does not exist.
+TEST_P(pegasus_server_impl_test, test_query_last_checkpoint_with_replica_not_found)
+{
+    // Give a gpid that does not exist to make sure the replica cannot be found.
+    const dsn::gpid non_existent_pid(101, 101);
+
+    test_query_last_checkpoint(non_existent_pid, dsn::ERR_OBJECT_NOT_FOUND);
+}
+
+// The query should fail with ERR_PATH_NOT_FOUND once there is no last checkpoint.
+TEST_P(pegasus_server_impl_test, test_query_last_checkpoint_with_last_checkpoint_not_exist)
+{
+    // A zero last_durable_decree makes sure that the last checkpoint does not exist.
+    const dsn::replication::decree zero_decree = 0;
+    set_last_committed_decree(zero_decree);
+    set_last_durable_decree(zero_decree);
+
+    test_query_last_checkpoint(_gpid, dsn::ERR_PATH_NOT_FOUND);
+}
+
+// The query should fail with ERR_GET_LEARN_STATE_FAILED once the dir of the last checkpoint
+// does not exist.
+TEST_P(pegasus_server_impl_test, test_query_last_checkpoint_with_last_checkpoint_dir_not_exist)
+{
+    ASSERT_EQ(dsn::ERR_OK, start());
+
+    // Make sure the last_durable_decree is not zero.
+    const dsn::replication::decree last_committed_decree = 200;
+    const dsn::replication::decree last_durable_decree = 100;
+    set_last_committed_decree(last_committed_decree);
+    set_last_durable_decree(last_durable_decree);
+
+    // Do not create the checkpoint dir.
+    const bool create_checkpoint_dir = false;
+    test_query_last_checkpoint(create_checkpoint_dir, _gpid, dsn::ERR_GET_LEARN_STATE_FAILED);
+}
+
+// The query should succeed with no file returned once the dir of the last checkpoint exists
+// but is empty.
+TEST_P(pegasus_server_impl_test, test_query_last_checkpoint_with_empty_dir)
+{
+    const dsn::replication::decree last_committed_decree = 200;
+    const dsn::replication::decree last_durable_decree = 100;
+
+    ASSERT_EQ(dsn::ERR_OK, start());
+
+    // Make sure the last_durable_decree is not zero.
+    set_last_committed_decree(last_committed_decree);
+    set_last_durable_decree(last_durable_decree);
+
+    test_query_last_checkpoint_for_all_checksum_types(
+        last_committed_decree, last_durable_decree, {}, {}, {});
+}
+
+// The query should succeed with all files returned once the dir of the last checkpoint has
+// some files.
+TEST_P(pegasus_server_impl_test, test_query_last_checkpoint_with_non_empty_dir)
+{
+    // The i-th size and the i-th checksum belong to the i-th file name. Each file is filled
+    // with as many 'a' as its size.
+    static const std::vector<std::string> kCheckpointFileNames = {
+        "test_file.1",
+        "test_file.2",
+        "test_file.3",
+        "test_file.4",
+        "test_file.5",
+        "test_file.6",
+    };
+    static const std::vector<int64_t> kCheckpointFileSizes = {0, 1, 2, 4095, 4096, 4097};
+    static const std::vector<std::string> kCheckpointFileChecksums = {
+        "d41d8cd98f00b204e9800998ecf8427e",
+        "0cc175b9c0f1b6a831c399e269772661",
+        "4124bc0a9335c27f086f24ba207a4912",
+        "559110baa849c7608ee70abe1d76273e",
+        "21a199c53f422a380e20b162fb6ebe9c",
+        "8cfc1a0bd8cd76599e76e5e721c6e62e",
+    };
+
+    const dsn::replication::decree last_committed_decree = 200;
+    const dsn::replication::decree last_durable_decree = 100;
+
+    ASSERT_EQ(dsn::ERR_OK, start());
+
+    set_last_committed_decree(last_committed_decree);
+    set_last_durable_decree(last_durable_decree);
+
+    test_query_last_checkpoint_for_all_checksum_types(last_committed_decree,
+                                                      last_durable_decree,
+                                                      kCheckpointFileNames,
+                                                      kCheckpointFileSizes,
+                                                      kCheckpointFileChecksums);
+}
+
+} // namespace pegasus::server
diff --git a/src/server/test/pegasus_server_test_base.h 
b/src/server/test/pegasus_server_test_base.h
index 00a46cf75..2ea7fdc25 100644
--- a/src/server/test/pegasus_server_test_base.h
+++ b/src/server/test/pegasus_server_test_base.h
@@ -24,10 +24,11 @@
 #include <gtest/gtest.h>
 #include <gmock/gmock.h>
 #include "common/fs_manager.h"
-#include "utils/flags.h"
 #include "replica/replica_stub.h"
 #include "test_util/test_util.h"
+#include "utils/casts.h"
 #include "utils/filesystem.h"
+#include "utils/flags.h"
 
 DSN_DECLARE_bool(encrypt_data_at_rest);
 
@@ -50,7 +51,7 @@ public:
     {
         // Remove rdb to prevent rocksdb recovery from last test.
         dsn::utils::filesystem::remove_path("./test_dir");
-        _replica_stub = new dsn::replication::replica_stub();
+        _replica_stub = std::make_unique<dsn::replication::replica_stub>();
         _replica_stub->get_fs_manager()->initialize({"test_dir"}, 
{"test_tag"});
 
         // Use different gpid for encryption and non-encryption test to avoid 
reopening a rocksdb
@@ -64,19 +65,29 @@ public:
         dsn::app_info app_info;
         app_info.app_type = "pegasus";
 
+        initialize_replica(app_info);
+
+        CHECK(dsn::utils::filesystem::create_directory(_server->data_dir()),
+              "create data dir {} failed",
+              _server->data_dir());
+    }
+
+    void initialize_replica(const dsn::app_info &app_info)
+    {
+        _replica.reset();
+
         auto *dn = 
_replica_stub->get_fs_manager()->find_best_dir_for_new_replica(_gpid);
         CHECK_NOTNULL(dn, "");
-        _replica = new dsn::replication::replica(_replica_stub, _gpid, 
app_info, dn, false, false);
-        const auto dir_data = dsn::utils::filesystem::path_combine(
-            _replica->dir(), dsn::replication::replication_app_base::kDataDir);
-        CHECK(dsn::utils::filesystem::create_directory(dir_data),
-              "create data dir {} failed",
-              dir_data);
 
-        _server = std::make_unique<mock_pegasus_server_impl>(_replica);
+        _replica =
+            new dsn::replication::replica(_replica_stub.get(), _gpid, 
app_info, dn, false, false);
+        _replica_stub->_replicas[_gpid] = _replica;
+
+        
_replica->create_app_for_test<mock_pegasus_server_impl>(_replica.get());
+        _server = dsn::down_cast<mock_pegasus_server_impl 
*>(_replica->_app.get());
     }
 
-    dsn::error_code start(const std::map<std::string, std::string> &envs = {})
+    dsn::error_code start(const std::map<std::string, std::string> &envs)
     {
         std::unique_ptr<char *[]> argvs = std::make_unique<char *[]>(1 + 
envs.size() * 2);
         char **argv = argvs.get();
@@ -91,20 +102,29 @@ public:
         return _server->start(idx, argv);
     }
 
+    dsn::error_code start() { return start({}); }
+
+    void set_last_committed_decree(dsn::replication::decree d)
+    {
+        _replica->_prepare_list->reset(d);
+    }
+
+    void set_last_durable_decree(dsn::replication::decree d)
+    {
+        _server->set_last_durable_decree(d);
+    }
+
     ~pegasus_server_test_base() override
     {
         // do not clear state
         _server->stop(false);
-
-        delete _replica_stub;
-        delete _replica;
     }
 
 protected:
-    std::unique_ptr<mock_pegasus_server_impl> _server;
-    dsn::replication::replica *_replica = nullptr;
-    dsn::replication::replica_stub *_replica_stub = nullptr;
+    std::unique_ptr<dsn::replication::replica_stub> _replica_stub;
     dsn::gpid _gpid;
+    dsn::replication::replica_ptr _replica;
+    mock_pegasus_server_impl *_server;
 };
 
 } // namespace server
diff --git a/src/server/test/pegasus_server_write_test.cpp 
b/src/server/test/pegasus_server_write_test.cpp
index 4b7c6fc84..f9ab7b414 100644
--- a/src/server/test/pegasus_server_write_test.cpp
+++ b/src/server/test/pegasus_server_write_test.cpp
@@ -52,10 +52,10 @@ class pegasus_server_write_test : public 
pegasus_server_test_base
     std::unique_ptr<pegasus_server_write> _server_write;
 
 public:
-    pegasus_server_write_test() : pegasus_server_test_base()
+    pegasus_server_write_test()
     {
         start();
-        _server_write = std::make_unique<pegasus_server_write>(_server.get());
+        _server_write = std::make_unique<pegasus_server_write>(_server);
     }
 
     void test_batch_writes()
diff --git a/src/server/test/pegasus_write_service_impl_test.cpp 
b/src/server/test/pegasus_write_service_impl_test.cpp
index 053dd6333..192c3110e 100644
--- a/src/server/test/pegasus_write_service_impl_test.cpp
+++ b/src/server/test/pegasus_write_service_impl_test.cpp
@@ -62,7 +62,7 @@ protected:
     void SetUp() override
     {
         ASSERT_EQ(dsn::ERR_OK, start());
-        _server_write = std::make_unique<pegasus_server_write>(_server.get());
+        _server_write = std::make_unique<pegasus_server_write>(_server);
         _write_impl = _server_write->_write_svc->_impl.get();
         _rocksdb_wrapper = _write_impl->_rocksdb_wrapper.get();
     }
diff --git a/src/server/test/pegasus_write_service_test.cpp 
b/src/server/test/pegasus_write_service_test.cpp
index e5c5b4382..b4db69849 100644
--- a/src/server/test/pegasus_write_service_test.cpp
+++ b/src/server/test/pegasus_write_service_test.cpp
@@ -59,7 +59,7 @@ public:
     void SetUp() override
     {
         start();
-        _server_write = std::make_unique<pegasus_server_write>(_server.get());
+        _server_write = std::make_unique<pegasus_server_write>(_server);
         _write_svc = _server_write->_write_svc.get();
     }
 
diff --git a/src/server/test/rocksdb_wrapper_test.cpp 
b/src/server/test/rocksdb_wrapper_test.cpp
index 7831afc71..bfd8e11e2 100644
--- a/src/server/test/rocksdb_wrapper_test.cpp
+++ b/src/server/test/rocksdb_wrapper_test.cpp
@@ -18,31 +18,28 @@
  */
 
 #include <fmt/core.h>
-#include <stdint.h>
+#include <algorithm>
+#include <cstdint>
 #include <memory>
 #include <string>
+#include <string_view>
 #include <utility>
 
-#include "common/fs_manager.h"
 #include "dsn.layer2_types.h"
 #include "gtest/gtest.h"
 #include "pegasus_key_schema.h"
 #include "pegasus_server_test_base.h"
 #include "pegasus_utils.h"
 #include "pegasus_value_schema.h"
-#include "replica/replica.h"
-#include "replica/replica_stub.h"
 #include "server/pegasus_server_write.h"
 #include "server/pegasus_write_service.h"
 #include "server/pegasus_write_service_impl.h"
 #include "server/rocksdb_wrapper.h"
 #include "utils/blob.h"
 #include "utils/error_code.h"
-#include "utils/fmt_logging.h"
-#include <string_view>
 
-namespace pegasus {
-namespace server {
+namespace pegasus::server {
+
 class rocksdb_wrapper_test : public pegasus_server_test_base
 {
 protected:
@@ -56,7 +53,7 @@ public:
     void SetUp() override
     {
         ASSERT_EQ(::dsn::ERR_OK, start());
-        _server_write = std::make_unique<pegasus_server_write>(_server.get());
+        _server_write = std::make_unique<pegasus_server_write>(_server);
         _rocksdb_wrapper = 
_server_write->_write_svc->_impl->_rocksdb_wrapper.get();
 
         pegasus::pegasus_generate_key(
@@ -79,16 +76,12 @@ public:
     void set_app_duplicating()
     {
         _server->stop(false);
-        delete _replica;
 
         dsn::app_info app_info;
         app_info.app_type = "pegasus";
         app_info.duplicating = true;
 
-        auto *dn = 
_replica_stub->get_fs_manager()->find_best_dir_for_new_replica(_gpid);
-        CHECK_NOTNULL(dn, "");
-        _replica = new dsn::replication::replica(_replica_stub, _gpid, 
app_info, dn, false, false);
-        _server = std::make_unique<mock_pegasus_server_impl>(_replica);
+        initialize_replica(app_info);
 
         SetUp();
     }
@@ -232,5 +225,5 @@ TEST_P(rocksdb_wrapper_test, 
verify_timetag_compatible_with_version_0)
         _rocksdb_wrapper->_pegasus_data_version, std::move(get_ctx.raw_value), 
user_value);
     ASSERT_EQ(user_value.to_string(), value);
 }
-} // namespace server
-} // namespace pegasus
+
+} // namespace pegasus::server
diff --git a/src/test_util/test_util.cpp b/src/test_util/test_util.cpp
index 338ec4ddf..32e354654 100644
--- a/src/test_util/test_util.cpp
+++ b/src/test_util/test_util.cpp
@@ -22,7 +22,6 @@
 #include <thread>
 
 #include "gtest/gtest.h"
-#include "metadata_types.h"
 #include "rocksdb/env.h"
 #include "rocksdb/slice.h"
 #include "rocksdb/status.h"
@@ -35,19 +34,38 @@
 
 namespace pegasus {
 
-void create_local_test_file(const std::string &full_name, 
dsn::replication::file_meta *fm)
+void local_test_file::create(const std::string &path,
+                             const std::string &content,
+                             std::shared_ptr<local_test_file> &file)
 {
-    ASSERT_NE(fm, nullptr);
-    auto s =
+    const auto status =
         
rocksdb::WriteStringToFile(dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive),
-                                   rocksdb::Slice("write some data."),
-                                   full_name,
+                                   rocksdb::Slice(content),
+                                   path,
                                    /* should_sync */ true);
-    ASSERT_TRUE(s.ok()) << s.ToString();
-    fm->name = full_name;
-    ASSERT_EQ(dsn::ERR_OK, dsn::utils::filesystem::md5sum(full_name, fm->md5));
-    ASSERT_TRUE(dsn::utils::filesystem::file_size(
-        full_name, dsn::utils::FileDataType::kSensitive, fm->size));
+    ASSERT_TRUE(status.ok()) << status.ToString();
+
+    dsn::replication::file_meta meta;
+    meta.name = path;
+    ASSERT_TRUE(
+        dsn::utils::filesystem::file_size(path, 
dsn::utils::FileDataType::kSensitive, meta.size));
+    ASSERT_EQ(dsn::ERR_OK, dsn::utils::filesystem::md5sum(path, meta.md5));
+
+    file = std::shared_ptr<local_test_file>(new local_test_file(meta), 
deleter);
+}
+
+void local_test_file::create(const std::string &path, 
std::shared_ptr<local_test_file> &file)
+{
+    return create(path, "write some data.", file);
+}
+
+local_test_file::local_test_file(const dsn::replication::file_meta &meta) : 
_file_meta(meta) {}
+
+local_test_file::~local_test_file()
+{
+    // We don't check whether it returns true, since the dir where the file is located may have
+    // been removed.
+    dsn::utils::filesystem::remove_path(_file_meta.name);
 }
 
 void AssertEventually(const std::function<void(void)> &f, int timeout_sec, 
WaitBackoff backoff)
diff --git a/src/test_util/test_util.h b/src/test_util/test_util.h
index 549d8fd94..74badb880 100644
--- a/src/test_util/test_util.h
+++ b/src/test_util/test_util.h
@@ -25,28 +25,26 @@
 #include <cstdint>
 #include <cstdio>
 #include <functional>
+#include <memory>
 #include <string>
 
+#include "metadata_types.h"
 #include "runtime/api_layer1.h"
-#include "utils/env.h"
 // IWYU refused to include "utils/defer.h" everywhere, both in .h and .cpp 
files.
 // However, once "utils/defer.h" is not included, it is inevitable that 
compilation
 // will fail since dsn::defer is referenced. Thus force IWYU to keep it.
 #include "utils/defer.h" // IWYU pragma: keep
+#include "utils/env.h"
 #include "utils/flags.h"
+#include "utils/ports.h"
 #include "utils/test_macros.h"
 
 DSN_DECLARE_bool(encrypt_data_at_rest);
 
-namespace dsn {
-namespace replication {
-class file_meta;
-} // namespace replication
-} // namespace dsn
-
 #define PRESERVE_VAR(name, expr)                                               
                    \
     const auto PRESERVED_##name = expr;                                        
                    \
-    auto PRESERVED_##name##_cleanup = dsn::defer([PRESERVED_##name]() { expr = 
PRESERVED_##name; })
+    const auto PRESERVED_##name##_cleanup =                                    
                    \
+        dsn::defer([PRESERVED_##name]() { expr = PRESERVED_##name; })
 
 // Save the current value of a flag and restore it at the end of the function.
 #define PRESERVE_FLAG(name) PRESERVE_VAR(FLAGS_##name, FLAGS_##name)
@@ -90,7 +88,32 @@ private:
     uint64_t _start_ms = 0;
 };
 
-void create_local_test_file(const std::string &full_name, 
dsn::replication::file_meta *fm);
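+// Used to generate a local file for test whose life cycle is managed with RAII: the file
+// will be removed automatically in destructor. Both the constructor and the destructor are
+// private, thus an instance can only be obtained from `create()`, held by a
+// `std::shared_ptr` which destroys it with `deleter`.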
+// will be removed automatically in destructor.
+class local_test_file
+{
+public:
+    // Generate a file whose content is user-defined.
+    static void create(const std::string &path,
+                       const std::string &content,
+                       std::shared_ptr<local_test_file> &file);
+
+    // Generate a file whose content is arbitrary.
+    static void create(const std::string &path, 
std::shared_ptr<local_test_file> &file);
+
+    [[nodiscard]] const dsn::replication::file_meta &get_file_meta() const { 
return _file_meta; }
+
+private:
+    explicit local_test_file(const dsn::replication::file_meta &meta);
+    ~local_test_file();
+
+    static void deleter(local_test_file *ptr) { delete ptr; }
+
+    dsn::replication::file_meta _file_meta;
+
+    DISALLOW_COPY_AND_ASSIGN(local_test_file);
+    DISALLOW_MOVE_AND_ASSIGN(local_test_file);
+};
 
 #define ASSERT_EVENTUALLY(expr)                                                
                    \
     do {                                                                       
                    \
diff --git a/src/utils/checksum.cpp b/src/utils/checksum.cpp
new file mode 100644
index 000000000..ce9147c2a
--- /dev/null
+++ b/src/utils/checksum.cpp
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "utils/checksum.h"
+
+#include "utils/error_code.h"
+#include "utils/filesystem.h"
+#include "utils/ports.h"
+
+namespace dsn {
+
+error_s
+calc_checksum(const std::string &file_path, utils::checksum_type::type type, std::string &result)
+{
+    // Nothing needs to be calculated, thus `result` is left as it was.
+    if (type == utils::checksum_type::CST_NONE) {
+        return error_s::ok();
+    }
+
+    // MD5 is the only algorithm that is implemented. Any other type, including CST_INVALID,
+    // is rejected.
+    if (type != utils::checksum_type::CST_MD5) {
+        return FMT_ERR(ERR_NOT_IMPLEMENTED,
+                       "checksum_type is not supported: val = {}, str = {}",
+                       static_cast<int>(type),
+                       enum_to_string(type));
+    }
+
+    const auto err = utils::filesystem::md5sum(file_path, result);
+    if (dsn_unlikely(err != ERR_OK)) {
+        return FMT_ERR(err, "md5sum failed: err = {}", err);
+    }
+
+    return error_s::ok();
+}
+
+} // namespace dsn
diff --git a/src/utils/checksum.h b/src/utils/checksum.h
new file mode 100644
index 000000000..2cdc42723
--- /dev/null
+++ b/src/utils/checksum.h
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string>
+
+#include "utils_types.h"
+#include "utils/enum_helper.h"
+#include "utils/errors.h"
+
+namespace dsn {
+
+ENUM_BEGIN2(utils::checksum_type::type, checksum_type, 
utils::checksum_type::CST_INVALID)
+ENUM_REG_WITH_CUSTOM_NAME(utils::checksum_type::CST_INVALID, invalid)
+ENUM_REG_WITH_CUSTOM_NAME(utils::checksum_type::CST_NONE, none)
+ENUM_REG_WITH_CUSTOM_NAME(utils::checksum_type::CST_MD5, md5)
+ENUM_END2(utils::checksum_type::type, checksum_type)
+
+// Calculate the checksum for a file.
+//
+// Parameters:
+// - `file_path`: the path of the file.
+// - `type`: decides which algorithm is used to calculate the checksum for the file.
+// CST_NONE means that the checksum is not calculated.
+// - `result`: the output parameter that holds the resulting checksum. It is left untouched
+// if `type` is CST_NONE.
+//
+// Return ok if the checksum is calculated successfully or `type` is CST_NONE; return
+// ERR_NOT_IMPLEMENTED if `type` is not supported; otherwise return the error from the
+// underlying calculation.
+error_s
+calc_checksum(const std::string &file_path, utils::checksum_type::type type, 
std::string &result);
+
+} // namespace dsn
diff --git a/src/utils/test/CMakeLists.txt b/src/utils/test/CMakeLists.txt
index 8502cabb7..d1313f6e3 100644
--- a/src/utils/test/CMakeLists.txt
+++ b/src/utils/test/CMakeLists.txt
@@ -30,11 +30,12 @@ set(MY_PROJ_NAME dsn_utils_tests)
 set(MY_SRC_SEARCH_MODE "GLOB")
 
 set(MY_PROJ_LIBS
+        test_utils
+        dsn_replication_common
         dsn_http
         dsn_runtime
         dsn_utils
         gtest
-        test_utils
         rocksdb
         lz4
         zstd
diff --git a/src/utils/test/checksum_test.cpp b/src/utils/test/checksum_test.cpp
new file mode 100644
index 000000000..9b2920830
--- /dev/null
+++ b/src/utils/test/checksum_test.cpp
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "utils/checksum.h"
+
+#include <cstdint>
+#include <memory>
+#include <tuple>
+#include <type_traits>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "test_util/test_util.h"
+#include "utils/env.h"
+#include "utils/error_code.h"
+#include "utils/errors.h"
+#include "utils/filesystem.h"
+#include "utils/flags.h"
+#include "utils/test_macros.h"
+#include "utils_types.h"
+
+DSN_DECLARE_bool(encrypt_data_at_rest);
+
+namespace dsn {
+
+// Each instance describes one case for calc_checksum().
+struct calc_checksum_case
+{
+    // The number of 'a' characters written to the test file.
+    int64_t file_size;
+    // The checksum type passed to calc_checksum().
+    utils::checksum_type::type type;
+    // The error code that calc_checksum() is expected to return.
+    error_code expected_err;
+    // The expected checksum, compared only when calc_checksum() returns ok.
+    std::string expected_checksum;
+};
+
+// Parameterized by whether the test file is encrypted and by a `calc_checksum_case`.
+class CalcChecksumTest : public testing::TestWithParam<std::tuple<bool, calc_checksum_case>>
+{
+protected:
+    static void test_calc_checksum()
+    {
+        static const std::string kFilePath("test_file_for_calc_checksum");
+
+        const auto &param = GetParam();
+        const bool &file_encrypted = std::get<0>(param);
+        const calc_checksum_case &test_case = std::get<1>(param);
+
+        // Decide whether the file is encrypted by the flag, whose original value is
+        // preserved by PRESERVE_FLAG.
+        PRESERVE_FLAG(encrypt_data_at_rest);
+        FLAGS_encrypt_data_at_rest = file_encrypted;
+
+        // Create the test file filled with `file_size` characters.
+        std::shared_ptr<pegasus::local_test_file> local_file;
+        NO_FATALS(pegasus::local_test_file::create(
+            kFilePath, std::string(test_case.file_size, 'a'), local_file));
+
+        // The size of the created file should be as specified.
+        int64_t file_size = 0;
+        ASSERT_TRUE(
+            utils::filesystem::file_size(kFilePath, utils::FileDataType::kSensitive, file_size));
+        ASSERT_EQ(test_case.file_size, file_size);
+
+        // The return code of the calculation should be as expected.
+        std::string actual_checksum;
+        const auto actual_status = calc_checksum(kFilePath, test_case.type, actual_checksum);
+        ASSERT_EQ(test_case.expected_err, actual_status.code());
+
+        // The checksum is meaningful only when the calculation succeeds.
+        if (!actual_status) {
+            return;
+        }
+
+        ASSERT_EQ(test_case.expected_checksum, actual_checksum);
+    }
+};
+
+const std::vector<calc_checksum_case> calc_checksum_tests = {
+    {0, utils::checksum_type::CST_MD5, ERR_OK, 
"d41d8cd98f00b204e9800998ecf8427e"},
+    {1, utils::checksum_type::CST_MD5, ERR_OK, 
"0cc175b9c0f1b6a831c399e269772661"},
+    {2, utils::checksum_type::CST_MD5, ERR_OK, 
"4124bc0a9335c27f086f24ba207a4912"},
+    {4095, utils::checksum_type::CST_MD5, ERR_OK, 
"559110baa849c7608ee70abe1d76273e"},
+    {4096, utils::checksum_type::CST_MD5, ERR_OK, 
"21a199c53f422a380e20b162fb6ebe9c"},
+    {4097, utils::checksum_type::CST_MD5, ERR_OK, 
"8cfc1a0bd8cd76599e76e5e721c6e62e"},
+    {4095, utils::checksum_type::CST_NONE, ERR_OK, ""},
+    {4096, utils::checksum_type::CST_NONE, ERR_OK, ""},
+    {4097, utils::checksum_type::CST_NONE, ERR_OK, ""},
+    {4095, utils::checksum_type::CST_INVALID, ERR_NOT_IMPLEMENTED, ""},
+    {4096, utils::checksum_type::CST_INVALID, ERR_NOT_IMPLEMENTED, ""},
+    {4097, utils::checksum_type::CST_INVALID, ERR_NOT_IMPLEMENTED, ""},
+};
+
+TEST_P(CalcChecksumTest, CalcChecksum) { test_calc_checksum(); }
+
+INSTANTIATE_TEST_SUITE_P(ChecksumTest,
+                         CalcChecksumTest,
+                         testing::Combine(testing::Bool(), 
testing::ValuesIn(calc_checksum_tests)));
+
+} // namespace dsn
diff --git a/src/utils/test_macros.h b/src/utils/test_macros.h
index c1c9c3fd4..950e59745 100644
--- a/src/utils/test_macros.h
+++ b/src/utils/test_macros.h
@@ -34,8 +34,23 @@
         }                                                                      
                    \
     } while (0)
 
-// Substring matches.
+// Assert that `str` should contain the given `substr`.
 #define ASSERT_STR_CONTAINS(str, substr) ASSERT_THAT(str, 
testing::HasSubstr(substr))
 
+// Assert that `str` should not contain the given `substr`.
 #define ASSERT_STR_NOT_CONTAINS(str, substr)                                   
                    \
     ASSERT_THAT(str, testing::Not(testing::HasSubstr(substr)))
+
+// Assert that `str` should begin with the given `prefix`.
+#define ASSERT_STR_STARTSWITH(str, prefix) ASSERT_THAT(str, 
testing::StartsWith(prefix))
+
+// Assert that `str` should not begin with the given `prefix`.
+#define ASSERT_STR_NOT_STARTSWITH(str, prefix)                                 
                    \
+    ASSERT_THAT(str, testing::Not(testing::StartsWith(prefix)))
+
+// Assert that `str` should end with the given `suffix`.
+#define ASSERT_STR_ENDSWITH(str, suffix) ASSERT_THAT(str, 
testing::EndsWith(suffix))
+
+// Assert that `str` should not end with the given `suffix`.
+#define ASSERT_STR_NOT_ENDSWITH(str, suffix)                                   
                    \
+    ASSERT_THAT(str, testing::Not(testing::EndsWith(suffix)))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to