This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new d9f2600ec feat(duplication): collect last committed decrees from 
primary replicas to meta server of the master cluster for duplication (#2159)
d9f2600ec is described below

commit d9f2600ec9cb6812d567ffb89539e5d03467f3e6
Author: Dan Wang <[email protected]>
AuthorDate: Mon Dec 2 18:35:04 2024 +0800

    feat(duplication): collect last committed decrees from primary replicas to 
meta server of the master cluster for duplication (#2159)
    
    While a table is being migrated to another cluster by duplication, we want 
to check
    if the migration is finished by decree. Since we already have confirmed 
decree for
    the log entries duplicated to the follower table, we need to collect last 
committed
    decrees from the primary replicas to the meta server of the master cluster. 
We would
    compare both kinds of decrees to check whether the migration is finished.
    
    Following configurations are newly added:
    ```diff
    [replication]
    + dup_progress_min_update_period_ms = 5000
    + dup_progress_min_report_period_ms = 300000
    ```
---
 idl/duplication.thrift                             |   4 +
 src/meta/duplication/duplication_info.cpp          |  80 ++++++++++++----
 src/meta/duplication/duplication_info.h            |   8 +-
 src/meta/duplication/meta_duplication_service.cpp  |  38 ++++----
 src/meta/meta_state_service_utils.h                |  20 +++-
 src/meta/server_state.cpp                          |   9 +-
 src/meta/test/duplication_info_test.cpp            | 103 ++++++++++++++++-----
 src/meta/test/meta_duplication_service_test.cpp    |  29 +++---
 .../duplication/replica_duplicator_manager.cpp     |   1 +
 9 files changed, 214 insertions(+), 78 deletions(-)

diff --git a/idl/duplication.thrift b/idl/duplication.thrift
index 140361ca9..589cc5d08 100644
--- a/idl/duplication.thrift
+++ b/idl/duplication.thrift
@@ -172,6 +172,10 @@ struct duplication_confirm_entry
     1:i32       dupid;
     2:i64       confirmed_decree;
     3:optional bool checkpoint_prepared = false;
+
+    // Last committed decree from the primary replica of each partition, 
collected by
+    // meta server and used to be compared with duplicating progress of 
follower table.
+    4:optional i64 last_committed_decree;
 }
 
 // This is an internal RPC sent from replica server to meta.
diff --git a/src/meta/duplication/duplication_info.cpp 
b/src/meta/duplication/duplication_info.cpp
index 7f7bb62b6..8f609c7b2 100644
--- a/src/meta/duplication/duplication_info.cpp
+++ b/src/meta/duplication/duplication_info.cpp
@@ -20,10 +20,20 @@
 #include "common/duplication_common.h"
 #include "meta/meta_data.h"
 #include "runtime/api_layer1.h"
+#include "utils/flags.h"
 #include "utils/fmt_logging.h"
 
-namespace dsn {
-namespace replication {
+DSN_DEFINE_uint64(replication,
+                  dup_progress_min_update_period_ms,
+                  5000,
+                  "The minimum period in milliseconds that progress of 
duplication is updated");
+
+DSN_DEFINE_uint64(replication,
+                  dup_progress_min_report_period_ms,
+                  5ULL * 60 * 1000,
+                  "The minimum period in milliseconds that progress of 
duplication is reported");
+
+namespace dsn::replication {
 
 /*extern*/ void json_encode(dsn::json::JsonWriter &out, const 
duplication_status::type &s)
 {
@@ -116,8 +126,13 @@ void duplication_info::init_progress(int partition_index, 
decree d)
     zauto_write_lock l(_lock);
 
     auto &p = _progress[partition_index];
+
+    p.last_committed_decree = invalid_decree;
     p.volatile_decree = p.stored_decree = d;
+    p.is_altering = false;
+    p.last_progress_update_ms = 0;
     p.is_inited = true;
+    p.checkpoint_prepared = false;
 }
 
 bool duplication_info::alter_progress(int partition_index,
@@ -126,9 +141,18 @@ bool duplication_info::alter_progress(int partition_index,
     zauto_write_lock l(_lock);
 
     partition_progress &p = _progress[partition_index];
+
+    // last_committed_decree could be updated at any time no matter whether 
progress is
+    // initialized or busy updating, since it is not persisted to remote meta 
storage.
+    // It is just collected from the primary replica of each partition.
+    if (confirm_entry.__isset.last_committed_decree) {
+        p.last_committed_decree = confirm_entry.last_committed_decree;
+    }
+
     if (!p.is_inited) {
         return false;
     }
+
     if (p.is_altering) {
         return false;
     }
@@ -137,15 +161,19 @@ bool duplication_info::alter_progress(int partition_index,
     if (p.volatile_decree < confirm_entry.confirmed_decree) {
         p.volatile_decree = confirm_entry.confirmed_decree;
     }
-    if (p.volatile_decree != p.stored_decree) {
-        // progress update is not supposed to be too frequent.
-        if (dsn_now_ms() > p.last_progress_update_ms + 
PROGRESS_UPDATE_PERIOD_MS) {
-            p.is_altering = true;
-            p.last_progress_update_ms = dsn_now_ms();
-            return true;
-        }
+
+    if (p.volatile_decree == p.stored_decree) {
+        return false;
+    }
+
+    // Progress update is not supposed to be too frequent.
+    if (dsn_now_ms() < p.last_progress_update_ms + 
FLAGS_dup_progress_min_update_period_ms) {
+        return false;
     }
-    return false;
+
+    p.is_altering = true;
+    p.last_progress_update_ms = dsn_now_ms();
+    return true;
 }
 
 void duplication_info::persist_progress(int partition_index)
@@ -163,13 +191,26 @@ void duplication_info::persist_status()
     zauto_write_lock l(_lock);
 
     if (!_is_altering) {
-        LOG_ERROR_PREFIX("callers never write a duplication that is not 
altering to meta store");
+        LOG_ERROR_PREFIX("the status of this duplication is not being altered: 
status={}, "
+                         "next_status={}, master_app_id={}, 
master_app_name={}, "
+                         "follower_cluster_name={}, follower_app_name={}",
+                         duplication_status_to_string(_status),
+                         duplication_status_to_string(_next_status),
+                         app_id,
+                         app_name,
+                         remote_cluster_name,
+                         remote_app_name);
         return;
     }
-    LOG_INFO_PREFIX("change duplication status from {} to {} successfully 
[app_id: {}]",
+
+    LOG_INFO_PREFIX("change duplication status from {} to {} successfully: 
master_app_id={}, "
+                    "master_app_name={}, follower_cluster_name={}, 
follower_app_name={}",
                     duplication_status_to_string(_status),
                     duplication_status_to_string(_next_status),
-                    app_id);
+                    app_id,
+                    app_name,
+                    remote_cluster_name,
+                    remote_app_name);
 
     _is_altering = false;
     _status = _next_status;
@@ -197,11 +238,13 @@ blob duplication_info::to_json_blob() const
 
 void duplication_info::report_progress_if_time_up()
 {
-    // progress report is not supposed to be too frequent.
-    if (dsn_now_ms() > _last_progress_report_ms + PROGRESS_REPORT_PERIOD_MS) {
-        _last_progress_report_ms = dsn_now_ms();
-        LOG_INFO("duplication report: {}", to_string());
+    // Progress report is not supposed to be too frequent.
+    if (dsn_now_ms() < _last_progress_report_ms + 
FLAGS_dup_progress_min_report_period_ms) {
+        return;
     }
+
+    _last_progress_report_ms = dsn_now_ms();
+    LOG_INFO("duplication report: {}", to_string());
 }
 
 duplication_info_s_ptr duplication_info::decode_from_blob(dupid_t dup_id,
@@ -263,5 +306,4 @@ void duplication_info::append_if_valid_for_query(
     ent.__isset.progress = false;
 }
 
-} // namespace replication
-} // namespace dsn
+} // namespace dsn::replication
diff --git a/src/meta/duplication/duplication_info.h 
b/src/meta/duplication/duplication_info.h
index e1ddcacf3..7563d3d41 100644
--- a/src/meta/duplication/duplication_info.h
+++ b/src/meta/duplication/duplication_info.h
@@ -216,13 +216,15 @@ private:
 
     mutable zrwlock_nr _lock;
 
-    static constexpr int PROGRESS_UPDATE_PERIOD_MS = 5000;          // 5s
-    static constexpr int PROGRESS_REPORT_PERIOD_MS = 1000 * 60 * 5; // 5min
-
     struct partition_progress
     {
+        // Last committed decree collected from the primary replica of each 
partition.
+        // Not persisted to remote meta storage.
+        int64_t last_committed_decree{invalid_decree};
+
         int64_t volatile_decree{invalid_decree};
         int64_t stored_decree{invalid_decree};
+
         bool is_altering{false};
         uint64_t last_progress_update_ms{0};
         bool is_inited{false};
diff --git a/src/meta/duplication/meta_duplication_service.cpp 
b/src/meta/duplication/meta_duplication_service.cpp
index c557c890a..f63d8af35 100644
--- a/src/meta/duplication/meta_duplication_service.cpp
+++ b/src/meta/duplication/meta_duplication_service.cpp
@@ -791,31 +791,36 @@ void 
meta_duplication_service::do_update_partition_confirmed(
     int32_t partition_idx,
     const duplication_confirm_entry &confirm_entry)
 {
-    if (dup->alter_progress(partition_idx, confirm_entry)) {
-        std::string path = get_partition_path(dup, 
std::to_string(partition_idx));
-        blob value = 
blob::create_from_bytes(std::to_string(confirm_entry.confirmed_decree));
+    if (!dup->alter_progress(partition_idx, confirm_entry)) {
+        return;
+    }
+
+    const auto &path = get_partition_path(dup, std::to_string(partition_idx));
+
+    _meta_svc->get_meta_storage()->get_data(
+        path, [dup, rpc, partition_idx, confirm_entry, path, this](const blob 
&data) mutable {
+            auto value = 
blob::create_from_bytes(std::to_string(confirm_entry.confirmed_decree));
 
-        _meta_svc->get_meta_storage()->get_data(std::string(path), [=](const 
blob &data) mutable {
-            if (data.length() == 0) {
+            if (data.empty()) {
                 _meta_svc->get_meta_storage()->create_node(
-                    std::string(path), std::move(value), [=]() mutable {
-                        dup->persist_progress(partition_idx);
-                        
rpc.response().dup_map[dup->app_id][dup->id].progress[partition_idx] =
-                            confirm_entry.confirmed_decree;
-                    });
-            } else {
-                _meta_svc->get_meta_storage()->set_data(
-                    std::string(path), std::move(value), [=]() mutable {
+                    path, std::move(value), [dup, rpc, partition_idx, 
confirm_entry]() mutable {
                         dup->persist_progress(partition_idx);
                         
rpc.response().dup_map[dup->app_id][dup->id].progress[partition_idx] =
                             confirm_entry.confirmed_decree;
                     });
+                return;
             }
 
+            _meta_svc->get_meta_storage()->set_data(
+                path, std::move(value), [dup, rpc, partition_idx, 
confirm_entry]() mutable {
+                    dup->persist_progress(partition_idx);
+                    
rpc.response().dup_map[dup->app_id][dup->id].progress[partition_idx] =
+                        confirm_entry.confirmed_decree;
+                });
+
             // duplication_sync_rpc will finally be replied when confirmed 
points
             // of all partitions are stored.
         });
-    }
 }
 
 std::shared_ptr<duplication_info>
@@ -908,7 +913,7 @@ void 
meta_duplication_service::do_restore_duplication_progress(
             std::move(partition_path), [dup, partition_idx](const blob &value) 
{
                 // value is confirmed_decree encoded in string.
 
-                if (value.size() == 0) {
+                if (value.empty()) {
                     // not found
                     dup->init_progress(partition_idx, invalid_decree);
                     return;
@@ -953,10 +958,11 @@ void 
meta_duplication_service::do_restore_duplication(dupid_t dup_id,
                                                           
app->max_replica_count,
                                                           store_path,
                                                           json);
-            if (nullptr == dup) {
+            if (!dup) {
                 LOG_ERROR("failed to decode json \"{}\" on path {}", json, 
store_path);
                 return; // fail fast
             }
+
             if (!dup->is_invalid_status()) {
                 app->duplications[dup->id] = dup;
                 refresh_duplicating_no_lock(app);
diff --git a/src/meta/meta_state_service_utils.h 
b/src/meta/meta_state_service_utils.h
index 41099e898..04b8086df 100644
--- a/src/meta/meta_state_service_utils.h
+++ b/src/meta/meta_state_service_utils.h
@@ -20,11 +20,14 @@
 #include <functional>
 #include <queue>
 #include <string>
+#include <utility>
 #include <vector>
 
+#include "utils/blob.h"
+
 namespace dsn {
-class blob;
 class task_tracker;
+
 namespace dist {
 class meta_state_service;
 } // namespace dist
@@ -57,6 +60,11 @@ struct meta_storage
 
     void create_node(std::string &&node, blob &&value, std::function<void()> 
&&cb);
 
+    void create_node(const std::string &node, blob &&value, 
std::function<void()> &&cb)
+    {
+        create_node(std::string(node), std::move(value), std::move(cb));
+    }
+
     void delete_node_recursively(std::string &&node, std::function<void()> 
&&cb);
 
     void delete_node(std::string &&node, std::function<void()> &&cb);
@@ -64,9 +72,19 @@ struct meta_storage
     /// Will fatal if node doesn't exists.
     void set_data(std::string &&node, blob &&value, std::function<void()> 
&&cb);
 
+    void set_data(const std::string &node, blob &&value, std::function<void()> 
&&cb)
+    {
+        set_data(std::string(node), std::move(value), std::move(cb));
+    }
+
     /// If node does not exist, cb will receive an empty blob.
     void get_data(std::string &&node, std::function<void(const blob &)> &&cb);
 
+    void get_data(const std::string &node, std::function<void(const blob &)> 
&&cb)
+    {
+        get_data(std::string(node), std::move(cb));
+    }
+
     /// \param cb: void (bool node_exists, const std::vector<std::string> 
&children)
     ///            `children` contains the name (not full path) of children 
nodes.
     ///            `node_exists` indicates whether this node exists.
diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp
index fe1820688..8e68113f7 100644
--- a/src/meta/server_state.cpp
+++ b/src/meta/server_state.cpp
@@ -1128,9 +1128,10 @@ void server_state::create_app(dsn::message_ex *msg)
              request.options.partition_count,
              request.options.replica_count,
              duplicating
-                 ? fmt::format("{}.{}",
-                               
request.options.envs[duplication_constants::kEnvMasterClusterKey],
-                               request.app_name)
+                 ? fmt::format("master_cluster_name={}, master_app_name={}",
+                               master_cluster->second,
+                               gutil::FindWithDefault(request.options.envs,
+                                                      
duplication_constants::kEnvMasterAppNameKey))
                  : "false");
 
     auto option_match_check = [](const create_app_options &opt, const 
app_state &exist_app) {
@@ -1162,7 +1163,7 @@ void server_state::create_app(dsn::message_ex *msg)
     zauto_write_lock l(_lock);
 
     auto app = get_app(request.app_name);
-    if (nullptr != app) {
+    if (app) {
         configuration_create_app_response response;
 
         switch (app->status) {
diff --git a/src/meta/test/duplication_info_test.cpp 
b/src/meta/test/duplication_info_test.cpp
index 5cfa4e12b..72bbcdbb1 100644
--- a/src/meta/test/duplication_info_test.cpp
+++ b/src/meta/test/duplication_info_test.cpp
@@ -30,9 +30,12 @@
 
 #include "gtest/gtest.h"
 #include "runtime/app_model.h"
+#include "test_util/test_util.h"
+#include "utils/flags.h"
 
-namespace dsn {
-namespace replication {
+DSN_DECLARE_uint64(dup_progress_min_update_period_ms);
+
+namespace dsn::replication {
 
 class duplication_info_test : public testing::Test
 {
@@ -48,9 +51,22 @@ public:
         dup._status = status;
     }
 
-    static void test_alter_progress()
+    static void test_init_progress(duplication_info &dup, int partition_idx, 
decree expected_decree)
     {
+        dup.init_progress(partition_idx, expected_decree);
+
+        const auto &progress = dup._progress[partition_idx];
+        ASSERT_EQ(invalid_decree, progress.last_committed_decree);
+        ASSERT_EQ(expected_decree, progress.volatile_decree);
+        ASSERT_EQ(expected_decree, progress.stored_decree);
+        ASSERT_FALSE(progress.is_altering);
+        ASSERT_EQ(0, progress.last_progress_update_ms);
+        ASSERT_TRUE(progress.is_inited);
+        ASSERT_FALSE(progress.checkpoint_prepared);
+    }
 
+    static void test_alter_progress()
+    {
         duplication_info dup(1,
                              1,
                              kTestAppName,
@@ -61,46 +77,91 @@ public:
                              kTestRemoteAppName,
                              std::vector<host_port>(),
                              kTestMetaStorePath);
-        duplication_confirm_entry entry;
-        ASSERT_FALSE(dup.alter_progress(0, entry));
 
-        dup.init_progress(0, invalid_decree);
+        // Failed to alter progress for partition 0 since it has not been 
initialized.
+        ASSERT_FALSE(dup.alter_progress(0, duplication_confirm_entry()));
+
+        // Initialize progress for partition 0.
+        test_init_progress(dup, 0, invalid_decree);
+
+        // Alter progress with specified decrees for partition 0.
+        duplication_confirm_entry entry;
+        entry.__set_last_committed_decree(8);
         entry.confirmed_decree = 5;
         entry.checkpoint_prepared = true;
         ASSERT_TRUE(dup.alter_progress(0, entry));
-        ASSERT_EQ(dup._progress[0].volatile_decree, 5);
+
+        ASSERT_EQ(8, dup._progress[0].last_committed_decree);
+        ASSERT_EQ(5, dup._progress[0].volatile_decree);
+        ASSERT_EQ(invalid_decree, dup._progress[0].stored_decree);
         ASSERT_TRUE(dup._progress[0].is_altering);
         ASSERT_TRUE(dup._progress[0].checkpoint_prepared);
 
-        // busy updating
+        // Busy updating.
+        entry.__set_last_committed_decree(15);
         entry.confirmed_decree = 10;
         entry.checkpoint_prepared = false;
         ASSERT_FALSE(dup.alter_progress(0, entry));
-        ASSERT_EQ(dup._progress[0].volatile_decree, 5);
+
+        // last_committed_decree could be updated at any time.
+        ASSERT_EQ(15, dup._progress[0].last_committed_decree);
+        ASSERT_EQ(5, dup._progress[0].volatile_decree);
+        ASSERT_EQ(invalid_decree, dup._progress[0].stored_decree);
         ASSERT_TRUE(dup._progress[0].is_altering);
         ASSERT_TRUE(dup._progress[0].checkpoint_prepared);
 
+        // Persist progress for partition 0.
         dup.persist_progress(0);
-        ASSERT_EQ(dup._progress[0].stored_decree, 5);
+
+        ASSERT_EQ(15, dup._progress[0].last_committed_decree);
+        ASSERT_EQ(5, dup._progress[0].volatile_decree);
+        ASSERT_EQ(5, dup._progress[0].stored_decree);
         ASSERT_FALSE(dup._progress[0].is_altering);
         ASSERT_TRUE(dup._progress[0].checkpoint_prepared);
 
-        // too frequent to update
-        dup.init_progress(1, invalid_decree);
+        // Initialize progress for partition 1.
+        test_init_progress(dup, 1, 5);
+
+        // Alter progress for partition 1.
         ASSERT_TRUE(dup.alter_progress(1, entry));
+
+        ASSERT_EQ(15, dup._progress[1].last_committed_decree);
+        ASSERT_EQ(10, dup._progress[1].volatile_decree);
+        ASSERT_EQ(5, dup._progress[1].stored_decree);
         ASSERT_TRUE(dup._progress[1].is_altering);
+        ASSERT_FALSE(dup._progress[1].checkpoint_prepared);
+
+        // Persist progress for partition 1.
         dup.persist_progress(1);
 
+        // It is too frequent to alter progress.
+        PRESERVE_FLAG(dup_progress_min_update_period_ms);
+        FLAGS_dup_progress_min_update_period_ms = 10000;
+        entry.__set_last_committed_decree(25);
+        entry.confirmed_decree = 15;
+        entry.checkpoint_prepared = true;
         ASSERT_FALSE(dup.alter_progress(1, entry));
+        ASSERT_EQ(25, dup._progress[1].last_committed_decree);
+        // volatile_decree would be updated successfully even if it is too 
frequent.
+        ASSERT_EQ(15, dup._progress[1].volatile_decree);
+        ASSERT_EQ(10, dup._progress[1].stored_decree);
         ASSERT_FALSE(dup._progress[1].is_altering);
+        // checkpoint_prepared would be updated successfully even if it is too 
frequent.
+        ASSERT_TRUE(dup._progress[1].checkpoint_prepared);
 
-        dup._progress[1].last_progress_update_ms -=
-            duplication_info::PROGRESS_UPDATE_PERIOD_MS + 100;
+        // Reduce last update timestamp to make it infrequent.
+        dup._progress[1].last_progress_update_ms -= 
FLAGS_dup_progress_min_update_period_ms + 100;
+        entry.__set_last_committed_decree(26);
+        entry.confirmed_decree = 25;
 
-        entry.confirmed_decree = 15;
-        entry.checkpoint_prepared = true;
         ASSERT_TRUE(dup.alter_progress(1, entry));
+        ASSERT_EQ(26, dup._progress[1].last_committed_decree);
+        ASSERT_EQ(25, dup._progress[1].volatile_decree);
+        ASSERT_EQ(10, dup._progress[1].stored_decree);
         ASSERT_TRUE(dup._progress[1].is_altering);
+        ASSERT_TRUE(dup._progress[1].checkpoint_prepared);
+
+        // Checkpoints are ready for both partitions 0 and 1.
         ASSERT_TRUE(dup.all_checkpoint_has_prepared());
     }
 
@@ -128,8 +189,9 @@ public:
         for (int i = 0; i < 4; i++) {
             dup.init_progress(i, invalid_decree);
         }
+
         for (auto kv : dup_ent.progress) {
-            ASSERT_EQ(kv.second, invalid_decree);
+            ASSERT_EQ(invalid_decree, kv.second);
         }
 
         dup.start();
@@ -153,8 +215,8 @@ public:
         dup.start();
 
         dup.persist_status();
-        ASSERT_EQ(dup._status, duplication_status::DS_PREPARE);
-        ASSERT_EQ(dup._next_status, duplication_status::DS_INIT);
+        ASSERT_EQ(duplication_status::DS_PREPARE, dup._status);
+        ASSERT_EQ(duplication_status::DS_INIT, dup._next_status);
         ASSERT_FALSE(dup.is_altering());
     }
 
@@ -358,5 +420,4 @@ TEST_F(duplication_info_test, is_valid)
     ASSERT_TRUE(dup.is_invalid_status());
 }
 
-} // namespace replication
-} // namespace dsn
+} // namespace dsn::replication
diff --git a/src/meta/test/meta_duplication_service_test.cpp 
b/src/meta/test/meta_duplication_service_test.cpp
index a8a6668f9..ec65a41fa 100644
--- a/src/meta/test/meta_duplication_service_test.cpp
+++ b/src/meta/test/meta_duplication_service_test.cpp
@@ -677,11 +677,11 @@ TEST_F(meta_duplication_service_test, remove_dup)
 TEST_F(meta_duplication_service_test, duplication_sync)
 {
     const auto &server_nodes = ensure_enough_alive_nodes(3);
-    const std::string test_app = "test_app_0";
+    const std::string test_app("test_app_0");
     create_app(test_app);
     auto app = find_app(test_app);
 
-    // generate all primaries on node[0]
+    // Generate all primaries on node[0].
     for (auto &pc : app->pcs) {
         pc.ballot = random32(1, 10000);
         SET_IP_AND_HOST_PORT_BY_DNS(pc, primary, server_nodes[0]);
@@ -696,6 +696,7 @@ TEST_F(meta_duplication_service_test, duplication_sync)
     for (int i = 0; i < app->partition_count; i++) {
         dup->init_progress(i, invalid_decree);
     }
+
     {
         std::map<gpid, std::vector<duplication_confirm_entry>> confirm_list;
 
@@ -712,20 +713,20 @@ TEST_F(meta_duplication_service_test, duplication_sync)
         confirm_list[gpid(app->app_id, 3)].push_back(ce);
 
         duplication_sync_response resp = duplication_sync(node, confirm_list);
-        ASSERT_EQ(resp.err, ERR_OK);
-        ASSERT_EQ(resp.dup_map.size(), 1);
-        ASSERT_EQ(resp.dup_map[app->app_id].size(), 1);
-        ASSERT_EQ(resp.dup_map[app->app_id][dupid].dupid, dupid);
-        ASSERT_EQ(resp.dup_map[app->app_id][dupid].status, 
duplication_status::DS_PREPARE);
-        ASSERT_EQ(resp.dup_map[app->app_id][dupid].create_ts, 
dup->create_timestamp_ms);
-        ASSERT_EQ(resp.dup_map[app->app_id][dupid].remote, 
dup->remote_cluster_name);
-        ASSERT_EQ(resp.dup_map[app->app_id][dupid].fail_mode, 
dup->fail_mode());
+        ASSERT_EQ(ERR_OK, resp.err);
+        ASSERT_EQ(1, resp.dup_map.size());
+        ASSERT_EQ(1, resp.dup_map[app->app_id].size());
+        ASSERT_EQ(dupid, resp.dup_map[app->app_id][dupid].dupid);
+        ASSERT_EQ(duplication_status::DS_PREPARE, 
resp.dup_map[app->app_id][dupid].status);
+        ASSERT_EQ(dup->create_timestamp_ms, 
resp.dup_map[app->app_id][dupid].create_ts);
+        ASSERT_EQ(dup->remote_cluster_name, 
resp.dup_map[app->app_id][dupid].remote);
+        ASSERT_EQ(dup->fail_mode(), 
resp.dup_map[app->app_id][dupid].fail_mode);
 
         auto progress_map = resp.dup_map[app->app_id][dupid].progress;
-        ASSERT_EQ(progress_map.size(), 8);
-        ASSERT_EQ(progress_map[1], 5);
-        ASSERT_EQ(progress_map[2], 6);
-        ASSERT_EQ(progress_map[3], 7);
+        ASSERT_EQ(8, progress_map.size());
+        ASSERT_EQ(5, progress_map[1]);
+        ASSERT_EQ(6, progress_map[2]);
+        ASSERT_EQ(7, progress_map[3]);
 
         // ensure no updated progresses will also be included in response
         for (int p = 4; p < 8; p++) {
diff --git a/src/replica/duplication/replica_duplicator_manager.cpp 
b/src/replica/duplication/replica_duplicator_manager.cpp
index 2e1e61cc4..724f164d1 100644
--- a/src/replica/duplication/replica_duplicator_manager.cpp
+++ b/src/replica/duplication/replica_duplicator_manager.cpp
@@ -94,6 +94,7 @@ 
replica_duplicator_manager::get_duplication_confirms_to_update() const
         entry.dupid = dup->id();
         entry.confirmed_decree = progress.last_decree;
         entry.__set_checkpoint_prepared(progress.checkpoint_has_prepared);
+        entry.__set_last_committed_decree(_replica->last_committed_decree());
         updates.emplace_back(entry);
     }
     return updates;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to