This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new fc416e3ac feat(make_idempotent): sync `atomic_idempotent` from meta 
server into `.app-info` file for each replica (#2220)
fc416e3ac is described below

commit fc416e3acf0f1a58114c081ddc315d0703f8a9c8
Author: Dan Wang <[email protected]>
AuthorDate: Mon Apr 14 14:19:54 2025 +0800

    feat(make_idempotent): sync `atomic_idempotent` from meta server into 
`.app-info` file for each replica (#2220)
    
    https://github.com/apache/incubator-pegasus/issues/2197
    
    In https://github.com/apache/incubator-pegasus/pull/2205, we introduced a
    new attribute `atomic_idempotent` to meta, which is used to enable/disable
    the idempotence of atomic write operations.
    
    This attribute should be broadcast to each replica: whenever one replica is
    promoted to the primary, by this attribute it can decide whether to make all
    atomic writes idempotent. This attribute will be persisted into `.app-info`
    file to ensure it can be loaded after restarted.
---
 src/replica/replica.cpp                     |  38 +++++++--
 src/replica/replica.h                       |  21 ++++-
 src/replica/replica_config.cpp              |  37 +++++++--
 src/replica/replica_disk_migrator.cpp       |   4 +-
 src/replica/replica_init.cpp                |   5 +-
 src/replica/split/replica_split_manager.cpp |   2 +-
 src/replica/test/replica_test.cpp           | 122 +++++++++++++++++++---------
 7 files changed, 169 insertions(+), 60 deletions(-)

diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp
index 7e2c65f35..deaaa6fe1 100644
--- a/src/replica/replica.cpp
+++ b/src/replica/replica.cpp
@@ -708,18 +708,44 @@ uint32_t replica::query_data_version() const
     return _app->query_data_version();
 }
 
-error_code replica::store_app_info(app_info &info, const std::string &path)
+error_code replica::store_app_info(app_info &info, const std::string &dir)
 {
-    replica_app_info new_info((app_info *)&info);
-    const auto &info_path =
-        path.empty() ? utils::filesystem::path_combine(_dir, 
replica_app_info::kAppInfo) : path;
-    auto err = new_info.store(info_path);
+    const auto path = utils::filesystem::path_combine(dir, 
replica_app_info::kAppInfo);
+
+    replica_app_info rep_info(&info);
+    const auto err = rep_info.store(path);
+    if (dsn_unlikely(err != ERR_OK)) {
+        LOG_ERROR_PREFIX("failed to save app_info to {}, error = {}", path, 
err);
+    }
+
+    return err;
+}
+
+error_code replica::store_app_info(app_info &info) { return 
store_app_info(info, _dir); }
+
+error_code replica::store_app_info(const std::string &dir)
+{
+    return store_app_info(_app_info, dir);
+}
+
+error_code replica::store_app_info() { return store_app_info(_app_info, _dir); 
}
+
+error_code replica::load_app_info(const std::string &dir, app_info &info) const
+{
+    const auto path =
+        utils::filesystem::path_combine(dir, 
dsn::replication::replica_app_info::kAppInfo);
+
+    replica_app_info rep_info(&info);
+    const auto err = rep_info.load(path);
     if (dsn_unlikely(err != ERR_OK)) {
-        LOG_ERROR_PREFIX("failed to save app_info to {}, error = {}", 
info_path, err);
+        LOG_ERROR_PREFIX("failed to load app_info from {}, error = {}", path, 
err);
     }
+
     return err;
 }
 
+error_code replica::load_app_info(app_info &info) const { return 
load_app_info(_dir, info); }
+
 bool replica::access_controller_allowed(message_ex *msg, const 
ranger::access_type &ac_type) const
 {
     return !_access_controller->is_enable_ranger_acl() || 
_access_controller->allowed(msg, ac_type);
diff --git a/src/replica/replica.h b/src/replica/replica.h
index 0632d2e08..fb199bf44 100644
--- a/src/replica/replica.h
+++ b/src/replica/replica.h
@@ -673,11 +673,26 @@ private:
     // update envs to deny client request
     void update_deny_client(const std::map<std::string, std::string> &envs);
 
-    // store `info` into a file under `path` directory
-    // path = "" means using the default directory (`_dir`/.app_info)
-    error_code store_app_info(app_info &info, const std::string &path = "");
+    // Write the specified `info` into .app_info file under the specified 
`dir` directory.
+    error_code store_app_info(app_info &info, const std::string &dir);
+
+    // Write the specified `info` into .app_info file under `_dir` directory.
+    error_code store_app_info(app_info &info);
+
+    // Write `_app_info` into .app_info file under the specified `dir` 
directory.
+    error_code store_app_info(const std::string &dir);
+
+    // Write `_app_info` into .app_info file under `_dir` directory.
+    error_code store_app_info();
+
+    // Load `info` from .app_info file under the specified `dir` directory.
+    error_code load_app_info(const std::string &dir, app_info &info) const;
+
+    // Load `info` from .app_info file under `_dir` directory.
+    error_code load_app_info(app_info &info) const;
 
     void update_app_max_replica_count(int32_t max_replica_count);
+    void update_app_atomic_idempotent(bool atomic_idempotent);
     void update_app_name(const std::string &app_name);
 
     bool is_data_corrupted() const { return _data_corrupted; }
diff --git a/src/replica/replica_config.cpp b/src/replica/replica_config.cpp
index a9cae880c..6a4080f66 100644
--- a/src/replica/replica_config.cpp
+++ b/src/replica/replica_config.cpp
@@ -1072,10 +1072,12 @@ void replica::on_config_sync(const app_info &info,
 {
     LOG_DEBUG_PREFIX("configuration sync");
     // no outdated update
-    if (pc.ballot < get_ballot())
+    if (pc.ballot < get_ballot()) {
         return;
+    }
 
     update_app_max_replica_count(info.max_replica_count);
+    update_app_atomic_idempotent(info.atomic_idempotent);
     update_app_name(info.app_name);
     update_app_envs(info.envs);
     _is_duplication_master = info.duplicating;
@@ -1121,10 +1123,10 @@ void replica::update_app_name(const std::string 
&app_name)
         return;
     }
 
-    auto old_app_name = _app_info.app_name;
+    const auto old_app_name = _app_info.app_name;
     _app_info.app_name = app_name;
 
-    CHECK_EQ_PREFIX_MSG(store_app_info(_app_info),
+    CHECK_EQ_PREFIX_MSG(store_app_info(),
                         ERR_OK,
                         "store_app_info for app_name failed: "
                         "app_name={}, app_id={}, old_app_name={}",
@@ -1139,10 +1141,10 @@ void replica::update_app_max_replica_count(int32_t 
max_replica_count)
         return;
     }
 
-    auto old_max_replica_count = _app_info.max_replica_count;
+    const auto old_max_replica_count = _app_info.max_replica_count;
     _app_info.max_replica_count = max_replica_count;
 
-    CHECK_EQ_PREFIX_MSG(store_app_info(_app_info),
+    CHECK_EQ_PREFIX_MSG(store_app_info(),
                         ERR_OK,
                         "store_app_info for max_replica_count failed: 
app_name={}, "
                         "app_id={}, old_max_replica_count={}, 
new_max_replica_count={}",
@@ -1152,6 +1154,27 @@ void replica::update_app_max_replica_count(int32_t 
max_replica_count)
                         _app_info.max_replica_count);
 }
 
+void replica::update_app_atomic_idempotent(bool atomic_idempotent)
+{
+    // No need to check `_app_info.__isset.atomic_idempotent`, since by 
default it is true
+    // (because `_app_info.atomic_idempotent` has default value false).
+    if (atomic_idempotent == _app_info.atomic_idempotent) {
+        return;
+    }
+
+    const auto old_atomic_idempotent = _app_info.atomic_idempotent;
+    _app_info.atomic_idempotent = atomic_idempotent;
+
+    CHECK_EQ_PREFIX_MSG(store_app_info(),
+                        ERR_OK,
+                        "store_app_info for atomic_idempotent failed: 
app_name={}, "
+                        "app_id={}, old_atomic_idempotent={}, 
new_atomic_idempotent={}",
+                        _app_info.app_name,
+                        _app_info.app_id,
+                        old_atomic_idempotent,
+                        _app_info.atomic_idempotent);
+}
+
 void replica::replay_prepare_list()
 {
     decree start = last_committed_decree() + 1;
@@ -1190,10 +1213,10 @@ void replica::update_app_duplication_status(bool 
duplicating)
         return;
     }
 
-    auto old_duplicating = _app_info.duplicating;
+    const auto old_duplicating = _app_info.duplicating;
     _app_info.__set_duplicating(duplicating);
 
-    CHECK_EQ_PREFIX_MSG(store_app_info(_app_info),
+    CHECK_EQ_PREFIX_MSG(store_app_info(),
                         ERR_OK,
                         "store_app_info for duplicating failed: app_name={}, "
                         "app_id={}, old_duplicating={}, new_duplicating={}",
diff --git a/src/replica/replica_disk_migrator.cpp 
b/src/replica/replica_disk_migrator.cpp
index 597644103..12f5e0425 100644
--- a/src/replica/replica_disk_migrator.cpp
+++ b/src/replica/replica_disk_migrator.cpp
@@ -263,9 +263,7 @@ bool replica_disk_migrator::migrate_replica_app_info(const 
replica_disk_migrate_
         return false;
     }
 
-    const auto &store_info_err = _replica->store_app_info(
-        _replica->_app_info,
-        utils::filesystem::path_combine(_target_replica_dir, 
replica_app_info::kAppInfo));
+    const auto &store_info_err = _replica->store_app_info(_target_replica_dir);
     if (store_info_err != ERR_OK) {
         LOG_ERROR_PREFIX("disk migration(origin={}, target={}) stores app info 
failed({})",
                          req.origin_disk,
diff --git a/src/replica/replica_init.cpp b/src/replica/replica_init.cpp
index 1bee60eda..54b65b166 100644
--- a/src/replica/replica_init.cpp
+++ b/src/replica/replica_init.cpp
@@ -50,6 +50,7 @@
 #include "utils/filesystem.h"
 #include "utils/flags.h"
 #include "utils/fmt_logging.h"
+#include "utils/ports.h"
 #include "utils/uniq_timestamp_us.h"
 
 DSN_DEFINE_bool(replication,
@@ -84,8 +85,8 @@ error_code replica::initialize_on_new()
         return ERR_FILE_OPERATION_FAILED;
     }
 
-    auto err = store_app_info(_app_info);
-    if (err != ERR_OK) {
+    const auto err = store_app_info();
+    if (dsn_unlikely(err != ERR_OK)) {
         dsn::utils::filesystem::remove_path(_dir);
         return err;
     }
diff --git a/src/replica/split/replica_split_manager.cpp 
b/src/replica/split/replica_split_manager.cpp
index 56f4c00e2..4afd5df97 100644
--- a/src/replica/split/replica_split_manager.cpp
+++ b/src/replica/split/replica_split_manager.cpp
@@ -871,7 +871,7 @@ void replica_split_manager::update_local_partition_count(
     if (info.init_partition_count < 0) {
         info.init_partition_count = info.partition_count;
     }
-    auto old_partition_count = info.partition_count;
+    const auto old_partition_count = info.partition_count;
     info.partition_count = new_partition_count;
 
     CHECK_EQ_PREFIX_MSG(_replica->store_app_info(info), ERR_OK, "failed to 
save app_info");
diff --git a/src/replica/test/replica_test.cpp 
b/src/replica/test/replica_test.cpp
index 616449b2a..d1fd9ab04 100644
--- a/src/replica/test/replica_test.cpp
+++ b/src/replica/test/replica_test.cpp
@@ -48,7 +48,6 @@
 #include "replica/replica.h"
 #include "replica/replica_http_service.h"
 #include "replica/replica_stub.h"
-#include "replica/replication_app_base.h"
 #include "replica/test/mock_utils.h"
 #include "replica_test_base.h"
 #include "rpc/network.sim.h"
@@ -102,15 +101,26 @@ public:
         // FLAGS_cold_backup_root is set by configuration 
"replication.cold_backup_root",
         // which is usually the cluster_name of production clusters.
         FLAGS_cold_backup_root = "test_cluster";
+
+        // Initially update the whole .app-info file with `_app_info` as the 
defined format
+        // (encrypted or unencrypted).
+        ASSERT_EQ(ERR_OK, _mock_replica->store_app_info());
     }
 
-    int64_t get_backup_request_count() const { return 
_mock_replica->get_backup_request_count(); }
+    [[nodiscard]] int64_t get_backup_request_count() const
+    {
+        return _mock_replica->get_backup_request_count();
+    }
 
-    bool get_validate_partition_hash() const { return 
_mock_replica->_validate_partition_hash; }
+    [[nodiscard]] bool get_validate_partition_hash() const
+    {
+        return _mock_replica->_validate_partition_hash;
+    }
 
-    void reset_validate_partition_hash() { 
_mock_replica->_validate_partition_hash = false; }
+    void reset_validate_partition_hash() const { 
_mock_replica->_validate_partition_hash = false; }
 
-    void update_validate_partition_hash(bool old_value, bool set_in_map, 
std::string new_value)
+    void
+    update_validate_partition_hash(bool old_value, bool set_in_map, 
std::string new_value) const
     {
         _mock_replica->_validate_partition_hash = old_value;
         std::map<std::string, std::string> envs;
@@ -240,43 +250,51 @@ public:
         return false;
     }
 
-    void test_update_app_max_replica_count()
+    // Load `info` from .app-info file to test whether it is consistent with 
the one
+    // in memory.
+    void load_app_info(app_info &info) const
+    {
+        std::cout << ".app-info file is under " << _mock_replica->_dir << 
std::endl;
+        ASSERT_EQ(ERR_OK, _mock_replica->load_app_info(info));
+
+        std::cout << "The loaded app_info is " << info << std::endl;
+        ASSERT_EQ(_mock_replica->_app_info, info);
+    }
+
+    void load_app_max_replica_count(int32_t expected_max_replica_count) const
+    {
+        app_info info;
+        load_app_info(info);
+        ASSERT_EQ(expected_max_replica_count, info.max_replica_count);
+    }
+
+    // Test `max_replica_count` both on disk and in memory after updated.
+    void update_app_max_replica_count(int32_t expected_max_replica_count)
+    {
+        
_mock_replica->update_app_max_replica_count(expected_max_replica_count);
+        _app_info.max_replica_count = expected_max_replica_count;
+
+        load_app_max_replica_count(expected_max_replica_count);
+    }
+
+    void load_app_atomic_idempotent(bool expected_atomic_idempotent) const
     {
-        const auto reserved_max_replica_count = _app_info.max_replica_count;
-        const int32_t target_max_replica_count = 5;
-        CHECK_NE(target_max_replica_count, reserved_max_replica_count);
-
-        // store new max_replica_count into file
-        _mock_replica->update_app_max_replica_count(target_max_replica_count);
-        _app_info.max_replica_count = target_max_replica_count;
-
-        dsn::app_info info;
-        replica_app_info replica_info(&info);
-
-        auto path = dsn::utils::filesystem::path_combine(
-            _mock_replica->_dir, dsn::replication::replica_app_info::kAppInfo);
-        std::cout << "the path of .app-info file is " << path << std::endl;
-
-        // load new max_replica_count from file
-        auto err = replica_info.load(path);
-        ASSERT_EQ(ERR_OK, err);
-        ASSERT_EQ(info, _mock_replica->_app_info);
-        std::cout << "the loaded new app_info is " << info << std::endl;
-
-        // recover original max_replica_count
-        
_mock_replica->update_app_max_replica_count(reserved_max_replica_count);
-        _app_info.max_replica_count = reserved_max_replica_count;
-
-        // load original max_replica_count from file
-        err = replica_info.load(path);
-        ASSERT_EQ(err, ERR_OK);
-        ASSERT_EQ(info, _mock_replica->_app_info);
-        std::cout << "the loaded original app_info is " << info << std::endl;
+        app_info info;
+        load_app_info(info);
+        ASSERT_EQ(expected_atomic_idempotent, info.atomic_idempotent);
+    }
+
+    // Test `atomic_idempotent` both on disk and in memory after updated.
+    void update_app_atomic_idempotent(bool expected_atomic_idempotent)
+    {
+        
_mock_replica->update_app_atomic_idempotent(expected_atomic_idempotent);
+        _app_info.atomic_idempotent = expected_atomic_idempotent;
+
+        load_app_atomic_idempotent(expected_atomic_idempotent);
     }
 
     void test_auto_trash(error_code ec);
 
-public:
     dsn::app_info _app_info;
     dsn::gpid _pid;
     mock_replica_ptr _mock_replica;
@@ -628,7 +646,35 @@ TEST_P(replica_test, update_deny_client_test)
     }
 }
 
-TEST_P(replica_test, test_update_app_max_replica_count) { 
test_update_app_max_replica_count(); }
+TEST_P(replica_test, test_update_app_max_replica_count)
+{
+    const auto original_max_replica_count = _app_info.max_replica_count;
+    const int32_t target_max_replica_count = 5;
+
+    // The new value should not be equal to the original one.
+    CHECK_NE(target_max_replica_count, original_max_replica_count);
+
+    // Test the original value of `max_replica_count`.
+    load_app_max_replica_count(original_max_replica_count);
+
+    // Test `max_replica_count` after updated to the new value.
+    update_app_max_replica_count(target_max_replica_count);
+
+    // Test `max_replica_count` after recovered to the original value.
+    update_app_max_replica_count(original_max_replica_count);
+}
+
+TEST_P(replica_test, test_update_app_atomic_idempotent)
+{
+    // Test the default value of `atomic_idempotent` which should be false.
+    load_app_atomic_idempotent(false);
+
+    // Test `atomic_idempotent` after updated to true.
+    update_app_atomic_idempotent(true);
+
+    // Test `atomic_idempotent` after recovered to the default value.
+    update_app_atomic_idempotent(false);
+}
 
 } // namespace replication
 } // namespace dsn


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to