This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c49e41b8 feat(duplication): support duplication entry for multiple purposes (#2163)
7c49e41b8 is described below

commit 7c49e41b8d3ca9d41701864049cd34f432a70438
Author: Dan Wang <[email protected]>
AuthorDate: Wed Dec 4 12:03:59 2024 +0800

    feat(duplication): support duplication entry for multiple purposes (#2163)
    
    `duplication_entry` is an important struct for duplication: it holds the
    per-duplication properties and the progress info (confirmed decree) for each
    partition of a table, i.e. all the useful info for a duplication. Therefore,
    it has been used as the response to both query and sync:
    
    - The query is requested by the client, to the meta server, to get the
    properties of the duplications of a single table;
    - The sync is requested by the replica server, also to the meta server, to
    sync progress.
    
    Soon we will support a new interface to get duplications from multiple tables,
    i.e. `list duplications`. It will request the duplications of multiple tables,
    also including per-duplication properties and progress info (the confirmed
    decree and a new field, the last committed decree). It still uses
    `duplication_entry` as the response to the client. Thus we add fields to
    `duplication_entry` to support more purposes.
---
 idl/duplication.thrift                             | 29 ++++++++-
 src/common/duplication_common.cpp                  | 23 +++++--
 src/meta/duplication/duplication_info.cpp          | 13 +---
 src/meta/duplication/duplication_info.h            | 57 +++++++++++++----
 src/meta/duplication/meta_duplication_service.cpp  |  4 +-
 src/meta/test/duplication_info_test.cpp            | 71 +++++++++++++++++++---
 src/meta/test/meta_duplication_service_test.cpp    | 10 ++-
 .../duplication/replica_duplicator_manager.cpp     | 24 ++++----
 8 files changed, 177 insertions(+), 54 deletions(-)

diff --git a/idl/duplication.thrift b/idl/duplication.thrift
index 589cc5d08..604edd1d3 100644
--- a/idl/duplication.thrift
+++ b/idl/duplication.thrift
@@ -22,6 +22,15 @@ namespace cpp dsn.replication
 namespace go admin
 namespace java org.apache.pegasus.replication
 
+// Indicate which data of a table needs to be duplicated:
+// * FULL: all of the data of the table needs to be duplicated.
+// * INCREMENTAL: only incremental data of the table would be duplicated.
+enum duplication_mode
+{
+    FULL = 0,
+    INCREMENTAL,
+}
+
 //  - INIT  -> PREPARE
 //  - PREPARE -> APP
 //  - APP -> LOG
@@ -129,6 +138,16 @@ struct duplication_modify_response
     2:i32              appid;
 }
 
+// The states tracking each partition for duplication.
+struct duplication_partition_state
+{
+    // The max decree of this partition that has been confirmed to be received 
by follower.
+    1:i64 confirmed_decree;
+
+    // The max decree that has been committed by this partition.
+    2:i64 last_committed_decree;
+}
+
 struct duplication_entry
 {
     1:i32                  dupid;
@@ -136,7 +155,8 @@ struct duplication_entry
     3:string               remote;
     4:i64                  create_ts;
 
-    // partition_index => confirmed decree
+    // Used for syncing duplications with partition-level progress (replica 
server -> meta server).
+    // partition index => confirmed decree.
     5:optional map<i32, i64> progress;
 
     7:optional duplication_fail_mode fail_mode;
@@ -150,6 +170,13 @@ struct duplication_entry
     // For versions >= v2.6.0, this could be specified by client.
     // For versions < v2.6.0, this must be the same with source replica_count.
     9:optional i32 remote_replica_count;
+
+    // TODO(wangdan): would be supported later.
+    10:optional duplication_mode mode;
+
+    // Used for listing duplications with partition-level details (client -> 
meta server).
+    // partition index => partition states.
+    11:optional map<i32, duplication_partition_state> partition_states;
 }
 
 // This request is sent from client to meta.
diff --git a/src/common/duplication_common.cpp 
b/src/common/duplication_common.cpp
index cc49165f9..a0eee52d3 100644
--- a/src/common/duplication_common.cpp
+++ b/src/common/duplication_common.cpp
@@ -20,6 +20,7 @@
 #include <nlohmann/json.hpp>
 #include <cstdint>
 #include <map>
+#include <type_traits>
 #include <utility>
 #include <vector>
 
@@ -181,11 +182,12 @@ static nlohmann::json duplication_entry_to_json(const 
duplication_entry &ent)
     };
 
     if (ent.__isset.progress) {
-        nlohmann::json sub_json;
-        for (const auto &p : ent.progress) {
-            sub_json[std::to_string(p.first)] = p.second;
+        nlohmann::json progress;
+        for (const auto &[partition_index, state] : ent.progress) {
+            progress[std::to_string(partition_index)] = state;
         }
-        json["progress"] = sub_json;
+
+        json["progress"] = progress;
     }
 
     if (ent.__isset.remote_app_name) {
@@ -198,6 +200,19 @@ static nlohmann::json duplication_entry_to_json(const 
duplication_entry &ent)
         json["remote_replica_count"] = ent.remote_replica_count;
     }
 
+    if (ent.__isset.partition_states) {
+        nlohmann::json partition_states;
+        for (const auto &[partition_index, state] : ent.partition_states) {
+            nlohmann::json partition_state;
+            partition_state["confirmed_decree"] = state.confirmed_decree;
+            partition_state["last_committed_decree"] = 
state.last_committed_decree;
+
+            partition_states[std::to_string(partition_index)] = 
partition_state;
+        }
+
+        json["partition_states"] = partition_states;
+    }
+
     return json;
 }
 
diff --git a/src/meta/duplication/duplication_info.cpp 
b/src/meta/duplication/duplication_info.cpp
index 8f609c7b2..5aaa21a9c 100644
--- a/src/meta/duplication/duplication_info.cpp
+++ b/src/meta/duplication/duplication_info.cpp
@@ -18,7 +18,6 @@
 #include "duplication_info.h"
 
 #include "common/duplication_common.h"
-#include "meta/meta_data.h"
 #include "runtime/api_layer1.h"
 #include "utils/flags.h"
 #include "utils/fmt_logging.h"
@@ -221,7 +220,7 @@ void duplication_info::persist_status()
 
 std::string duplication_info::to_string() const
 {
-    return duplication_entry_to_string(to_duplication_entry());
+    return duplication_entry_to_string(to_partition_level_entry_for_list());
 }
 
 blob duplication_info::to_json_blob() const
@@ -293,17 +292,11 @@ duplication_info_s_ptr 
duplication_info::decode_from_blob(dupid_t dup_id,
     return dup;
 }
 
-void duplication_info::append_if_valid_for_query(
-    const app_state &app,
-    /*out*/ std::vector<duplication_entry> &entry_list) const
+void duplication_info::append_as_entry(std::vector<duplication_entry> 
&entry_list) const
 {
     zauto_read_lock l(_lock);
 
-    entry_list.emplace_back(to_duplication_entry());
-    duplication_entry &ent = entry_list.back();
-    // the confirmed decree is not useful for displaying
-    // the overall state of duplication
-    ent.__isset.progress = false;
+    entry_list.emplace_back(to_duplication_level_entry());
 }
 
 } // namespace dsn::replication
diff --git a/src/meta/duplication/duplication_info.h 
b/src/meta/duplication/duplication_info.h
index 7563d3d41..2b7e9dd03 100644
--- a/src/meta/duplication/duplication_info.h
+++ b/src/meta/duplication/duplication_info.h
@@ -24,6 +24,7 @@
 #include <map>
 #include <memory>
 #include <string>
+#include <type_traits>
 #include <utility>
 #include <vector>
 
@@ -38,10 +39,7 @@
 #include "utils/fmt_utils.h"
 #include "utils/zlocks.h"
 
-namespace dsn {
-namespace replication {
-
-class app_state;
+namespace dsn::replication {
 
 class duplication_info;
 
@@ -149,10 +147,10 @@ public:
 
     // duplication_query_rpc is handled in THREAD_POOL_META_SERVER,
     // which is not thread safe for read.
-    void append_if_valid_for_query(const app_state &app,
-                                   /*out*/ std::vector<duplication_entry> 
&entry_list) const;
+    void append_as_entry(std::vector<duplication_entry> &entry_list) const;
 
-    duplication_entry to_duplication_entry() const
+    // Build an entry including only duplication-level info.
+    duplication_entry to_duplication_level_entry() const
     {
         duplication_entry entry;
         entry.dupid = id;
@@ -162,13 +160,47 @@ public:
         entry.__set_fail_mode(_fail_mode);
         entry.__set_remote_app_name(remote_app_name);
         entry.__set_remote_replica_count(remote_replica_count);
+
+        return entry;
+    }
+
+    // Build an entry including also partition-level progress used for sync 
besides
+    // duplication-level info.
+    duplication_entry to_partition_level_entry_for_sync() const
+    {
+        auto entry = to_duplication_level_entry();
+
         entry.__isset.progress = true;
-        for (const auto &kv : _progress) {
-            if (!kv.second.is_inited) {
+        for (const auto &[partition_index, state] : _progress) {
+            if (!state.is_inited) {
+                continue;
+            }
+
+            entry.progress.emplace(partition_index, state.stored_decree);
+        }
+
+        return entry;
+    }
+
+    // Build an entry including also partition-level detailed states used for 
list
+    // besides duplication-level info.
+    duplication_entry to_partition_level_entry_for_list() const
+    {
+        auto entry = to_duplication_level_entry();
+
+        entry.__isset.partition_states = true;
+        for (const auto &[partition_index, state] : _progress) {
+            if (!state.is_inited) {
                 continue;
             }
-            entry.progress[kv.first] = kv.second.stored_decree;
+
+            duplication_partition_state partition_state;
+            partition_state.confirmed_decree = state.stored_decree;
+            partition_state.last_committed_decree = 
state.last_committed_decree;
+
+            entry.partition_states.emplace(partition_index, partition_state);
         }
+
         return entry;
     }
 
@@ -231,7 +263,7 @@ private:
         bool checkpoint_prepared{false};
     };
 
-    // partition_idx => progress
+    // partition_index => progress
     std::map<int, partition_progress> _progress;
 
     uint64_t _last_progress_report_ms{0};
@@ -281,7 +313,6 @@ extern void json_encode(dsn::json::JsonWriter &out, const 
duplication_fail_mode:
 
 extern bool json_decode(const dsn::json::JsonObject &in, 
duplication_fail_mode::type &s);
 
-} // namespace replication
-} // namespace dsn
+} // namespace dsn::replication
 
 USER_DEFINED_STRUCTURE_FORMATTER(::dsn::replication::duplication_info);
diff --git a/src/meta/duplication/meta_duplication_service.cpp 
b/src/meta/duplication/meta_duplication_service.cpp
index f63d8af35..28650dc89 100644
--- a/src/meta/duplication/meta_duplication_service.cpp
+++ b/src/meta/duplication/meta_duplication_service.cpp
@@ -84,7 +84,7 @@ void meta_duplication_service::query_duplication_info(const 
duplication_query_re
 
         response.appid = app->app_id;
         for (const auto &[_, dup] : app->duplications) {
-            dup->append_if_valid_for_query(*app, response.entry_list);
+            dup->append_as_entry(response.entry_list);
         }
     }
 }
@@ -425,7 +425,7 @@ void 
meta_duplication_service::duplication_sync(duplication_sync_rpc rpc)
                 }
             }
 
-            response.dup_map[app_id][dup_id] = dup->to_duplication_entry();
+            response.dup_map[app_id][dup_id] = 
dup->to_partition_level_entry_for_sync();
 
             // report progress periodically for each duplications
             dup->report_progress_if_time_up();
diff --git a/src/meta/test/duplication_info_test.cpp 
b/src/meta/test/duplication_info_test.cpp
index 72bbcdbb1..a690d450d 100644
--- a/src/meta/test/duplication_info_test.cpp
+++ b/src/meta/test/duplication_info_test.cpp
@@ -29,6 +29,7 @@
 #include <boost/algorithm/string/replace.hpp>
 
 #include "gtest/gtest.h"
+#include "gutil/map_util.h"
 #include "runtime/app_model.h"
 #include "test_util/test_util.h"
 #include "utils/flags.h"
@@ -51,11 +52,46 @@ public:
         dup._status = status;
     }
 
-    static void test_init_progress(duplication_info &dup, int partition_idx, 
decree expected_decree)
+    static void test_duplication_entry_for_sync(const duplication_info &dup,
+                                                int partition_index,
+                                                decree 
expected_confirmed_decree)
     {
-        dup.init_progress(partition_idx, expected_decree);
+        const auto &entry = dup.to_partition_level_entry_for_sync();
+        ASSERT_TRUE(gutil::ContainsKey(entry.progress, partition_index));
+        ASSERT_EQ(expected_confirmed_decree, gutil::FindOrDie(entry.progress, 
partition_index));
+    }
+
+    static void test_duplication_entry_for_list(const duplication_info &dup,
+                                                int partition_index,
+                                                decree 
expected_confirmed_decree,
+                                                decree 
expected_last_committed_decree)
+    {
+        const auto &entry = dup.to_partition_level_entry_for_list();
+        ASSERT_TRUE(gutil::ContainsKey(entry.partition_states, 
partition_index));
+
+        const auto &state = gutil::FindOrDie(entry.partition_states, 
partition_index);
+        ASSERT_EQ(expected_confirmed_decree, state.confirmed_decree);
+        ASSERT_EQ(expected_last_committed_decree, state.last_committed_decree);
+    }
+
+    static void test_duplication_entry(const duplication_info &dup,
+                                       int partition_index,
+                                       decree expected_confirmed_decree,
+                                       decree expected_last_committed_decree)
+    {
+        test_duplication_entry_for_sync(dup, partition_index, 
expected_confirmed_decree);
+        test_duplication_entry_for_list(
+            dup, partition_index, expected_confirmed_decree, 
expected_last_committed_decree);
+    }
+
+    static void
+    test_init_progress(duplication_info &dup, int partition_index, decree 
expected_decree)
+    {
+        dup.init_progress(partition_index, expected_decree);
+
+        ASSERT_TRUE(gutil::ContainsKey(dup._progress, partition_index));
 
-        const auto &progress = dup._progress[partition_idx];
+        const auto &progress = dup._progress[partition_index];
         ASSERT_EQ(invalid_decree, progress.last_committed_decree);
         ASSERT_EQ(expected_decree, progress.volatile_decree);
         ASSERT_EQ(expected_decree, progress.stored_decree);
@@ -63,6 +99,8 @@ public:
         ASSERT_EQ(0, progress.last_progress_update_ms);
         ASSERT_TRUE(progress.is_inited);
         ASSERT_FALSE(progress.checkpoint_prepared);
+
+        test_duplication_entry(dup, partition_index, expected_decree, 
invalid_decree);
     }
 
     static void test_alter_progress()
@@ -96,6 +134,7 @@ public:
         ASSERT_EQ(invalid_decree, dup._progress[0].stored_decree);
         ASSERT_TRUE(dup._progress[0].is_altering);
         ASSERT_TRUE(dup._progress[0].checkpoint_prepared);
+        test_duplication_entry(dup, 0, invalid_decree, 8);
 
         // Busy updating.
         entry.__set_last_committed_decree(15);
@@ -109,6 +148,7 @@ public:
         ASSERT_EQ(invalid_decree, dup._progress[0].stored_decree);
         ASSERT_TRUE(dup._progress[0].is_altering);
         ASSERT_TRUE(dup._progress[0].checkpoint_prepared);
+        test_duplication_entry(dup, 0, invalid_decree, 15);
 
         // Persist progress for partition 0.
         dup.persist_progress(0);
@@ -118,6 +158,7 @@ public:
         ASSERT_EQ(5, dup._progress[0].stored_decree);
         ASSERT_FALSE(dup._progress[0].is_altering);
         ASSERT_TRUE(dup._progress[0].checkpoint_prepared);
+        test_duplication_entry(dup, 0, 5, 15);
 
         // Initialize progress for partition 1.
         test_init_progress(dup, 1, 5);
@@ -130,6 +171,7 @@ public:
         ASSERT_EQ(5, dup._progress[1].stored_decree);
         ASSERT_TRUE(dup._progress[1].is_altering);
         ASSERT_FALSE(dup._progress[1].checkpoint_prepared);
+        test_duplication_entry(dup, 1, 5, 15);
 
         // Persist progress for partition 1.
         dup.persist_progress(1);
@@ -148,6 +190,7 @@ public:
         ASSERT_FALSE(dup._progress[1].is_altering);
         // checkpoint_prepared would be updated successfully even if it is too 
frequent.
         ASSERT_TRUE(dup._progress[1].checkpoint_prepared);
+        test_duplication_entry(dup, 1, 10, 25);
 
         // Reduce last update timestamp to make it infrequent.
         dup._progress[1].last_progress_update_ms -= 
FLAGS_dup_progress_min_update_period_ms + 100;
@@ -160,6 +203,7 @@ public:
         ASSERT_EQ(10, dup._progress[1].stored_decree);
         ASSERT_TRUE(dup._progress[1].is_altering);
         ASSERT_TRUE(dup._progress[1].checkpoint_prepared);
+        test_duplication_entry(dup, 1, 10, 26);
 
         // Checkpoint are ready for both partition 0 and 1.
         ASSERT_TRUE(dup.all_checkpoint_has_prepared());
@@ -181,17 +225,24 @@ public:
         ASSERT_EQ(duplication_status::DS_INIT, dup._status);
         ASSERT_EQ(duplication_status::DS_INIT, dup._next_status);
 
-        auto dup_ent = dup.to_duplication_entry();
-        ASSERT_EQ(0, dup_ent.progress.size());
-        ASSERT_EQ(kTestRemoteAppName, dup_ent.remote_app_name);
-        ASSERT_EQ(kTestRemoteReplicaCount, dup_ent.remote_replica_count);
+        {
+            const auto &entry = dup.to_partition_level_entry_for_sync();
+            ASSERT_TRUE(entry.progress.empty());
+            ASSERT_EQ(kTestRemoteAppName, entry.remote_app_name);
+            ASSERT_EQ(kTestRemoteReplicaCount, entry.remote_replica_count);
+        }
 
-        for (int i = 0; i < 4; i++) {
+        for (int i = 0; i < 4; ++i) {
             dup.init_progress(i, invalid_decree);
         }
 
-        for (auto kv : dup_ent.progress) {
-            ASSERT_EQ(invalid_decree, kv.second);
+        {
+            const auto &entry = dup.to_partition_level_entry_for_sync();
+            ASSERT_EQ(4, entry.progress.size());
+            for (int partition_index = 0; partition_index < 4; 
++partition_index) {
+                ASSERT_TRUE(gutil::ContainsKey(entry.progress, 
partition_index));
+                ASSERT_EQ(invalid_decree, gutil::FindOrDie(entry.progress, 
partition_index));
+            }
         }
 
         dup.start();
diff --git a/src/meta/test/meta_duplication_service_test.cpp 
b/src/meta/test/meta_duplication_service_test.cpp
index ec65a41fa..5e0ebe6b1 100644
--- a/src/meta/test/meta_duplication_service_test.cpp
+++ b/src/meta/test/meta_duplication_service_test.cpp
@@ -46,6 +46,7 @@
 #include "dsn.layer2_types.h"
 #include "duplication_types.h"
 #include "gtest/gtest.h"
+#include "gutil/map_util.h"
 #include "http/http_server.h"
 #include "http/http_status_code.h"
 #include "meta/duplication/duplication_info.h"
@@ -233,9 +234,12 @@ public:
             ASSERT_EQ(duplication_status::DS_INIT, dup->_status);
             ASSERT_EQ(duplication_status::DS_INIT, dup->_next_status);
 
-            auto ent = dup->to_duplication_entry();
-            for (int j = 0; j < app->partition_count; j++) {
-                ASSERT_EQ(invalid_decree, ent.progress[j]);
+            const auto &entry = dup->to_partition_level_entry_for_sync();
+            ASSERT_EQ(app->partition_count, entry.progress.size());
+            for (int partition_index = 0; partition_index < 
app->partition_count;
+                 ++partition_index) {
+                ASSERT_TRUE(gutil::ContainsKey(entry.progress, 
partition_index));
+                ASSERT_EQ(invalid_decree, gutil::FindOrDie(entry.progress, 
partition_index));
             }
 
             if (last_dup != 0) {
diff --git a/src/replica/duplication/replica_duplicator_manager.cpp 
b/src/replica/duplication/replica_duplicator_manager.cpp
index 724f164d1..5569554d7 100644
--- a/src/replica/duplication/replica_duplicator_manager.cpp
+++ b/src/replica/duplication/replica_duplicator_manager.cpp
@@ -102,9 +102,9 @@ 
replica_duplicator_manager::get_duplication_confirms_to_update() const
 
 void replica_duplicator_manager::sync_duplication(const duplication_entry &ent)
 {
-    // state is inconsistent with meta-server
     auto it = ent.progress.find(get_gpid().get_partition_index());
     if (it == ent.progress.end()) {
+        // Inconsistent with the meta server.
         _duplications.erase(ent.dupid);
         return;
     }
@@ -114,22 +114,24 @@ void replica_duplicator_manager::sync_duplication(const 
duplication_entry &ent)
     dupid_t dupid = ent.dupid;
     duplication_status::type next_status = ent.status;
 
-    replica_duplicator_u_ptr &dup = _duplications[dupid];
-    if (dup == nullptr) {
+    auto &dup = _duplications[dupid];
+    if (!dup) {
         if (!is_duplication_status_invalid(next_status)) {
             dup = std::make_unique<replica_duplicator>(ent, _replica);
         } else {
             LOG_ERROR_PREFIX("illegal duplication status: {}",
                              duplication_status_to_string(next_status));
         }
-    } else {
-        // update progress
-        duplication_progress newp = 
dup->progress().set_confirmed_decree(it->second);
-        CHECK_EQ_PREFIX(dup->update_progress(newp), error_s::ok());
-        dup->update_status_if_needed(next_status);
-        if (ent.__isset.fail_mode) {
-            dup->update_fail_mode(ent.fail_mode);
-        }
+
+        return;
+    }
+
+    // Update progress.
+    
CHECK_EQ_PREFIX(dup->update_progress(dup->progress().set_confirmed_decree(it->second)),
+                    error_s::ok());
+    dup->update_status_if_needed(next_status);
+    if (ent.__isset.fail_mode) {
+        dup->update_fail_mode(ent.fail_mode);
     }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to