This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new fc416e3ac feat(make_idempotent): sync `atomic_idempotent` from meta
server into `.app-info` file for each replica (#2220)
fc416e3ac is described below
commit fc416e3acf0f1a58114c081ddc315d0703f8a9c8
Author: Dan Wang <[email protected]>
AuthorDate: Mon Apr 14 14:19:54 2025 +0800
feat(make_idempotent): sync `atomic_idempotent` from meta server into
`.app-info` file for each replica (#2220)
https://github.com/apache/incubator-pegasus/issues/2197
In https://github.com/apache/incubator-pegasus/pull/2205, we introduced a
new attribute `atomic_idempotent` to the meta server, which is used to
enable/disable the idempotence of atomic write operations.
This attribute should be broadcast to each replica: whenever a replica is
promoted to primary, it uses this attribute to decide whether to make all
atomic writes idempotent. This attribute is persisted into the `.app-info`
file to ensure it can be loaded again after the replica restarts.
---
src/replica/replica.cpp | 38 +++++++--
src/replica/replica.h | 21 ++++-
src/replica/replica_config.cpp | 37 +++++++--
src/replica/replica_disk_migrator.cpp | 4 +-
src/replica/replica_init.cpp | 5 +-
src/replica/split/replica_split_manager.cpp | 2 +-
src/replica/test/replica_test.cpp | 122 +++++++++++++++++++---------
7 files changed, 169 insertions(+), 60 deletions(-)
diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp
index 7e2c65f35..deaaa6fe1 100644
--- a/src/replica/replica.cpp
+++ b/src/replica/replica.cpp
@@ -708,18 +708,44 @@ uint32_t replica::query_data_version() const
return _app->query_data_version();
}
-error_code replica::store_app_info(app_info &info, const std::string &path)
+error_code replica::store_app_info(app_info &info, const std::string &dir)
{
- replica_app_info new_info((app_info *)&info);
- const auto &info_path =
- path.empty() ? utils::filesystem::path_combine(_dir,
replica_app_info::kAppInfo) : path;
- auto err = new_info.store(info_path);
+ const auto path = utils::filesystem::path_combine(dir,
replica_app_info::kAppInfo);
+
+ replica_app_info rep_info(&info);
+ const auto err = rep_info.store(path);
+ if (dsn_unlikely(err != ERR_OK)) {
+ LOG_ERROR_PREFIX("failed to save app_info to {}, error = {}", path,
err);
+ }
+
+ return err;
+}
+
+error_code replica::store_app_info(app_info &info) { return
store_app_info(info, _dir); }
+
+error_code replica::store_app_info(const std::string &dir)
+{
+ return store_app_info(_app_info, dir);
+}
+
+error_code replica::store_app_info() { return store_app_info(_app_info, _dir);
}
+
+error_code replica::load_app_info(const std::string &dir, app_info &info) const
+{
+ const auto path =
+ utils::filesystem::path_combine(dir,
dsn::replication::replica_app_info::kAppInfo);
+
+ replica_app_info rep_info(&info);
+ const auto err = rep_info.load(path);
if (dsn_unlikely(err != ERR_OK)) {
- LOG_ERROR_PREFIX("failed to save app_info to {}, error = {}",
info_path, err);
+ LOG_ERROR_PREFIX("failed to load app_info from {}, error = {}", path,
err);
}
+
return err;
}
+error_code replica::load_app_info(app_info &info) const { return
load_app_info(_dir, info); }
+
bool replica::access_controller_allowed(message_ex *msg, const
ranger::access_type &ac_type) const
{
return !_access_controller->is_enable_ranger_acl() ||
_access_controller->allowed(msg, ac_type);
diff --git a/src/replica/replica.h b/src/replica/replica.h
index 0632d2e08..fb199bf44 100644
--- a/src/replica/replica.h
+++ b/src/replica/replica.h
@@ -673,11 +673,26 @@ private:
// update envs to deny client request
void update_deny_client(const std::map<std::string, std::string> &envs);
- // store `info` into a file under `path` directory
- // path = "" means using the default directory (`_dir`/.app_info)
- error_code store_app_info(app_info &info, const std::string &path = "");
+ // Write the specified `info` into .app_info file under the specified
`dir` directory.
+ error_code store_app_info(app_info &info, const std::string &dir);
+
+ // Write the specified `info` into .app_info file under `_dir` directory.
+ error_code store_app_info(app_info &info);
+
+ // Write `_app_info` into .app_info file under the specified `dir`
directory.
+ error_code store_app_info(const std::string &dir);
+
+ // Write `_app_info` into .app_info file under `_dir` directory.
+ error_code store_app_info();
+
+ // Load `info` from .app_info file under the specified `dir` directory.
+ error_code load_app_info(const std::string &dir, app_info &info) const;
+
+ // Load `info` from .app_info file under `_dir` directory.
+ error_code load_app_info(app_info &info) const;
void update_app_max_replica_count(int32_t max_replica_count);
+ void update_app_atomic_idempotent(bool atomic_idempotent);
void update_app_name(const std::string &app_name);
bool is_data_corrupted() const { return _data_corrupted; }
diff --git a/src/replica/replica_config.cpp b/src/replica/replica_config.cpp
index a9cae880c..6a4080f66 100644
--- a/src/replica/replica_config.cpp
+++ b/src/replica/replica_config.cpp
@@ -1072,10 +1072,12 @@ void replica::on_config_sync(const app_info &info,
{
LOG_DEBUG_PREFIX("configuration sync");
// no outdated update
- if (pc.ballot < get_ballot())
+ if (pc.ballot < get_ballot()) {
return;
+ }
update_app_max_replica_count(info.max_replica_count);
+ update_app_atomic_idempotent(info.atomic_idempotent);
update_app_name(info.app_name);
update_app_envs(info.envs);
_is_duplication_master = info.duplicating;
@@ -1121,10 +1123,10 @@ void replica::update_app_name(const std::string
&app_name)
return;
}
- auto old_app_name = _app_info.app_name;
+ const auto old_app_name = _app_info.app_name;
_app_info.app_name = app_name;
- CHECK_EQ_PREFIX_MSG(store_app_info(_app_info),
+ CHECK_EQ_PREFIX_MSG(store_app_info(),
ERR_OK,
"store_app_info for app_name failed: "
"app_name={}, app_id={}, old_app_name={}",
@@ -1139,10 +1141,10 @@ void replica::update_app_max_replica_count(int32_t
max_replica_count)
return;
}
- auto old_max_replica_count = _app_info.max_replica_count;
+ const auto old_max_replica_count = _app_info.max_replica_count;
_app_info.max_replica_count = max_replica_count;
- CHECK_EQ_PREFIX_MSG(store_app_info(_app_info),
+ CHECK_EQ_PREFIX_MSG(store_app_info(),
ERR_OK,
"store_app_info for max_replica_count failed:
app_name={}, "
"app_id={}, old_max_replica_count={},
new_max_replica_count={}",
@@ -1152,6 +1154,27 @@ void replica::update_app_max_replica_count(int32_t
max_replica_count)
_app_info.max_replica_count);
}
+void replica::update_app_atomic_idempotent(bool atomic_idempotent)
+{
+ // No need to check `_app_info.__isset.atomic_idempotent`, since by
default it is true
+ // (because `_app_info.atomic_idempotent` has default value false).
+ if (atomic_idempotent == _app_info.atomic_idempotent) {
+ return;
+ }
+
+ const auto old_atomic_idempotent = _app_info.atomic_idempotent;
+ _app_info.atomic_idempotent = atomic_idempotent;
+
+ CHECK_EQ_PREFIX_MSG(store_app_info(),
+ ERR_OK,
+ "store_app_info for atomic_idempotent failed:
app_name={}, "
+ "app_id={}, old_atomic_idempotent={},
new_atomic_idempotent={}",
+ _app_info.app_name,
+ _app_info.app_id,
+ old_atomic_idempotent,
+ _app_info.atomic_idempotent);
+}
+
void replica::replay_prepare_list()
{
decree start = last_committed_decree() + 1;
@@ -1190,10 +1213,10 @@ void replica::update_app_duplication_status(bool
duplicating)
return;
}
- auto old_duplicating = _app_info.duplicating;
+ const auto old_duplicating = _app_info.duplicating;
_app_info.__set_duplicating(duplicating);
- CHECK_EQ_PREFIX_MSG(store_app_info(_app_info),
+ CHECK_EQ_PREFIX_MSG(store_app_info(),
ERR_OK,
"store_app_info for duplicating failed: app_name={}, "
"app_id={}, old_duplicating={}, new_duplicating={}",
diff --git a/src/replica/replica_disk_migrator.cpp
b/src/replica/replica_disk_migrator.cpp
index 597644103..12f5e0425 100644
--- a/src/replica/replica_disk_migrator.cpp
+++ b/src/replica/replica_disk_migrator.cpp
@@ -263,9 +263,7 @@ bool replica_disk_migrator::migrate_replica_app_info(const
replica_disk_migrate_
return false;
}
- const auto &store_info_err = _replica->store_app_info(
- _replica->_app_info,
- utils::filesystem::path_combine(_target_replica_dir,
replica_app_info::kAppInfo));
+ const auto &store_info_err = _replica->store_app_info(_target_replica_dir);
if (store_info_err != ERR_OK) {
LOG_ERROR_PREFIX("disk migration(origin={}, target={}) stores app info
failed({})",
req.origin_disk,
diff --git a/src/replica/replica_init.cpp b/src/replica/replica_init.cpp
index 1bee60eda..54b65b166 100644
--- a/src/replica/replica_init.cpp
+++ b/src/replica/replica_init.cpp
@@ -50,6 +50,7 @@
#include "utils/filesystem.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
+#include "utils/ports.h"
#include "utils/uniq_timestamp_us.h"
DSN_DEFINE_bool(replication,
@@ -84,8 +85,8 @@ error_code replica::initialize_on_new()
return ERR_FILE_OPERATION_FAILED;
}
- auto err = store_app_info(_app_info);
- if (err != ERR_OK) {
+ const auto err = store_app_info();
+ if (dsn_unlikely(err != ERR_OK)) {
dsn::utils::filesystem::remove_path(_dir);
return err;
}
diff --git a/src/replica/split/replica_split_manager.cpp
b/src/replica/split/replica_split_manager.cpp
index 56f4c00e2..4afd5df97 100644
--- a/src/replica/split/replica_split_manager.cpp
+++ b/src/replica/split/replica_split_manager.cpp
@@ -871,7 +871,7 @@ void replica_split_manager::update_local_partition_count(
if (info.init_partition_count < 0) {
info.init_partition_count = info.partition_count;
}
- auto old_partition_count = info.partition_count;
+ const auto old_partition_count = info.partition_count;
info.partition_count = new_partition_count;
CHECK_EQ_PREFIX_MSG(_replica->store_app_info(info), ERR_OK, "failed to
save app_info");
diff --git a/src/replica/test/replica_test.cpp
b/src/replica/test/replica_test.cpp
index 616449b2a..d1fd9ab04 100644
--- a/src/replica/test/replica_test.cpp
+++ b/src/replica/test/replica_test.cpp
@@ -48,7 +48,6 @@
#include "replica/replica.h"
#include "replica/replica_http_service.h"
#include "replica/replica_stub.h"
-#include "replica/replication_app_base.h"
#include "replica/test/mock_utils.h"
#include "replica_test_base.h"
#include "rpc/network.sim.h"
@@ -102,15 +101,26 @@ public:
// FLAGS_cold_backup_root is set by configuration
"replication.cold_backup_root",
// which is usually the cluster_name of production clusters.
FLAGS_cold_backup_root = "test_cluster";
+
+ // Initially update the whole .app-info file with `_app_info` as the
defined format
+ // (encrypted or unencrypted).
+ ASSERT_EQ(ERR_OK, _mock_replica->store_app_info());
}
- int64_t get_backup_request_count() const { return
_mock_replica->get_backup_request_count(); }
+ [[nodiscard]] int64_t get_backup_request_count() const
+ {
+ return _mock_replica->get_backup_request_count();
+ }
- bool get_validate_partition_hash() const { return
_mock_replica->_validate_partition_hash; }
+ [[nodiscard]] bool get_validate_partition_hash() const
+ {
+ return _mock_replica->_validate_partition_hash;
+ }
- void reset_validate_partition_hash() {
_mock_replica->_validate_partition_hash = false; }
+ void reset_validate_partition_hash() const {
_mock_replica->_validate_partition_hash = false; }
- void update_validate_partition_hash(bool old_value, bool set_in_map,
std::string new_value)
+ void
+ update_validate_partition_hash(bool old_value, bool set_in_map,
std::string new_value) const
{
_mock_replica->_validate_partition_hash = old_value;
std::map<std::string, std::string> envs;
@@ -240,43 +250,51 @@ public:
return false;
}
- void test_update_app_max_replica_count()
+ // Load `info` from .app-info file to test whether it is consistent with
the one
+ // in memory.
+ void load_app_info(app_info &info) const
+ {
+ std::cout << ".app-info file is under " << _mock_replica->_dir <<
std::endl;
+ ASSERT_EQ(ERR_OK, _mock_replica->load_app_info(info));
+
+ std::cout << "The loaded app_info is " << info << std::endl;
+ ASSERT_EQ(_mock_replica->_app_info, info);
+ }
+
+ void load_app_max_replica_count(int32_t expected_max_replica_count) const
+ {
+ app_info info;
+ load_app_info(info);
+ ASSERT_EQ(expected_max_replica_count, info.max_replica_count);
+ }
+
+ // Test `max_replica_count` both on disk and in memory after updated.
+ void update_app_max_replica_count(int32_t expected_max_replica_count)
+ {
+
_mock_replica->update_app_max_replica_count(expected_max_replica_count);
+ _app_info.max_replica_count = expected_max_replica_count;
+
+ load_app_max_replica_count(expected_max_replica_count);
+ }
+
+ void load_app_atomic_idempotent(bool expected_atomic_idempotent) const
{
- const auto reserved_max_replica_count = _app_info.max_replica_count;
- const int32_t target_max_replica_count = 5;
- CHECK_NE(target_max_replica_count, reserved_max_replica_count);
-
- // store new max_replica_count into file
- _mock_replica->update_app_max_replica_count(target_max_replica_count);
- _app_info.max_replica_count = target_max_replica_count;
-
- dsn::app_info info;
- replica_app_info replica_info(&info);
-
- auto path = dsn::utils::filesystem::path_combine(
- _mock_replica->_dir, dsn::replication::replica_app_info::kAppInfo);
- std::cout << "the path of .app-info file is " << path << std::endl;
-
- // load new max_replica_count from file
- auto err = replica_info.load(path);
- ASSERT_EQ(ERR_OK, err);
- ASSERT_EQ(info, _mock_replica->_app_info);
- std::cout << "the loaded new app_info is " << info << std::endl;
-
- // recover original max_replica_count
-
_mock_replica->update_app_max_replica_count(reserved_max_replica_count);
- _app_info.max_replica_count = reserved_max_replica_count;
-
- // load original max_replica_count from file
- err = replica_info.load(path);
- ASSERT_EQ(err, ERR_OK);
- ASSERT_EQ(info, _mock_replica->_app_info);
- std::cout << "the loaded original app_info is " << info << std::endl;
+ app_info info;
+ load_app_info(info);
+ ASSERT_EQ(expected_atomic_idempotent, info.atomic_idempotent);
+ }
+
+ // Test `atomic_idempotent` both on disk and in memory after updated.
+ void update_app_atomic_idempotent(bool expected_atomic_idempotent)
+ {
+
_mock_replica->update_app_atomic_idempotent(expected_atomic_idempotent);
+ _app_info.atomic_idempotent = expected_atomic_idempotent;
+
+ load_app_atomic_idempotent(expected_atomic_idempotent);
}
void test_auto_trash(error_code ec);
-public:
dsn::app_info _app_info;
dsn::gpid _pid;
mock_replica_ptr _mock_replica;
@@ -628,7 +646,35 @@ TEST_P(replica_test, update_deny_client_test)
}
}
-TEST_P(replica_test, test_update_app_max_replica_count) {
test_update_app_max_replica_count(); }
+TEST_P(replica_test, test_update_app_max_replica_count)
+{
+ const auto original_max_replica_count = _app_info.max_replica_count;
+ const int32_t target_max_replica_count = 5;
+
+ // The new value should not be equal to the original one.
+ CHECK_NE(target_max_replica_count, original_max_replica_count);
+
+ // Test the original value of `max_replica_count`.
+ load_app_max_replica_count(original_max_replica_count);
+
+ // Test `max_replica_count` after updated to the new value.
+ update_app_max_replica_count(target_max_replica_count);
+
+ // Test `max_replica_count` after recovered to the original value.
+ update_app_max_replica_count(original_max_replica_count);
+}
+
+TEST_P(replica_test, test_update_app_atomic_idempotent)
+{
+ // Test the default value of `atomic_idempotent` which should be false.
+ load_app_atomic_idempotent(false);
+
+ // Test `atomic_idempotent` after updated to true.
+ update_app_atomic_idempotent(true);
+
+ // Test `atomic_idempotent` after recovered to the default value.
+ update_app_atomic_idempotent(false);
+}
} // namespace replication
} // namespace dsn
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]