This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 980689707 feat(duplication): support resumable download for checkpoint
files during full duplication on the server side (#2238)
980689707 is described below
commit 9806897071585ce01cf794368c13d366595562c6
Author: Dan Wang <[email protected]>
AuthorDate: Mon May 12 12:11:22 2025 +0800
feat(duplication): support resumable download for checkpoint files during
full duplication on the server side (#2238)
https://github.com/apache/incubator-pegasus/issues/2242
When a dup follower requests the latest checkpoint information from the dup
master, the server can now return the size and checksum of each file under
the checkpoint directory. This allows the client to decide which files to
fetch:
- if it is downloading the checkpoint for the first time, all files need to
  be downloaded;
- if it is the second time or later, only files that are missing locally or
  were only partially downloaded need to be fetched.
Additionally, the client can specify which checksum algorithm the server
should use; currently, only MD5 is supported. By default (CST_NONE), the
server computes neither file sizes nor checksums.
---
idl/utils.thrift | 13 +
.../test/block_service_manager_test.cpp | 27 +-
src/common/consensus.thrift | 15 ++
.../bulk_load/test/replica_bulk_loader_test.cpp | 32 ++-
src/replica/mutation.h | 6 +-
src/replica/replica.h | 38 ++-
src/replica/replica_chkpt.cpp | 82 ++++--
src/replica/replica_stub.cpp | 7 +-
src/replica/replica_stub.h | 7 +
src/replica/test/replica_test.cpp | 29 +--
src/server/pegasus_server_impl.cpp | 5 +-
src/server/pegasus_server_impl.h | 2 +-
src/server/test/CMakeLists.txt | 1 +
src/server/test/capacity_unit_calculator_test.cpp | 4 +-
src/server/test/hotkey_collector_test.cpp | 4 +-
src/server/test/manual_compact_service_test.cpp | 6 +-
.../test/pegasus_compression_options_test.cpp | 1 -
src/server/test/pegasus_server_impl_test.cpp | 280 ++++++++++++++++++++-
src/server/test/pegasus_server_test_base.h | 52 ++--
src/server/test/pegasus_server_write_test.cpp | 4 +-
.../test/pegasus_write_service_impl_test.cpp | 2 +-
src/server/test/pegasus_write_service_test.cpp | 2 +-
src/server/test/rocksdb_wrapper_test.cpp | 25 +-
src/test_util/test_util.cpp | 40 ++-
src/test_util/test_util.h | 41 ++-
src/utils/checksum.cpp | 51 ++++
src/utils/checksum.h | 46 ++++
src/utils/test/CMakeLists.txt | 3 +-
src/utils/test/checksum_test.cpp | 106 ++++++++
src/utils/test_macros.h | 17 +-
30 files changed, 799 insertions(+), 149 deletions(-)
diff --git a/idl/utils.thrift b/idl/utils.thrift
index 659b2c514..f8f5491c3 100644
--- a/idl/utils.thrift
+++ b/idl/utils.thrift
@@ -42,3 +42,16 @@ enum pattern_match_type
// The string must match the given pattern as a regular expression.
PMT_MATCH_REGEX,
}
+
+// Specify which algorithm is used to calculate the checksum of a file.
+enum checksum_type
+{
+ CST_INVALID = 0,
+
+ // Do NOT calculate by any algorithm, which means no content will be
+ // generated.
+ CST_NONE,
+
+ // Use md5sum to calculate.
+ CST_MD5,
+}
diff --git a/src/block_service/test/block_service_manager_test.cpp
b/src/block_service/test/block_service_manager_test.cpp
index dd1d8dea6..aa8a3fd79 100644
--- a/src/block_service/test/block_service_manager_test.cpp
+++ b/src/block_service/test/block_service_manager_test.cpp
@@ -85,8 +85,11 @@ TEST_P(block_service_manager_test, remote_file_not_exist)
TEST_P(block_service_manager_test, local_file_exist)
{
-
NO_FATALS(pegasus::create_local_test_file(utils::filesystem::path_combine(LOCAL_DIR,
FILE_NAME),
- &_file_meta));
+ std::shared_ptr<pegasus::local_test_file> local_file;
+ NO_FATALS(pegasus::local_test_file::create(
+ utils::filesystem::path_combine(LOCAL_DIR, FILE_NAME), local_file));
+ _file_meta = local_file->get_file_meta();
+
struct remote_file_info
{
int64_t size;
@@ -96,6 +99,7 @@ TEST_P(block_service_manager_test, local_file_exist)
{2333, _file_meta.md5}, // wrong size
{_file_meta.size, "bad_md5"} // wrong md5
};
+
for (const auto &test : tests) {
// The remote file will be overwritten when repeatedly created.
create_remote_file(FILE_NAME, test.size, test.md5);
@@ -107,15 +111,20 @@ TEST_P(block_service_manager_test, local_file_exist)
TEST_P(block_service_manager_test, do_download_succeed)
{
-
NO_FATALS(pegasus::create_local_test_file(utils::filesystem::path_combine(LOCAL_DIR,
FILE_NAME),
- &_file_meta));
+ // Allow local_file to be automatically deleted while out of the scope, to
mock
+ // the condition where the local file do not exist.
+ {
+ std::shared_ptr<pegasus::local_test_file> local_file;
+ NO_FATALS(pegasus::local_test_file::create(
+ utils::filesystem::path_combine(LOCAL_DIR, FILE_NAME),
local_file));
+ _file_meta = local_file->get_file_meta();
+ }
+
create_remote_file(FILE_NAME, _file_meta.size, _file_meta.md5);
- // remove local file to mock condition that file not existed
- std::string file_name = utils::filesystem::path_combine(LOCAL_DIR,
FILE_NAME);
- utils::filesystem::remove_path(file_name);
+
uint64_t download_size = 0;
- ASSERT_EQ(test_download_file(download_size), ERR_OK);
- ASSERT_EQ(download_size, _file_meta.size);
+ ASSERT_EQ(ERR_OK, test_download_file(download_size));
+ ASSERT_EQ(_file_meta.size, download_size);
}
} // namespace block_service
diff --git a/src/common/consensus.thrift b/src/common/consensus.thrift
index 8952c090c..4489b730e 100644
--- a/src/common/consensus.thrift
+++ b/src/common/consensus.thrift
@@ -27,6 +27,7 @@
include "../../idl/dsn.thrift"
include "../../idl/dsn.layer2.thrift"
include "../../idl/metadata.thrift"
+include "../../idl/utils.thrift"
namespace cpp dsn.replication
@@ -131,6 +132,13 @@ struct learn_state
// Used by duplication. Holds the start_decree of this round of learn.
5:optional i64 learn_start_decree;
+
+ // file_sizes and file_checksums are only used to implement resumable
checkpoint download
+ // for duplication. While the dup follower receives file_sizes and
file_checksums, it will
+ // decide which files it already has and which files it should fetch from
the dup master
+ // (the dup master will reply to the dup follower with learn_response).
+ 6:optional list<i64> file_sizes;
+ 7:optional list<string> file_checksums;
}
enum learner_status
@@ -157,6 +165,13 @@ struct learn_request
// learnee will copy the missing logs.
7:optional i64 max_gced_decree;
8:optional dsn.host_port hp_learner;
+
+ // checksum_type is only used to implement resumable checkpoint download
for duplication.
+ // It decides which algorithm the dup master will use to calculate the
checksum for each
+ // file (learn_request will be sent from the dup follower to the dup
master). Since it is
+ // only used for duplication, by default it is CST_NONE which means do not
calculate file
+ // size and checksum.
+ 9:optional utils.checksum_type checksum_type =
utils.checksum_type.CST_NONE;
}
struct learn_response
diff --git a/src/replica/bulk_load/test/replica_bulk_loader_test.cpp
b/src/replica/bulk_load/test/replica_bulk_loader_test.cpp
index b2e7fde0b..3c3139642 100644
--- a/src/replica/bulk_load/test/replica_bulk_loader_test.cpp
+++ b/src/replica/bulk_load/test/replica_bulk_loader_test.cpp
@@ -31,6 +31,7 @@
#include "rpc/rpc_host_port.h"
#include "task/task_tracker.h"
#include "test_util/test_util.h"
+#include "utils/defer.h"
#include "utils/fail_point.h"
#include "utils/filesystem.h"
#include "utils/load_dump_object.h"
@@ -245,14 +246,17 @@ public:
_replica->set_primary_partition_configuration(pc);
}
- void create_local_metadata_file()
+ void create_local_metadata_file(std::shared_ptr<pegasus::local_test_file>
&local_file)
{
- NO_FATALS(pegasus::create_local_test_file(
- utils::filesystem::path_combine(LOCAL_DIR, FILE_NAME),
&_file_meta));
+ NO_FATALS(pegasus::local_test_file::create(
+ utils::filesystem::path_combine(LOCAL_DIR, FILE_NAME),
local_file));
+ _file_meta = local_file->get_file_meta();
+
_metadata.files.emplace_back(_file_meta);
_metadata.file_total_size = _file_meta.size;
- std::string whole_name = utils::filesystem::path_combine(LOCAL_DIR,
METADATA);
- ASSERT_EQ(ERR_OK, utils::dump_rjobj_to_file(_metadata, whole_name));
+
+ const auto &full_path = utils::filesystem::path_combine(LOCAL_DIR,
METADATA);
+ ASSERT_EQ(ERR_OK, utils::dump_rjobj_to_file(_metadata, full_path));
}
bool validate_metadata()
@@ -520,22 +524,30 @@ TEST_P(replica_bulk_loader_test,
bulk_load_metadata_corrupt)
{
// create file can not parse as bulk_load_metadata structure
utils::filesystem::create_directory(LOCAL_DIR);
-
NO_FATALS(pegasus::create_local_test_file(utils::filesystem::path_combine(LOCAL_DIR,
METADATA),
- &_file_meta));
+ const auto cleanup =
+ dsn::defer([this]() {
ASSERT_TRUE(utils::filesystem::remove_path(LOCAL_DIR)); });
+
+ std::shared_ptr<pegasus::local_test_file> local_file;
+
NO_FATALS(pegasus::local_test_file::create(utils::filesystem::path_combine(LOCAL_DIR,
METADATA),
+ local_file));
+ _file_meta = local_file->get_file_meta();
+
std::string metadata_file_name =
utils::filesystem::path_combine(LOCAL_DIR, METADATA);
ASSERT_EQ(ERR_CORRUPTION,
test_parse_bulk_load_metadata(metadata_file_name));
- utils::filesystem::remove_path(LOCAL_DIR);
}
TEST_P(replica_bulk_loader_test, bulk_load_metadata_parse_succeed)
{
utils::filesystem::create_directory(LOCAL_DIR);
- NO_FATALS(create_local_metadata_file());
+ const auto cleanup =
+ dsn::defer([this]() {
ASSERT_TRUE(utils::filesystem::remove_path(LOCAL_DIR)); });
+
+ std::shared_ptr<pegasus::local_test_file> local_file;
+ NO_FATALS(create_local_metadata_file(local_file));
std::string metadata_file_name =
utils::filesystem::path_combine(LOCAL_DIR, METADATA);
ASSERT_EQ(ERR_OK, test_parse_bulk_load_metadata(metadata_file_name));
ASSERT_TRUE(validate_metadata());
- utils::filesystem::remove_path(LOCAL_DIR);
}
// finish download test
diff --git a/src/replica/mutation.h b/src/replica/mutation.h
index 62d23d0eb..e88a90dce 100644
--- a/src/replica/mutation.h
+++ b/src/replica/mutation.h
@@ -81,9 +81,6 @@ public:
mutation();
~mutation() override;
- DISALLOW_COPY_AND_ASSIGN(mutation);
- DISALLOW_MOVE_AND_ASSIGN(mutation);
-
// copy mutation from an existing mutation, typically used in partition
split
// mutation should not reply to client, because parent has already replied
static mutation_ptr copy_no_reply(const mutation_ptr &old_mu);
@@ -238,6 +235,9 @@ private:
uint64_t _tid; // trace id, unique in process
static std::atomic<uint64_t> s_tid;
bool _is_sync_to_child; // for partition split
+
+ DISALLOW_COPY_AND_ASSIGN(mutation);
+ DISALLOW_MOVE_AND_ASSIGN(mutation);
};
class replica;
diff --git a/src/replica/replica.h b/src/replica/replica.h
index b709bc849..50e0105c2 100644
--- a/src/replica/replica.h
+++ b/src/replica/replica.h
@@ -34,6 +34,7 @@
#include <map>
#include <memory>
#include <string>
+#include <type_traits>
#include <utility>
#include "common/json_helper.h"
@@ -61,13 +62,12 @@
#include "utils/thread_access_checker.h"
#include "utils/throttling_controller.h"
#include "utils/uniq_timestamp_us.h"
+#include "utils_types.h"
-namespace pegasus {
-namespace server {
+namespace pegasus::server {
class pegasus_server_test_base;
class rocksdb_wrapper_test;
-} // namespace server
-} // namespace pegasus
+} // namespace pegasus::server
namespace dsn {
class gpid;
@@ -289,9 +289,23 @@ public:
// - `callback`: the callback processor handling the error code of
triggering checkpoint.
void async_trigger_manual_emergency_checkpoint(decree
min_checkpoint_decree,
uint32_t delay_ms,
- trigger_checkpoint_callback
callback = {});
+ trigger_checkpoint_callback
callback);
+
+ // The same as the above except that `callback` is empty.
+ void async_trigger_manual_emergency_checkpoint(decree
min_checkpoint_decree, uint32_t delay_ms);
+
+ // Get the last checkpoint info and put it into the response to reply to
the dup follower.
+ //
+ // Parameters:
+ // - `checksum_type`: specify which algorithm the server (namely dup
master) will use to
+ // calculate the checksum for each file. This parameter is used to
implement resumable
+ // checkpoint download for duplication: the client (namely dup follower)
receives the file
+ // sizes and checksums, then decides which files it should fetch from the
server accordingly.
+ // CST_NONE means do not calculate file size and checksum.
+ // - `response`: the output parameter, used to respond to the dup follower.
+ void on_query_last_checkpoint(utils::checksum_type::type checksum_type,
+ learn_response &response);
- void on_query_last_checkpoint(learn_response &response);
std::shared_ptr<replica_duplicator_manager> get_duplication_manager() const
{
return _duplication_mgr;
@@ -703,7 +717,17 @@ private:
// Currently only used for unit test to get the count of backup requests.
int64_t get_backup_request_count() const;
-private:
+ // Support self-defined `replication_app_base` at runtime which is only
used for test.
+ template <typename TApp,
+ typename... Args,
+ typename = typename std::enable_if<
+ std::is_base_of<replication_app_base,
+ typename
std::remove_pointer<TApp>::type>::value>::type>
+ void create_app_for_test(Args &&...args)
+ {
+ _app = std::make_unique<TApp>(std::forward<Args>(args)...);
+ }
+
friend class ::dsn::replication::test::test_checker;
friend class ::dsn::replication::mutation_log_tool;
friend class ::dsn::replication::mutation_queue;
diff --git a/src/replica/replica_chkpt.cpp b/src/replica/replica_chkpt.cpp
index 6ef6f451c..42275b954 100644
--- a/src/replica/replica_chkpt.cpp
+++ b/src/replica/replica_chkpt.cpp
@@ -25,12 +25,13 @@
*/
#include <fmt/core.h>
-#include <stdint.h>
#include <atomic>
#include <chrono>
+#include <cstdint>
#include <functional>
#include <memory>
#include <string>
+#include <utility>
#include <vector>
#include "common/gpid.h"
@@ -56,13 +57,18 @@
#include "task/task.h"
#include "utils/autoref_ptr.h"
#include "utils/blob.h"
+#include "utils/checksum.h"
#include "utils/chrono_literals.h"
+#include "utils/env.h"
#include "utils/error_code.h"
+#include "utils/errors.h"
#include "utils/filesystem.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/metrics.h"
+#include "utils/ports.h"
#include "utils/thread_access_checker.h"
+#include "utils_types.h"
/// The checkpoint of the replicated app part of replica.
@@ -245,6 +251,12 @@ void
replica::async_trigger_manual_emergency_checkpoint(decree min_checkpoint_de
std::chrono::milliseconds(delay_ms));
}
+void replica::async_trigger_manual_emergency_checkpoint(decree
min_checkpoint_decree,
+ uint32_t delay_ms)
+{
+ async_trigger_manual_emergency_checkpoint(min_checkpoint_decree, delay_ms,
{});
+}
+
// ThreadPool: THREAD_POOL_REPLICATION
error_code replica::trigger_manual_emergency_checkpoint(decree
min_checkpoint_decree)
{
@@ -313,7 +325,8 @@ void replica::init_checkpoint(bool is_emergency)
}
// ThreadPool: THREAD_POOL_REPLICATION
-void replica::on_query_last_checkpoint(/*out*/ learn_response &response)
+void replica::on_query_last_checkpoint(utils::checksum_type::type
checksum_type,
+ learn_response &response)
{
_checker.only_one_thread_access();
@@ -322,24 +335,65 @@ void replica::on_query_last_checkpoint(/*out*/
learn_response &response)
return;
}
- blob placeholder;
- int err = _app->get_checkpoint(0, placeholder, response.state);
- if (err != 0) {
+ if (dsn_unlikely(_app->get_checkpoint(0, blob(), response.state) !=
ERR_OK)) {
response.err = ERR_GET_LEARN_STATE_FAILED;
- } else {
- response.err = ERR_OK;
- response.last_committed_decree = last_committed_decree();
- // for example: base_local_dir = "./data" + "checkpoint.1024" =
"./data/checkpoint.1024"
- response.base_local_dir = utils::filesystem::path_combine(
- _app->data_dir(),
checkpoint_folder(response.state.to_decree_included));
- SET_IP_AND_HOST_PORT(
- response, learnee, _stub->primary_address(),
_stub->primary_host_port());
+ return;
+ }
+
+ response.err = ERR_OK;
+ response.last_committed_decree = last_committed_decree();
+
+ // For example: base_local_dir = "./data" + "checkpoint.1024" =
"./data/checkpoint.1024"
+ response.base_local_dir = utils::filesystem::path_combine(
+ _app->data_dir(),
checkpoint_folder(response.state.to_decree_included));
+
+ SET_IP_AND_HOST_PORT(response, learnee, _stub->primary_address(),
_stub->primary_host_port());
+
+ // If the client does not require the calculation of the checksum, only
respond with name
+ // for each file.
+ if (checksum_type <= utils::checksum_type::CST_NONE) {
for (auto &file : response.state.files) {
- // response.state.files contain file absolute path, for example:
+ // response.state.files contain file absolute path, for example:
// "./data/checkpoint.1024/1.sst" use `substr` to get the file
name: 1.sst
file = file.substr(response.base_local_dir.length() + 1);
}
+
+ return;
+ }
+
+ std::vector<int64_t> file_sizes;
+ std::vector<std::string> file_checksums;
+ file_sizes.reserve(response.state.files.size());
+ file_checksums.reserve(response.state.files.size());
+ for (auto &file : response.state.files) {
+ int64_t size = 0;
+ if (dsn_unlikely(
+ !utils::filesystem::file_size(file,
utils::FileDataType::kSensitive, size))) {
+ LOG_ERROR_PREFIX("get size of file failed: file = {}", file);
+ response.err = ERR_GET_LEARN_STATE_FAILED;
+ return;
+ }
+
+ std::string checksum;
+ const auto result = calc_checksum(file, checksum_type, checksum);
+ if (dsn_unlikely(!result)) {
+ LOG_ERROR_PREFIX("calculate checksum failed: file = {}, err = {}",
file, result);
+ response.err = ERR_GET_LEARN_STATE_FAILED;
+ return;
+ }
+
+ file_sizes.push_back(size);
+ file_checksums.push_back(std::move(checksum));
+
+ // response.state.files contain file absolute path, for example:
+ // "./data/checkpoint.1024/1.sst" use `substr` to get the file name:
1.sst
+ file = file.substr(response.base_local_dir.length() + 1);
}
+
+ response.state.__isset.file_sizes = true;
+ response.state.__isset.file_checksums = true;
+ response.state.file_sizes = std::move(file_sizes);
+ response.state.file_checksums = std::move(file_checksums);
}
// run in background thread
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index 6b1f06031..0003d1e9e 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -1211,11 +1211,12 @@ void
replica_stub::on_query_last_checkpoint(query_last_checkpoint_info_rpc rpc)
learn_response &response = rpc.response();
replica_ptr rep = get_replica(request.pid);
- if (rep != nullptr) {
- rep->on_query_last_checkpoint(response);
- } else {
+ if (dsn_unlikely(rep == nullptr)) {
response.err = ERR_OBJECT_NOT_FOUND;
+ return;
}
+
+ rep->on_query_last_checkpoint(request.checksum_type, response);
}
// ThreadPool: THREAD_POOL_DEFAULT
diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h
index 7d03d06ee..8d67dc044 100644
--- a/src/replica/replica_stub.h
+++ b/src/replica/replica_stub.h
@@ -71,6 +71,12 @@
#include "utils/metrics.h"
#include "utils/zlocks.h"
+namespace pegasus::server {
+
+class pegasus_server_test_base;
+
+} // namespace pegasus::server
+
namespace dsn::utils {
class ex_lock;
@@ -494,6 +500,7 @@ private:
friend class replica_disk_test_base;
friend class replica_disk_migrate_test;
friend class replica_stub_test_base;
+ friend class pegasus::server::pegasus_server_test_base;
friend class open_replica_test;
friend class replica_follower;
friend class replica_follower_test;
diff --git a/src/replica/test/replica_test.cpp
b/src/replica/test/replica_test.cpp
index d1fd9ab04..2c9bbb1bd 100644
--- a/src/replica/test/replica_test.cpp
+++ b/src/replica/test/replica_test.cpp
@@ -15,9 +15,9 @@
// specific language governing permissions and limitations
// under the License.
-#include <stddef.h>
-#include <stdint.h>
#include <atomic>
+#include <cstddef>
+#include <cstdint>
#include <functional>
#include <iostream>
#include <map>
@@ -37,9 +37,7 @@
#include "common/replication_common.h"
#include "common/replication_enums.h"
#include "common/replication_other_types.h"
-#include "consensus_types.h"
#include "dsn.layer2_types.h"
-#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "http/http_server.h"
#include "http/http_status_code.h"
@@ -515,29 +513,6 @@ TEST_P(replica_test,
test_trigger_manual_emergency_checkpoint)
_mock_replica->tracker()->wait_outstanding_tasks();
}
-TEST_P(replica_test, test_query_last_checkpoint_info)
-{
- // test no exist gpid
- auto req = std::make_unique<learn_request>();
- req->pid = gpid(100, 100);
- query_last_checkpoint_info_rpc rpc =
- query_last_checkpoint_info_rpc(std::move(req),
RPC_QUERY_LAST_CHECKPOINT_INFO);
- stub->on_query_last_checkpoint(rpc);
- ASSERT_EQ(rpc.response().err, ERR_OBJECT_NOT_FOUND);
-
- learn_response resp;
- // last_checkpoint hasn't exist
- _mock_replica->on_query_last_checkpoint(resp);
- ASSERT_EQ(resp.err, ERR_PATH_NOT_FOUND);
-
- // query ok
- _mock_replica->update_last_durable_decree(100);
- _mock_replica->set_last_committed_decree(200);
- _mock_replica->on_query_last_checkpoint(resp);
- ASSERT_EQ(resp.last_committed_decree, 200);
- ASSERT_STR_CONTAINS(resp.base_local_dir, "/data/checkpoint.100");
-}
-
TEST_P(replica_test, test_clear_on_failure)
{
// Clear up the remaining state.
diff --git a/src/server/pegasus_server_impl.cpp
b/src/server/pegasus_server_impl.cpp
index e745f98fb..de9abc7b0 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -2232,13 +2232,14 @@ private:
{
CHECK(_is_open, "");
- int64_t ci = last_durable_decree();
+ const int64_t ci = last_durable_decree();
if (ci == 0) {
LOG_ERROR_PREFIX("no checkpoint found");
return ::dsn::ERR_OBJECT_NOT_FOUND;
}
- auto chkpt_dir = ::dsn::utils::filesystem::path_combine(data_dir(),
chkpt_get_dir_name(ci));
+ const auto chkpt_dir =
+ ::dsn::utils::filesystem::path_combine(data_dir(),
chkpt_get_dir_name(ci));
state.files.clear();
if (!::dsn::utils::filesystem::get_subfiles(chkpt_dir, state.files, true))
{
LOG_ERROR_PREFIX("list files in checkpoint dir {} failed", chkpt_dir);
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index deeb75e4d..b385137e0 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -249,6 +249,7 @@ public:
private:
friend class manual_compact_service_test;
friend class pegasus_compression_options_test;
+ friend class pegasus_server_test_base;
friend class pegasus_server_impl_test;
friend class hotkey_collector_test;
FRIEND_TEST(pegasus_server_impl_test, default_data_version);
@@ -478,7 +479,6 @@ private:
void
log_expired_data(const char *op, const dsn::rpc_address &addr, const
rocksdb::Slice &key) const;
-private:
static const std::chrono::seconds kServerStatUpdateTimeSec;
static const std::string COMPRESSION_HEADER;
diff --git a/src/server/test/CMakeLists.txt b/src/server/test/CMakeLists.txt
index 082a98c1e..a3f647092 100644
--- a/src/server/test/CMakeLists.txt
+++ b/src/server/test/CMakeLists.txt
@@ -43,6 +43,7 @@ set(MY_PROJ_LIBS
dsn.failure_detector
dsn.replication.zookeeper_provider
dsn_utils
+ test_utils
rocksdb
lz4
zstd
diff --git a/src/server/test/capacity_unit_calculator_test.cpp
b/src/server/test/capacity_unit_calculator_test.cpp
index 87309e675..c0486d418 100644
--- a/src/server/test/capacity_unit_calculator_test.cpp
+++ b/src/server/test/capacity_unit_calculator_test.cpp
@@ -104,9 +104,9 @@ protected:
public:
dsn::blob key, hash_key;
- capacity_unit_calculator_test() : pegasus_server_test_base()
+ capacity_unit_calculator_test()
{
- _cal = std::make_unique<mock_capacity_unit_calculator>(_server.get());
+ _cal = std::make_unique<mock_capacity_unit_calculator>(_server);
pegasus_generate_key(key, dsn::blob::create_from_bytes("h"),
dsn::blob());
hash_key = dsn::blob::create_from_bytes("key");
}
diff --git a/src/server/test/hotkey_collector_test.cpp
b/src/server/test/hotkey_collector_test.cpp
index 688efcb02..1c93e0910 100644
--- a/src/server/test/hotkey_collector_test.cpp
+++ b/src/server/test/hotkey_collector_test.cpp
@@ -107,7 +107,7 @@ TEST(hotkey_collector_public_func_test,
find_outlier_index_test)
class coarse_collector_test : public pegasus_server_test_base
{
public:
- coarse_collector_test() : coarse_collector(_server.get(),
FLAGS_hotkey_buckets_num){};
+ coarse_collector_test() : coarse_collector(_server,
FLAGS_hotkey_buckets_num){};
hotkey_coarse_data_collector coarse_collector;
@@ -164,7 +164,7 @@ public:
int max_queue_size = 1000;
int target_bucket_index = 0;
hotkey_fine_data_collector fine_collector;
- fine_collector_test() : fine_collector(_server.get(), 1, max_queue_size)
+ fine_collector_test() : fine_collector(_server, 1, max_queue_size)
{
fine_collector.change_target_bucket(0);
};
diff --git a/src/server/test/manual_compact_service_test.cpp
b/src/server/test/manual_compact_service_test.cpp
index 8ccd761b5..95638bfb3 100644
--- a/src/server/test/manual_compact_service_test.cpp
+++ b/src/server/test/manual_compact_service_test.cpp
@@ -48,16 +48,16 @@ public:
manual_compact_service_test()
{
start();
- manual_compact_svc =
std::make_unique<pegasus_manual_compact_service>(_server.get());
+ manual_compact_svc =
std::make_unique<pegasus_manual_compact_service>(_server);
}
- void set_compact_time(int64_t ts)
+ void set_compact_time(int64_t ts) const
{
manual_compact_svc->_manual_compact_last_finish_time_ms.store(
static_cast<uint64_t>(ts * 1000));
}
- void set_mock_now(uint64_t mock_now_sec)
+ void set_mock_now(uint64_t mock_now_sec) const
{
manual_compact_svc->_mock_now_timestamp = mock_now_sec * 1000;
}
diff --git a/src/server/test/pegasus_compression_options_test.cpp
b/src/server/test/pegasus_compression_options_test.cpp
index 4836bced1..4dacb80f7 100644
--- a/src/server/test/pegasus_compression_options_test.cpp
+++ b/src/server/test/pegasus_compression_options_test.cpp
@@ -21,7 +21,6 @@
#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <map>
-#include <memory>
#include <string>
#include <vector>
diff --git a/src/server/test/pegasus_server_impl_test.cpp
b/src/server/test/pegasus_server_impl_test.cpp
index f2a491575..9245a8cc6 100644
--- a/src/server/test/pegasus_server_impl_test.cpp
+++ b/src/server/test/pegasus_server_impl_test.cpp
@@ -17,38 +17,53 @@
* under the License.
*/
+// IWYU pragma: no_include <ext/alloc_traits.h>
#include <base/pegasus_key_schema.h>
#include <fmt/core.h>
#include <rocksdb/db.h>
#include <rocksdb/options.h>
-#include <stdint.h>
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <initializer_list>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>
+#include <vector>
+#include "common/gpid.h"
#include "common/replica_envs.h"
+#include "common/replication.codes.h"
+#include "common/replication_other_types.h"
+#include "consensus_types.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "pegasus_server_test_base.h"
+#include "replica/replica_stub.h"
+#include "rpc/rpc_holder.h"
#include "rrdb/rrdb.code.definition.h"
#include "rrdb/rrdb_types.h"
#include "runtime/serverlet.h"
#include "server/pegasus_read_service.h"
+#include "test_util/test_util.h"
#include "utils/autoref_ptr.h"
#include "utils/blob.h"
+#include "utils/defer.h"
#include "utils/error_code.h"
#include "utils/filesystem.h"
+#include "utils/fmt_logging.h"
#include "utils/metrics.h"
+#include "utils/test_macros.h"
+#include "utils_types.h"
-namespace pegasus {
-namespace server {
+namespace pegasus::server {
class pegasus_server_impl_test : public pegasus_server_test_base
{
-public:
- pegasus_server_impl_test() : pegasus_server_test_base() {}
+protected:
+ pegasus_server_impl_test() = default;
void test_table_level_slow_query()
{
@@ -140,6 +155,194 @@ public:
}
}
}
+
+ // Test last checkpoint query in `pegasus_server_impl_test` rather than in
`replica_test`,
+ // because `pegasus_server_impl::get_checkpoint()` needs to be tested
instead of being
+ // mocked.
+ void test_query_last_checkpoint(bool create_checkpoint_dir,
+ const dsn::gpid &pid,
+ dsn::utils::checksum_type::type
checksum_type,
+ const dsn::error_code &expected_err,
+ dsn::replication::decree
expected_last_committed_decree,
+ dsn::replication::decree
expected_last_durable_decree,
+ const std::vector<std::string>
&expected_file_names,
+ const std::vector<int64_t>
&expected_file_sizes,
+ const std::vector<std::string>
&expected_file_checksums)
+ {
+ const std::string checkpoint_dir(dsn::utils::filesystem::path_combine(
+ _server->data_dir(), fmt::format("checkpoint.{}",
expected_last_durable_decree)));
+
+ // After the test case is finished, remove the whole checkpoint dir.
+ const auto cleanup = dsn::defer([checkpoint_dir,
expected_file_names]() {
+ if (!dsn::utils::filesystem::directory_exists(checkpoint_dir)) {
+ return;
+ }
+
+ ASSERT_TRUE(dsn::utils::filesystem::remove_path(checkpoint_dir));
+ });
+
+ std::map<std::string, std::shared_ptr<local_test_file>> local_files;
+ if (create_checkpoint_dir) {
+
ASSERT_TRUE(dsn::utils::filesystem::create_directory(checkpoint_dir));
+
+ // Generate all files in the checkpoint dir with respective
specified size.
+ for (size_t i = 0; i < expected_file_names.size(); ++i) {
+ const auto file_path =
+ dsn::utils::filesystem::path_combine(checkpoint_dir,
expected_file_names[i]);
+
+ std::shared_ptr<local_test_file> local_file;
+ NO_FATALS(local_test_file::create(
+ file_path, std::string(expected_file_sizes[i], 'a'),
local_file));
+ ASSERT_TRUE(local_files.emplace(file_path, local_file).second);
+ }
+ }
+
+ // Build the request to get the last checkpoint info.
+ auto req = std::make_unique<dsn::replication::learn_request>();
+ req->pid = pid;
+ req->__set_checksum_type(checksum_type);
+
+ // Mock the RPC.
+ auto rpc =
dsn::replication::query_last_checkpoint_info_rpc(std::move(req),
+
RPC_QUERY_LAST_CHECKPOINT_INFO);
+
+ // Execute the last checkpoint query on server side.
+ _replica_stub->on_query_last_checkpoint(rpc);
+
+ // The error code in the response should be match the expected.
+ const auto &resp = rpc.response();
+ ASSERT_EQ(expected_err, resp.err);
+
+ // No need to check others in the response once the error code is not
ERR_OK.
+ if (expected_err != dsn::ERR_OK) {
+ return;
+ }
+
+ ASSERT_EQ(expected_last_committed_decree, resp.last_committed_decree);
+ ASSERT_STR_ENDSWITH(resp.base_local_dir,
+ fmt::format("/data/checkpoint.{}",
expected_last_durable_decree));
+
+ // Check whether all file names in the response are matched.
+ check_checkpoint_file_names(resp.state.files, expected_file_names);
+
+ // The file sizes and checksums shoule be kept empty if checksum is
not required.
+ if (checksum_type <= dsn::utils::checksum_type::CST_NONE) {
+ ASSERT_FALSE(resp.state.__isset.file_sizes);
+ ASSERT_FALSE(resp.state.__isset.file_checksums);
+ ASSERT_TRUE(resp.state.file_sizes.empty());
+ ASSERT_TRUE(resp.state.file_checksums.empty());
+ return;
+ }
+
+ // Check whether all file sizes in the response are matched.
+ ASSERT_TRUE(resp.state.__isset.file_sizes);
+ check_checkpoint_file_sizes(resp.state.file_sizes, resp.state.files,
expected_file_sizes);
+
+ // Check whether all file checksums in the response are matched.
+ ASSERT_TRUE(resp.state.__isset.file_checksums);
+ check_checkpoint_file_checksums(
+ resp.state.file_checksums, resp.state.files,
expected_file_checksums);
+ }
+
+ // Only test for failed cases.
+ void test_query_last_checkpoint(bool create_checkpoint_dir,
+ const dsn::gpid &pid,
+ const dsn::error_code &expected_err)
+ {
+ test_query_last_checkpoint(create_checkpoint_dir,
+ pid,
+ dsn::utils::checksum_type::CST_INVALID,
+ expected_err,
+ 0,
+ 0,
+ {},
+ {},
+ {});
+ }
+
+ // Only test for failed cases.
+ void test_query_last_checkpoint(const dsn::gpid &pid, const
dsn::error_code &expected_err)
+ {
+ test_query_last_checkpoint(true, pid, expected_err);
+ }
+
+ // Run a successful query for each checksum type, checking against the given expected decrees and files.
+ void test_query_last_checkpoint_for_all_checksum_types(
+ dsn::replication::decree expected_last_committed_decree,
+ dsn::replication::decree expected_last_durable_decree,
+ const std::vector<std::string> &expected_file_names,
+ const std::vector<int64_t> &expected_file_sizes,
+ const std::vector<std::string> &expected_file_checksums)
+ {
+ for (const auto checksum_type : {dsn::utils::checksum_type::CST_INVALID,
+ dsn::utils::checksum_type::CST_NONE,
+ dsn::utils::checksum_type::CST_MD5}) {
+ test_query_last_checkpoint(true,
+ _gpid,
+ checksum_type,
+ dsn::ERR_OK,
+ expected_last_committed_decree,
+ expected_last_durable_decree,
+ expected_file_names,
+ expected_file_sizes,
+ expected_file_checksums);
+ }
+ }
+
+private:
+ static void check_checkpoint_file_names(const std::vector<std::string>
&file_names,
+ const std::vector<std::string>
&expected_file_names)
+ {
+ std::vector<std::string> ordered_file_names(file_names);
+ std::sort(ordered_file_names.begin(), ordered_file_names.end());
+ ASSERT_EQ(expected_file_names, ordered_file_names);
+ }
+
+ // Sort `file_properties` so that its order is consistent with that of
`file_names`.
+ template <typename TFileProperty>
+ static std::vector<TFileProperty>
+ sort_checkpoint_file_properties(const std::vector<TFileProperty>
&file_properties,
+ const std::vector<std::string> &file_names)
+ {
+ CHECK_EQ(file_properties.size(), file_names.size());
+
+ std::vector<size_t> order_indices(file_names.size());
+ for (size_t i = 0; i < order_indices.size(); ++i) {
+ order_indices[i] = i;
+ }
+
+ std::sort(order_indices.begin(), order_indices.end(),
[&file_names](size_t a, size_t b) {
+ return file_names[a] < file_names[b];
+ });
+
+ std::vector<TFileProperty> ordered_file_properties;
+ ordered_file_properties.reserve(file_properties.size());
+ for (size_t idx : order_indices) {
+ ordered_file_properties.push_back(file_properties[idx]);
+ }
+
+ return ordered_file_properties;
+ }
+
+ static void check_checkpoint_file_sizes(const std::vector<int64_t>
&file_sizes,
+ const std::vector<std::string>
&file_names,
+ const std::vector<int64_t>
&expected_file_sizes)
+ {
+ const auto ordered_file_sizes =
sort_checkpoint_file_properties(file_sizes, file_names);
+
+ ASSERT_EQ(expected_file_sizes, ordered_file_sizes);
+ }
+
+ static void
+ check_checkpoint_file_checksums(const std::vector<std::string>
&file_checksums,
+ const std::vector<std::string> &file_names,
+ const std::vector<std::string>
&expected_file_checksums)
+ {
+ const auto ordered_file_checksums =
+ sort_checkpoint_file_properties(file_checksums, file_names);
+
+ ASSERT_EQ(expected_file_checksums, ordered_file_checksums);
+ }
};
INSTANTIATE_TEST_SUITE_P(, pegasus_server_impl_test, ::testing::Values(false,
true));
@@ -251,5 +454,68 @@ TEST_P(pegasus_server_impl_test,
test_load_from_duplication_data)
dsn::utils::filesystem::remove_file_name(new_file);
}
-} // namespace server
-} // namespace pegasus
+// Querying the last checkpoint fails when the replica does not exist.
+TEST_P(pegasus_server_impl_test, test_query_last_checkpoint_with_replica_not_found)
+{
+    // No replica has been created for this gpid, so the lookup must fail.
+    dsn::gpid absent_gpid(101, 101);
+    test_query_last_checkpoint(absent_gpid, dsn::ERR_OBJECT_NOT_FOUND);
+}
+
+// Querying the last checkpoint fails when there is no last checkpoint at all.
+TEST_P(pegasus_server_impl_test, test_query_last_checkpoint_with_last_checkpoint_not_exist)
+{
+    // A zero last durable decree means that no checkpoint has been made yet.
+    constexpr dsn::replication::decree kZeroDecree = 0;
+    set_last_committed_decree(kZeroDecree);
+    set_last_durable_decree(kZeroDecree);
+
+    test_query_last_checkpoint(_gpid, dsn::ERR_PATH_NOT_FOUND);
+}
+
+// Querying the last checkpoint fails when its dir does not exist.
+TEST_P(pegasus_server_impl_test, test_query_last_checkpoint_with_last_checkpoint_dir_not_exist)
+{
+    ASSERT_EQ(dsn::ERR_OK, start());
+
+    // Keep the last durable decree non-zero so that a last checkpoint is expected.
+    constexpr dsn::replication::decree kLastCommittedDecree = 200;
+    constexpr dsn::replication::decree kLastDurableDecree = 100;
+    set_last_committed_decree(kLastCommittedDecree);
+    set_last_durable_decree(kLastDurableDecree);
+
+    // NOTE(review): the leading `false` presumably tells the helper not to create the
+    // checkpoint dir -- confirm against the helper's definition.
+    test_query_last_checkpoint(false, _gpid, dsn::ERR_GET_LEARN_STATE_FAILED);
+}
+
+// Querying the last checkpoint succeeds when its dir exists but contains no file.
+TEST_P(pegasus_server_impl_test, test_query_last_checkpoint_with_empty_dir)
+{
+    ASSERT_EQ(dsn::ERR_OK, start());
+
+    // Keep the last durable decree non-zero so that a last checkpoint is expected.
+    constexpr dsn::replication::decree kLastCommittedDecree = 200;
+    constexpr dsn::replication::decree kLastDurableDecree = 100;
+    set_last_committed_decree(kLastCommittedDecree);
+    set_last_durable_decree(kLastDurableDecree);
+
+    // No file is expected: empty names, sizes and checksums.
+    test_query_last_checkpoint_for_all_checksum_types(
+        kLastCommittedDecree, kLastDurableDecree, {}, {}, {});
+}
+
+// Querying the last checkpoint succeeds when its dir contains some files.
+TEST_P(pegasus_server_impl_test, test_query_last_checkpoint_with_non_empty_dir)
+{
+    // Expected checkpoint files, listed in ascending name order together with their sizes
+    // and MD5 checksums. The digests equal those of `size` repetitions of 'a' used in
+    // checksum_test.cpp.
+    static const std::vector<std::string> kCheckpointFileNames = {"test_file.1",
+                                                                  "test_file.2",
+                                                                  "test_file.3",
+                                                                  "test_file.4",
+                                                                  "test_file.5",
+                                                                  "test_file.6"};
+    static const std::vector<int64_t> kCheckpointFileSizes = {0, 1, 2, 4095, 4096, 4097};
+    static const std::vector<std::string> kCheckpointFileChecksums = {
+        "d41d8cd98f00b204e9800998ecf8427e",
+        "0cc175b9c0f1b6a831c399e269772661",
+        "4124bc0a9335c27f086f24ba207a4912",
+        "559110baa849c7608ee70abe1d76273e",
+        "21a199c53f422a380e20b162fb6ebe9c",
+        "8cfc1a0bd8cd76599e76e5e721c6e62e"};
+
+    ASSERT_EQ(dsn::ERR_OK, start());
+
+    constexpr dsn::replication::decree kLastCommittedDecree = 200;
+    constexpr dsn::replication::decree kLastDurableDecree = 100;
+    set_last_committed_decree(kLastCommittedDecree);
+    set_last_durable_decree(kLastDurableDecree);
+
+    test_query_last_checkpoint_for_all_checksum_types(kLastCommittedDecree,
+                                                      kLastDurableDecree,
+                                                      kCheckpointFileNames,
+                                                      kCheckpointFileSizes,
+                                                      kCheckpointFileChecksums);
+}
+
+} // namespace pegasus::server
diff --git a/src/server/test/pegasus_server_test_base.h
b/src/server/test/pegasus_server_test_base.h
index 00a46cf75..2ea7fdc25 100644
--- a/src/server/test/pegasus_server_test_base.h
+++ b/src/server/test/pegasus_server_test_base.h
@@ -24,10 +24,11 @@
#include <gtest/gtest.h>
#include <gmock/gmock.h>
#include "common/fs_manager.h"
-#include "utils/flags.h"
#include "replica/replica_stub.h"
#include "test_util/test_util.h"
+#include "utils/casts.h"
#include "utils/filesystem.h"
+#include "utils/flags.h"
DSN_DECLARE_bool(encrypt_data_at_rest);
@@ -50,7 +51,7 @@ public:
{
// Remove rdb to prevent rocksdb recovery from last test.
dsn::utils::filesystem::remove_path("./test_dir");
- _replica_stub = new dsn::replication::replica_stub();
+ _replica_stub = std::make_unique<dsn::replication::replica_stub>();
_replica_stub->get_fs_manager()->initialize({"test_dir"},
{"test_tag"});
// Use different gpid for encryption and non-encryption test to avoid
reopening a rocksdb
@@ -64,19 +65,29 @@ public:
dsn::app_info app_info;
app_info.app_type = "pegasus";
+ initialize_replica(app_info);
+
+ CHECK(dsn::utils::filesystem::create_directory(_server->data_dir()),
+ "create data dir {} failed",
+ _server->data_dir());
+ }
+
+ // (Re)create `_replica` for `_gpid` with `app_info` on the dir chosen by the fs manager,
+ // register it in `_replica_stub`, create a mock_pegasus_server_impl app on it and point
+ // `_server` at that app.
+ void initialize_replica(const dsn::app_info &app_info)
+ {
+ // Drop this fixture's reference to any previous replica first. The stub's `_replicas`
+ // map may still hold it until the entry is overwritten below.
+ _replica.reset();
+
auto *dn =
_replica_stub->get_fs_manager()->find_best_dir_for_new_replica(_gpid);
CHECK_NOTNULL(dn, "");
- _replica = new dsn::replication::replica(_replica_stub, _gpid,
app_info, dn, false, false);
- const auto dir_data = dsn::utils::filesystem::path_combine(
- _replica->dir(), dsn::replication::replication_app_base::kDataDir);
- CHECK(dsn::utils::filesystem::create_directory(dir_data),
- "create data dir {} failed",
- dir_data);
- _server = std::make_unique<mock_pegasus_server_impl>(_replica);
+ _replica =
+ new dsn::replication::replica(_replica_stub.get(), _gpid,
app_info, dn, false, false);
+ // Register the replica with the stub, presumably so that code looking replicas up by
+ // gpid (e.g. the checkpoint query tests) can find it -- confirm.
+ _replica_stub->_replicas[_gpid] = _replica;
+
+
_replica->create_app_for_test<mock_pegasus_server_impl>(_replica.get());
+ // `_server` is obtained via `.get()` from `_replica->_app`, so it does not own the app.
+ _server = dsn::down_cast<mock_pegasus_server_impl
*>(_replica->_app.get());
}
- dsn::error_code start(const std::map<std::string, std::string> &envs = {})
+ dsn::error_code start(const std::map<std::string, std::string> &envs)
{
std::unique_ptr<char *[]> argvs = std::make_unique<char *[]>(1 +
envs.size() * 2);
char **argv = argvs.get();
@@ -91,20 +102,29 @@ public:
return _server->start(idx, argv);
}
+    // Start the server without any app envs.
+    dsn::error_code start() { return start(std::map<std::string, std::string>()); }
+
+    // Reset the replica's prepare list at `d`, which presumably makes `d` its last committed
+    // decree (as the name suggests) -- confirm in prepare_list::reset().
+    void set_last_committed_decree(dsn::replication::decree d) { _replica->_prepare_list->reset(d); }
+
+    // Forward `d` to the mock server's set_last_durable_decree().
+    void set_last_durable_decree(dsn::replication::decree d) { _server->set_last_durable_decree(d); }
+
+ // Stop the server, passing `false` so that its state is not cleared. `_replica_stub` and
+ // `_replica` are no longer deleted by hand: they are released by their owning members
+ // (`std::unique_ptr` and `replica_ptr`).
~pegasus_server_test_base() override
{
// do not clear state
_server->stop(false);
-
- delete _replica_stub;
- delete _replica;
}
protected:
- std::unique_ptr<mock_pegasus_server_impl> _server;
- dsn::replication::replica *_replica = nullptr;
- dsn::replication::replica_stub *_replica_stub = nullptr;
+ std::unique_ptr<dsn::replication::replica_stub> _replica_stub;
dsn::gpid _gpid;
+ dsn::replication::replica_ptr _replica;
+ mock_pegasus_server_impl *_server;
};
} // namespace server
diff --git a/src/server/test/pegasus_server_write_test.cpp
b/src/server/test/pegasus_server_write_test.cpp
index 4b7c6fc84..f9ab7b414 100644
--- a/src/server/test/pegasus_server_write_test.cpp
+++ b/src/server/test/pegasus_server_write_test.cpp
@@ -52,10 +52,10 @@ class pegasus_server_write_test : public
pegasus_server_test_base
std::unique_ptr<pegasus_server_write> _server_write;
public:
+ // Start the server with no app envs and build the writer under test on top of `_server`.
+ // NOTE(review): the result of start() is ignored here, whereas other fixtures wrap it in
+ // ASSERT_EQ(dsn::ERR_OK, start()).
- pegasus_server_write_test() : pegasus_server_test_base()
+ pegasus_server_write_test()
{
start();
- _server_write = std::make_unique<pegasus_server_write>(_server.get());
+ _server_write = std::make_unique<pegasus_server_write>(_server);
}
void test_batch_writes()
diff --git a/src/server/test/pegasus_write_service_impl_test.cpp
b/src/server/test/pegasus_write_service_impl_test.cpp
index 053dd6333..192c3110e 100644
--- a/src/server/test/pegasus_write_service_impl_test.cpp
+++ b/src/server/test/pegasus_write_service_impl_test.cpp
@@ -62,7 +62,7 @@ protected:
+ // Start the server (which must succeed), build the writer under test, then cache raw,
+ // non-owning pointers (obtained via `.get()`) to its write-service impl and rocksdb wrapper.
void SetUp() override
{
ASSERT_EQ(dsn::ERR_OK, start());
- _server_write = std::make_unique<pegasus_server_write>(_server.get());
+ _server_write = std::make_unique<pegasus_server_write>(_server);
_write_impl = _server_write->_write_svc->_impl.get();
_rocksdb_wrapper = _write_impl->_rocksdb_wrapper.get();
}
diff --git a/src/server/test/pegasus_write_service_test.cpp
b/src/server/test/pegasus_write_service_test.cpp
index e5c5b4382..b4db69849 100644
--- a/src/server/test/pegasus_write_service_test.cpp
+++ b/src/server/test/pegasus_write_service_test.cpp
@@ -59,7 +59,7 @@ public:
+ // Start the server, build the writer under test and cache a raw, non-owning pointer to its
+ // write service.
+ // NOTE(review): the result of start() is not checked here, unlike fixtures that use
+ // ASSERT_EQ(dsn::ERR_OK, start()).
void SetUp() override
{
start();
- _server_write = std::make_unique<pegasus_server_write>(_server.get());
+ _server_write = std::make_unique<pegasus_server_write>(_server);
_write_svc = _server_write->_write_svc.get();
}
diff --git a/src/server/test/rocksdb_wrapper_test.cpp
b/src/server/test/rocksdb_wrapper_test.cpp
index 7831afc71..bfd8e11e2 100644
--- a/src/server/test/rocksdb_wrapper_test.cpp
+++ b/src/server/test/rocksdb_wrapper_test.cpp
@@ -18,31 +18,28 @@
*/
#include <fmt/core.h>
-#include <stdint.h>
+#include <algorithm>
+#include <cstdint>
#include <memory>
#include <string>
+#include <string_view>
#include <utility>
-#include "common/fs_manager.h"
#include "dsn.layer2_types.h"
#include "gtest/gtest.h"
#include "pegasus_key_schema.h"
#include "pegasus_server_test_base.h"
#include "pegasus_utils.h"
#include "pegasus_value_schema.h"
-#include "replica/replica.h"
-#include "replica/replica_stub.h"
#include "server/pegasus_server_write.h"
#include "server/pegasus_write_service.h"
#include "server/pegasus_write_service_impl.h"
#include "server/rocksdb_wrapper.h"
#include "utils/blob.h"
#include "utils/error_code.h"
-#include "utils/fmt_logging.h"
-#include <string_view>
-namespace pegasus {
-namespace server {
+namespace pegasus::server {
+
class rocksdb_wrapper_test : public pegasus_server_test_base
{
protected:
@@ -56,7 +53,7 @@ public:
void SetUp() override
{
ASSERT_EQ(::dsn::ERR_OK, start());
- _server_write = std::make_unique<pegasus_server_write>(_server.get());
+ _server_write = std::make_unique<pegasus_server_write>(_server);
_rocksdb_wrapper =
_server_write->_write_svc->_impl->_rocksdb_wrapper.get();
pegasus::pegasus_generate_key(
@@ -79,16 +76,12 @@ public:
+ // Switch the fixture to an app marked as duplicating: stop the current server, rebuild the
+ // replica and server through initialize_replica() with `duplicating = true`, and rerun
+ // SetUp() to start the server and rebuild the writer.
void set_app_duplicating()
{
_server->stop(false);
- delete _replica;
dsn::app_info app_info;
app_info.app_type = "pegasus";
app_info.duplicating = true;
- auto *dn =
_replica_stub->get_fs_manager()->find_best_dir_for_new_replica(_gpid);
- CHECK_NOTNULL(dn, "");
- _replica = new dsn::replication::replica(_replica_stub, _gpid,
app_info, dn, false, false);
- _server = std::make_unique<mock_pegasus_server_impl>(_replica);
+ initialize_replica(app_info);
SetUp();
}
@@ -232,5 +225,5 @@ TEST_P(rocksdb_wrapper_test,
verify_timetag_compatible_with_version_0)
_rocksdb_wrapper->_pegasus_data_version, std::move(get_ctx.raw_value),
user_value);
ASSERT_EQ(user_value.to_string(), value);
}
-} // namespace server
-} // namespace pegasus
+
+} // namespace pegasus::server
diff --git a/src/test_util/test_util.cpp b/src/test_util/test_util.cpp
index 338ec4ddf..32e354654 100644
--- a/src/test_util/test_util.cpp
+++ b/src/test_util/test_util.cpp
@@ -22,7 +22,6 @@
#include <thread>
#include "gtest/gtest.h"
-#include "metadata_types.h"
#include "rocksdb/env.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
@@ -35,19 +34,38 @@
namespace pegasus {
-void create_local_test_file(const std::string &full_name,
dsn::replication::file_meta *fm)
+// Write `content` to `path` through the env for FileDataType::kSensitive data (which
+// presumably honors FLAGS_encrypt_data_at_rest -- confirm in utils/env). Then record the
+// file's name, size and md5 in a file_meta and hand an owning handle back through `file`.
+//
+// This uses gtest ASSERT_* macros, so on failure it returns early without setting `file`;
+// callers should wrap the call in NO_FATALS().
+void local_test_file::create(const std::string &path,
+ const std::string &content,
+ std::shared_ptr<local_test_file> &file)
{
- ASSERT_NE(fm, nullptr);
- auto s =
+ const auto status =
rocksdb::WriteStringToFile(dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive),
- rocksdb::Slice("write some data."),
- full_name,
+ rocksdb::Slice(content),
+ path,
/* should_sync */ true);
- ASSERT_TRUE(s.ok()) << s.ToString();
- fm->name = full_name;
- ASSERT_EQ(dsn::ERR_OK, dsn::utils::filesystem::md5sum(full_name, fm->md5));
- ASSERT_TRUE(dsn::utils::filesystem::file_size(
- full_name, dsn::utils::FileDataType::kSensitive, fm->size));
+ ASSERT_TRUE(status.ok()) << status.ToString();
+
+ dsn::replication::file_meta meta;
+ meta.name = path;
+ ASSERT_TRUE(
+ dsn::utils::filesystem::file_size(path,
dsn::utils::FileDataType::kSensitive, meta.size));
+ ASSERT_EQ(dsn::ERR_OK, dsn::utils::filesystem::md5sum(path, meta.md5));
+
+ // The destructor is private, so the handle must be built with the class's static deleter.
+ file = std::shared_ptr<local_test_file>(new local_test_file(meta),
deleter);
+}
+
+// Generate a test file at `path` filled with fixed placeholder content, handing the owning
+// handle back through `file`. Failures are reported through gtest assertions inside the
+// delegated overload.
+void local_test_file::create(const std::string &path, std::shared_ptr<local_test_file> &file)
+{
+    // Plain call: returning a void expression from a void function is legal but misleading.
+    create(path, "write some data.", file);
+}
+
+// Only reachable through create(): keep a copy of the metadata of the file just written.
+local_test_file::local_test_file(const dsn::replication::file_meta &meta)
+    : _file_meta(meta)
+{
+}
+
+// Remove the underlying file from disk.
+local_test_file::~local_test_file()
+{
+    // The result is deliberately not checked for `true`: the directory containing the file
+    // may already have been removed by the time this destructor runs.
+    static_cast<void>(dsn::utils::filesystem::remove_path(_file_meta.name));
}
void AssertEventually(const std::function<void(void)> &f, int timeout_sec,
WaitBackoff backoff)
diff --git a/src/test_util/test_util.h b/src/test_util/test_util.h
index 549d8fd94..74badb880 100644
--- a/src/test_util/test_util.h
+++ b/src/test_util/test_util.h
@@ -25,28 +25,26 @@
#include <cstdint>
#include <cstdio>
#include <functional>
+#include <memory>
#include <string>
+#include "metadata_types.h"
#include "runtime/api_layer1.h"
-#include "utils/env.h"
// IWYU refused to include "utils/defer.h" everywhere, both in .h and .cpp
files.
// However, once "utils/defer.h" is not included, it is inevitable that
compilation
// will fail since dsn::defer is referenced. Thus force IWYU to keep it.
#include "utils/defer.h" // IWYU pragma: keep
+#include "utils/env.h"
#include "utils/flags.h"
+#include "utils/ports.h"
#include "utils/test_macros.h"
DSN_DECLARE_bool(encrypt_data_at_rest);
-namespace dsn {
-namespace replication {
-class file_meta;
-} // namespace replication
-} // namespace dsn
-
#define PRESERVE_VAR(name, expr)
\
const auto PRESERVED_##name = expr;
\
- auto PRESERVED_##name##_cleanup = dsn::defer([PRESERVED_##name]() { expr =
PRESERVED_##name; })
+ const auto PRESERVED_##name##_cleanup =
\
+ dsn::defer([PRESERVED_##name]() { expr = PRESERVED_##name; })
// Save the current value of a flag and restore it at the end of the function.
#define PRESERVE_FLAG(name) PRESERVE_VAR(FLAGS_##name, FLAGS_##name)
@@ -90,7 +88,32 @@ private:
uint64_t _start_ms = 0;
};
-void create_local_test_file(const std::string &full_name,
dsn::replication::file_meta *fm);
+// A local file for tests whose lifetime is managed with RAII. It is produced by one of the
+// static create() factories and removed from disk in the destructor, which runs when the
+// last shared_ptr owning it is released.
+class local_test_file
+{
+public:
+    // Write `content` to `path` and hand the owning handle back through `file`.
+    static void create(const std::string &path,
+                       const std::string &content,
+                       std::shared_ptr<local_test_file> &file);
+
+    // Like the overload above, but with arbitrary placeholder content.
+    static void create(const std::string &path, std::shared_ptr<local_test_file> &file);
+
+    // Name, size and md5 of the file, recorded right after it was written.
+    [[nodiscard]] const dsn::replication::file_meta &get_file_meta() const { return _file_meta; }
+
+private:
+    // Instances are destroyed only through this deleter, which the shared_ptr built by
+    // create() is given.
+    static void deleter(local_test_file *ptr) { delete ptr; }
+
+    explicit local_test_file(const dsn::replication::file_meta &meta);
+    ~local_test_file();
+
+    dsn::replication::file_meta _file_meta;
+
+    DISALLOW_COPY_AND_ASSIGN(local_test_file);
+    DISALLOW_MOVE_AND_ASSIGN(local_test_file);
+};
#define ASSERT_EVENTUALLY(expr)
\
do {
\
diff --git a/src/utils/checksum.cpp b/src/utils/checksum.cpp
new file mode 100644
index 000000000..ce9147c2a
--- /dev/null
+++ b/src/utils/checksum.cpp
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "utils/checksum.h"
+
+#include "utils/error_code.h"
+#include "utils/filesystem.h"
+#include "utils/ports.h"
+
+namespace dsn {
+
+// Compute the checksum of the file at `file_path` with the algorithm selected by `type` and
+// store it into `result`.
+//
+// `result` is cleared up front so that it never carries stale content from the caller: it
+// stays empty for CST_NONE and for any unsupported type (which yields ERR_NOT_IMPLEMENTED).
+error_s
+calc_checksum(const std::string &file_path, utils::checksum_type::type type, std::string &result)
+{
+    result.clear();
+
+    switch (type) {
+    case utils::checksum_type::CST_MD5: {
+        const auto err = utils::filesystem::md5sum(file_path, result);
+        if (dsn_unlikely(err != ERR_OK)) {
+            return FMT_ERR(err, "md5sum failed: err = {}", err);
+        }
+        break;
+    }
+
+    case utils::checksum_type::CST_NONE:
+        // The caller explicitly asked for no checksum.
+        break;
+
+    default:
+        return FMT_ERR(ERR_NOT_IMPLEMENTED,
+                       "checksum_type is not supported: val = {}, str = {}",
+                       static_cast<int>(type),
+                       enum_to_string(type));
+    }
+
+    return error_s::ok();
+}
+
+} // namespace dsn
diff --git a/src/utils/checksum.h b/src/utils/checksum.h
new file mode 100644
index 000000000..2cdc42723
--- /dev/null
+++ b/src/utils/checksum.h
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string>
+
+#include "utils_types.h"
+#include "utils/enum_helper.h"
+#include "utils/errors.h"
+
+namespace dsn {
+
+// Register string names ("invalid", "none", "md5") for the utils::checksum_type values, so
+// that enum_to_string() can render them (as calc_checksum's error message does).
+// NOTE(review): CST_INVALID, the third argument of ENUM_BEGIN2, is presumably the value used
+// for unknown names -- confirm in utils/enum_helper.h.
+ENUM_BEGIN2(utils::checksum_type::type, checksum_type,
utils::checksum_type::CST_INVALID)
+ENUM_REG_WITH_CUSTOM_NAME(utils::checksum_type::CST_INVALID, invalid)
+ENUM_REG_WITH_CUSTOM_NAME(utils::checksum_type::CST_NONE, none)
+ENUM_REG_WITH_CUSTOM_NAME(utils::checksum_type::CST_MD5, md5)
+ENUM_END2(utils::checksum_type::type, checksum_type)
+
+// Calculate the checksum of a file.
+//
+// Parameters:
+// - `file_path`: the path of the file.
+// - `type`: selects the algorithm used to calculate the checksum. Only CST_MD5 actually
+//   computes one; CST_NONE skips the calculation; any other value is rejected with
+//   ERR_NOT_IMPLEMENTED.
+// - `result`: output parameter receiving the checksum (for CST_MD5, the MD5 digest as a
+//   hex string).
+//
+// Returns ok on success, otherwise the corresponding error.
+error_s
+calc_checksum(const std::string &file_path, utils::checksum_type::type type,
std::string &result);
+
+} // namespace dsn
diff --git a/src/utils/test/CMakeLists.txt b/src/utils/test/CMakeLists.txt
index 8502cabb7..d1313f6e3 100644
--- a/src/utils/test/CMakeLists.txt
+++ b/src/utils/test/CMakeLists.txt
@@ -30,11 +30,12 @@ set(MY_PROJ_NAME dsn_utils_tests)
set(MY_SRC_SEARCH_MODE "GLOB")
set(MY_PROJ_LIBS
+ test_utils
+ dsn_replication_common
dsn_http
dsn_runtime
dsn_utils
gtest
- test_utils
rocksdb
lz4
zstd
diff --git a/src/utils/test/checksum_test.cpp b/src/utils/test/checksum_test.cpp
new file mode 100644
index 000000000..9b2920830
--- /dev/null
+++ b/src/utils/test/checksum_test.cpp
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "utils/checksum.h"
+
+#include <cstdint>
+#include <memory>
+#include <tuple>
+#include <type_traits>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "test_util/test_util.h"
+#include "utils/env.h"
+#include "utils/error_code.h"
+#include "utils/errors.h"
+#include "utils/filesystem.h"
+#include "utils/flags.h"
+#include "utils/test_macros.h"
+#include "utils_types.h"
+
+DSN_DECLARE_bool(encrypt_data_at_rest);
+
+namespace dsn {
+
+// One calc_checksum() test case: a file of `file_size` bytes is checksummed with `type`.
+// Field order matters: the case table brace-initializes these in declaration order.
+struct calc_checksum_case
+{
+ int64_t file_size;
+ utils::checksum_type::type type;
+ error_code expected_err;
+ // Only compared when the returned code is ERR_OK.
+ std::string expected_checksum;
+};
+
+class CalcChecksumTest : public testing::TestWithParam<std::tuple<bool, calc_checksum_case>>
+{
+protected:
+    // Create a file of `file_size` bytes of 'a' (encrypted or not, according to the first
+    // test parameter). Then check that calc_checksum() returns the expected error code and,
+    // when that code is ok, the expected checksum.
+    static void test_calc_checksum()
+    {
+        static const std::string kFilePath("test_file_for_calc_checksum");
+
+        const bool encrypt_file = std::get<0>(GetParam());
+        const calc_checksum_case &tc = std::get<1>(GetParam());
+
+        // The original flag value is restored when this function returns.
+        PRESERVE_FLAG(encrypt_data_at_rest);
+        FLAGS_encrypt_data_at_rest = encrypt_file;
+
+        // The file is removed once the last owner of `test_file` is released.
+        std::shared_ptr<pegasus::local_test_file> test_file;
+        const std::string content(tc.file_size, 'a');
+        NO_FATALS(pegasus::local_test_file::create(kFilePath, content, test_file));
+
+        // The reported size must match what was requested.
+        int64_t actual_size = 0;
+        ASSERT_TRUE(
+            utils::filesystem::file_size(kFilePath, utils::FileDataType::kSensitive, actual_size));
+        ASSERT_EQ(tc.file_size, actual_size);
+
+        std::string checksum;
+        const auto status = calc_checksum(kFilePath, tc.type, checksum);
+        ASSERT_EQ(tc.expected_err, status.code());
+
+        // The checksum is only meaningful when the calculation succeeded.
+        if (!status) {
+            return;
+        }
+        ASSERT_EQ(tc.expected_checksum, checksum);
+    }
+};
+
+// Cases for CalcChecksumTest. The MD5 values are digests of `file_size` repetitions of 'a'
+// (the content the test writes). NOTE(review): the sizes around 4096 presumably probe a
+// read-buffer boundary in md5sum -- confirm.
+const std::vector<calc_checksum_case> calc_checksum_tests = {
+ {0, utils::checksum_type::CST_MD5, ERR_OK,
"d41d8cd98f00b204e9800998ecf8427e"},
+ {1, utils::checksum_type::CST_MD5, ERR_OK,
"0cc175b9c0f1b6a831c399e269772661"},
+ {2, utils::checksum_type::CST_MD5, ERR_OK,
"4124bc0a9335c27f086f24ba207a4912"},
+ {4095, utils::checksum_type::CST_MD5, ERR_OK,
"559110baa849c7608ee70abe1d76273e"},
+ {4096, utils::checksum_type::CST_MD5, ERR_OK,
"21a199c53f422a380e20b162fb6ebe9c"},
+ {4097, utils::checksum_type::CST_MD5, ERR_OK,
"8cfc1a0bd8cd76599e76e5e721c6e62e"},
+ {4095, utils::checksum_type::CST_NONE, ERR_OK, ""},
+ {4096, utils::checksum_type::CST_NONE, ERR_OK, ""},
+ {4097, utils::checksum_type::CST_NONE, ERR_OK, ""},
+ {4095, utils::checksum_type::CST_INVALID, ERR_NOT_IMPLEMENTED, ""},
+ {4096, utils::checksum_type::CST_INVALID, ERR_NOT_IMPLEMENTED, ""},
+ {4097, utils::checksum_type::CST_INVALID, ERR_NOT_IMPLEMENTED, ""},
+};
+
+TEST_P(CalcChecksumTest, CalcChecksum) { test_calc_checksum(); }
+
+// Run every case in calc_checksum_tests against both an unencrypted (false) and an encrypted
+// (true) test file.
+INSTANTIATE_TEST_SUITE_P(ChecksumTest,
+ CalcChecksumTest,
+ testing::Combine(testing::Bool(),
testing::ValuesIn(calc_checksum_tests)));
+
+} // namespace dsn
diff --git a/src/utils/test_macros.h b/src/utils/test_macros.h
index c1c9c3fd4..950e59745 100644
--- a/src/utils/test_macros.h
+++ b/src/utils/test_macros.h
@@ -34,8 +34,23 @@
}
\
} while (0)
-// Substring matches.
+// Assert that `str` should contain the given `substr`.
#define ASSERT_STR_CONTAINS(str, substr) ASSERT_THAT(str,
testing::HasSubstr(substr))
+// Assert that `str` should not contain the given `substr`.
#define ASSERT_STR_NOT_CONTAINS(str, substr)
\
ASSERT_THAT(str, testing::Not(testing::HasSubstr(substr)))
+
+// Fatally assert (via gmock's ASSERT_THAT) that string `str` begins with `prefix`.
+#define ASSERT_STR_STARTSWITH(str, prefix) ASSERT_THAT(str, testing::StartsWith(prefix))
+
+// Fatally assert that string `str` does not begin with `prefix`.
+#define ASSERT_STR_NOT_STARTSWITH(str, prefix)                                                     \
+    ASSERT_THAT(str, testing::Not(testing::StartsWith(prefix)))
+
+// Fatally assert that string `str` ends with `suffix`.
+#define ASSERT_STR_ENDSWITH(str, suffix) ASSERT_THAT(str, testing::EndsWith(suffix))
+
+// Fatally assert that string `str` does not end with `suffix`.
+#define ASSERT_STR_NOT_ENDSWITH(str, suffix)                                                       \
+    ASSERT_THAT(str, testing::Not(testing::EndsWith(suffix)))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]