This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 7c49e41b8 feat(duplication): support duplication entry for multiple purposes (#2163)
7c49e41b8 is described below
commit 7c49e41b8d3ca9d41701864049cd34f432a70438
Author: Dan Wang <[email protected]>
AuthorDate: Wed Dec 4 12:03:59 2024 +0800
feat(duplication): support duplication entry for multiple purposes (#2163)
`duplication_entry` is an important struct for duplication. It includes the
per-duplication properties and the progress info (confirmed decree) for each
partition of a table, i.e. all the useful info about a duplication. Therefore,
it has been used in the responses to both query and sync:
- The query is sent by the client to the meta server to get the properties of
the duplications of a single table;
- The sync is sent by the replica server, also to the meta server, to sync
progress.
Soon we will support a new interface, `list duplications`, to get the
duplications of multiple tables. Its response also includes the
per-duplication properties and the progress info (the confirmed decree and a
new field, the last committed decree), and it still uses `duplication_entry`
as the response to the client.
Thus we add fields to `duplication_entry` to support more purposes.
---
idl/duplication.thrift | 29 ++++++++-
src/common/duplication_common.cpp | 23 +++++--
src/meta/duplication/duplication_info.cpp | 13 +---
src/meta/duplication/duplication_info.h | 57 +++++++++++++----
src/meta/duplication/meta_duplication_service.cpp | 4 +-
src/meta/test/duplication_info_test.cpp | 71 +++++++++++++++++++---
src/meta/test/meta_duplication_service_test.cpp | 10 ++-
.../duplication/replica_duplicator_manager.cpp | 24 ++++----
8 files changed, 177 insertions(+), 54 deletions(-)
diff --git a/idl/duplication.thrift b/idl/duplication.thrift
index 589cc5d08..604edd1d3 100644
--- a/idl/duplication.thrift
+++ b/idl/duplication.thrift
@@ -22,6 +22,15 @@ namespace cpp dsn.replication
namespace go admin
namespace java org.apache.pegasus.replication
+// Indicate which data of a table needs to be duplicated:
+// * FULL: all of the data of the table needs to be duplicated.
+// * INCREMENTAL: only incremental data of the table would be duplicated.
+enum duplication_mode
+{
+ FULL = 0,
+ INCREMENTAL,
+}
+
// - INIT -> PREPARE
// - PREPARE -> APP
// - APP -> LOG
@@ -129,6 +138,16 @@ struct duplication_modify_response
2:i32 appid;
}
+// The states tracking each partition for duplication.
+struct duplication_partition_state
+{
+ // The max decree of this partition that has been confirmed to be received by follower.
+ 1:i64 confirmed_decree;
+
+ // The max decree that has been committed by this partition.
+ 2:i64 last_committed_decree;
+}
+
struct duplication_entry
{
1:i32 dupid;
@@ -136,7 +155,8 @@ struct duplication_entry
3:string remote;
4:i64 create_ts;
- // partition_index => confirmed decree
+ // Used for syncing duplications with partition-level progress (replica server -> meta server).
+ // partition index => confirmed decree.
5:optional map<i32, i64> progress;
7:optional duplication_fail_mode fail_mode;
@@ -150,6 +170,13 @@ struct duplication_entry
// For versions >= v2.6.0, this could be specified by client.
// For versions < v2.6.0, this must be the same with source replica_count.
9:optional i32 remote_replica_count;
+
+ // TODO(wangdan): would be supported later.
+ 10:optional duplication_mode mode;
+
+ // Used for listing duplications with partition-level details (client -> meta server).
+ // partition index => partition states.
+ 11:optional map<i32, duplication_partition_state> partition_states;
}
// This request is sent from client to meta.
diff --git a/src/common/duplication_common.cpp
b/src/common/duplication_common.cpp
index cc49165f9..a0eee52d3 100644
--- a/src/common/duplication_common.cpp
+++ b/src/common/duplication_common.cpp
@@ -20,6 +20,7 @@
#include <nlohmann/json.hpp>
#include <cstdint>
#include <map>
+#include <type_traits>
#include <utility>
#include <vector>
@@ -181,11 +182,12 @@ static nlohmann::json duplication_entry_to_json(const
duplication_entry &ent)
};
if (ent.__isset.progress) {
- nlohmann::json sub_json;
- for (const auto &p : ent.progress) {
- sub_json[std::to_string(p.first)] = p.second;
+ nlohmann::json progress;
+ for (const auto &[partition_index, state] : ent.progress) {
+ progress[std::to_string(partition_index)] = state;
}
- json["progress"] = sub_json;
+
+ json["progress"] = progress;
}
if (ent.__isset.remote_app_name) {
@@ -198,6 +200,19 @@ static nlohmann::json duplication_entry_to_json(const
duplication_entry &ent)
json["remote_replica_count"] = ent.remote_replica_count;
}
+ if (ent.__isset.partition_states) {
+ nlohmann::json partition_states;
+ for (const auto &[partition_index, state] : ent.partition_states) {
+ nlohmann::json partition_state;
+ partition_state["confirmed_decree"] = state.confirmed_decree;
+ partition_state["last_committed_decree"] =
state.last_committed_decree;
+
+ partition_states[std::to_string(partition_index)] =
partition_state;
+ }
+
+ json["partition_states"] = partition_states;
+ }
+
return json;
}
diff --git a/src/meta/duplication/duplication_info.cpp
b/src/meta/duplication/duplication_info.cpp
index 8f609c7b2..5aaa21a9c 100644
--- a/src/meta/duplication/duplication_info.cpp
+++ b/src/meta/duplication/duplication_info.cpp
@@ -18,7 +18,6 @@
#include "duplication_info.h"
#include "common/duplication_common.h"
-#include "meta/meta_data.h"
#include "runtime/api_layer1.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
@@ -221,7 +220,7 @@ void duplication_info::persist_status()
std::string duplication_info::to_string() const
{
- return duplication_entry_to_string(to_duplication_entry());
+ return duplication_entry_to_string(to_partition_level_entry_for_list());
}
blob duplication_info::to_json_blob() const
@@ -293,17 +292,11 @@ duplication_info_s_ptr
duplication_info::decode_from_blob(dupid_t dup_id,
return dup;
}
-void duplication_info::append_if_valid_for_query(
- const app_state &app,
- /*out*/ std::vector<duplication_entry> &entry_list) const
+void duplication_info::append_as_entry(std::vector<duplication_entry>
&entry_list) const
{
zauto_read_lock l(_lock);
- entry_list.emplace_back(to_duplication_entry());
- duplication_entry &ent = entry_list.back();
- // the confirmed decree is not useful for displaying
- // the overall state of duplication
- ent.__isset.progress = false;
+ entry_list.emplace_back(to_duplication_level_entry());
}
} // namespace dsn::replication
diff --git a/src/meta/duplication/duplication_info.h
b/src/meta/duplication/duplication_info.h
index 7563d3d41..2b7e9dd03 100644
--- a/src/meta/duplication/duplication_info.h
+++ b/src/meta/duplication/duplication_info.h
@@ -24,6 +24,7 @@
#include <map>
#include <memory>
#include <string>
+#include <type_traits>
#include <utility>
#include <vector>
@@ -38,10 +39,7 @@
#include "utils/fmt_utils.h"
#include "utils/zlocks.h"
-namespace dsn {
-namespace replication {
-
-class app_state;
+namespace dsn::replication {
class duplication_info;
@@ -149,10 +147,10 @@ public:
// duplication_query_rpc is handled in THREAD_POOL_META_SERVER,
// which is not thread safe for read.
- void append_if_valid_for_query(const app_state &app,
- /*out*/ std::vector<duplication_entry>
&entry_list) const;
+ void append_as_entry(std::vector<duplication_entry> &entry_list) const;
- duplication_entry to_duplication_entry() const
+ // Build an entry including only duplication-level info.
+ duplication_entry to_duplication_level_entry() const
{
duplication_entry entry;
entry.dupid = id;
@@ -162,13 +160,47 @@ public:
entry.__set_fail_mode(_fail_mode);
entry.__set_remote_app_name(remote_app_name);
entry.__set_remote_replica_count(remote_replica_count);
+
+ return entry;
+ }
+
+ // Build an entry including also partition-level progress used for sync besides
+ // duplication-level info.
+ duplication_entry to_partition_level_entry_for_sync() const
+ {
+ auto entry = to_duplication_level_entry();
+
entry.__isset.progress = true;
- for (const auto &kv : _progress) {
- if (!kv.second.is_inited) {
+ for (const auto &[partition_index, state] : _progress) {
+ if (!state.is_inited) {
+ continue;
+ }
+
+ entry.progress.emplace(partition_index, state.stored_decree);
+ }
+
+ return entry;
+ }
+
+ // Build an entry including also partition-level detailed states used for list
+ // besides duplication-level info.
+ duplication_entry to_partition_level_entry_for_list() const
+ {
+ auto entry = to_duplication_level_entry();
+
+ entry.__isset.partition_states = true;
+ for (const auto &[partition_index, state] : _progress) {
+ if (!state.is_inited) {
continue;
}
- entry.progress[kv.first] = kv.second.stored_decree;
+
+ duplication_partition_state partition_state;
+ partition_state.confirmed_decree = state.stored_decree;
+ partition_state.last_committed_decree =
state.last_committed_decree;
+
+ entry.partition_states.emplace(partition_index, partition_state);
}
+
return entry;
}
@@ -231,7 +263,7 @@ private:
bool checkpoint_prepared{false};
};
- // partition_idx => progress
+ // partition_index => progress
std::map<int, partition_progress> _progress;
uint64_t _last_progress_report_ms{0};
@@ -281,7 +313,6 @@ extern void json_encode(dsn::json::JsonWriter &out, const
duplication_fail_mode:
extern bool json_decode(const dsn::json::JsonObject &in,
duplication_fail_mode::type &s);
-} // namespace replication
-} // namespace dsn
+} // namespace dsn::replication
USER_DEFINED_STRUCTURE_FORMATTER(::dsn::replication::duplication_info);
diff --git a/src/meta/duplication/meta_duplication_service.cpp
b/src/meta/duplication/meta_duplication_service.cpp
index f63d8af35..28650dc89 100644
--- a/src/meta/duplication/meta_duplication_service.cpp
+++ b/src/meta/duplication/meta_duplication_service.cpp
@@ -84,7 +84,7 @@ void meta_duplication_service::query_duplication_info(const
duplication_query_re
response.appid = app->app_id;
for (const auto &[_, dup] : app->duplications) {
- dup->append_if_valid_for_query(*app, response.entry_list);
+ dup->append_as_entry(response.entry_list);
}
}
}
@@ -425,7 +425,7 @@ void
meta_duplication_service::duplication_sync(duplication_sync_rpc rpc)
}
}
- response.dup_map[app_id][dup_id] = dup->to_duplication_entry();
+ response.dup_map[app_id][dup_id] =
dup->to_partition_level_entry_for_sync();
// report progress periodically for each duplications
dup->report_progress_if_time_up();
diff --git a/src/meta/test/duplication_info_test.cpp
b/src/meta/test/duplication_info_test.cpp
index 72bbcdbb1..a690d450d 100644
--- a/src/meta/test/duplication_info_test.cpp
+++ b/src/meta/test/duplication_info_test.cpp
@@ -29,6 +29,7 @@
#include <boost/algorithm/string/replace.hpp>
#include "gtest/gtest.h"
+#include "gutil/map_util.h"
#include "runtime/app_model.h"
#include "test_util/test_util.h"
#include "utils/flags.h"
@@ -51,11 +52,46 @@ public:
dup._status = status;
}
- static void test_init_progress(duplication_info &dup, int partition_idx,
decree expected_decree)
+ static void test_duplication_entry_for_sync(const duplication_info &dup,
+ int partition_index,
+ decree
expected_confirmed_decree)
{
- dup.init_progress(partition_idx, expected_decree);
+ const auto &entry = dup.to_partition_level_entry_for_sync();
+ ASSERT_TRUE(gutil::ContainsKey(entry.progress, partition_index));
+ ASSERT_EQ(expected_confirmed_decree, gutil::FindOrDie(entry.progress,
partition_index));
+ }
+
+ static void test_duplication_entry_for_list(const duplication_info &dup,
+ int partition_index,
+ decree
expected_confirmed_decree,
+ decree
expected_last_committed_decree)
+ {
+ const auto &entry = dup.to_partition_level_entry_for_list();
+ ASSERT_TRUE(gutil::ContainsKey(entry.partition_states,
partition_index));
+
+ const auto &state = gutil::FindOrDie(entry.partition_states,
partition_index);
+ ASSERT_EQ(expected_confirmed_decree, state.confirmed_decree);
+ ASSERT_EQ(expected_last_committed_decree, state.last_committed_decree);
+ }
+
+ static void test_duplication_entry(const duplication_info &dup,
+ int partition_index,
+ decree expected_confirmed_decree,
+ decree expected_last_committed_decree)
+ {
+ test_duplication_entry_for_sync(dup, partition_index,
expected_confirmed_decree);
+ test_duplication_entry_for_list(
+ dup, partition_index, expected_confirmed_decree,
expected_last_committed_decree);
+ }
+
+ static void
+ test_init_progress(duplication_info &dup, int partition_index, decree
expected_decree)
+ {
+ dup.init_progress(partition_index, expected_decree);
+
+ ASSERT_TRUE(gutil::ContainsKey(dup._progress, partition_index));
- const auto &progress = dup._progress[partition_idx];
+ const auto &progress = dup._progress[partition_index];
ASSERT_EQ(invalid_decree, progress.last_committed_decree);
ASSERT_EQ(expected_decree, progress.volatile_decree);
ASSERT_EQ(expected_decree, progress.stored_decree);
@@ -63,6 +99,8 @@ public:
ASSERT_EQ(0, progress.last_progress_update_ms);
ASSERT_TRUE(progress.is_inited);
ASSERT_FALSE(progress.checkpoint_prepared);
+
+ test_duplication_entry(dup, partition_index, expected_decree,
invalid_decree);
}
static void test_alter_progress()
@@ -96,6 +134,7 @@ public:
ASSERT_EQ(invalid_decree, dup._progress[0].stored_decree);
ASSERT_TRUE(dup._progress[0].is_altering);
ASSERT_TRUE(dup._progress[0].checkpoint_prepared);
+ test_duplication_entry(dup, 0, invalid_decree, 8);
// Busy updating.
entry.__set_last_committed_decree(15);
@@ -109,6 +148,7 @@ public:
ASSERT_EQ(invalid_decree, dup._progress[0].stored_decree);
ASSERT_TRUE(dup._progress[0].is_altering);
ASSERT_TRUE(dup._progress[0].checkpoint_prepared);
+ test_duplication_entry(dup, 0, invalid_decree, 15);
// Persist progress for partition 0.
dup.persist_progress(0);
@@ -118,6 +158,7 @@ public:
ASSERT_EQ(5, dup._progress[0].stored_decree);
ASSERT_FALSE(dup._progress[0].is_altering);
ASSERT_TRUE(dup._progress[0].checkpoint_prepared);
+ test_duplication_entry(dup, 0, 5, 15);
// Initialize progress for partition 1.
test_init_progress(dup, 1, 5);
@@ -130,6 +171,7 @@ public:
ASSERT_EQ(5, dup._progress[1].stored_decree);
ASSERT_TRUE(dup._progress[1].is_altering);
ASSERT_FALSE(dup._progress[1].checkpoint_prepared);
+ test_duplication_entry(dup, 1, 5, 15);
// Persist progress for partition 1.
dup.persist_progress(1);
@@ -148,6 +190,7 @@ public:
ASSERT_FALSE(dup._progress[1].is_altering);
// checkpoint_prepared would be updated successfully even if it is too frequent.
ASSERT_TRUE(dup._progress[1].checkpoint_prepared);
+ test_duplication_entry(dup, 1, 10, 25);
// Reduce last update timestamp to make it infrequent.
dup._progress[1].last_progress_update_ms -=
FLAGS_dup_progress_min_update_period_ms + 100;
@@ -160,6 +203,7 @@ public:
ASSERT_EQ(10, dup._progress[1].stored_decree);
ASSERT_TRUE(dup._progress[1].is_altering);
ASSERT_TRUE(dup._progress[1].checkpoint_prepared);
+ test_duplication_entry(dup, 1, 10, 26);
// Checkpoint are ready for both partition 0 and 1.
ASSERT_TRUE(dup.all_checkpoint_has_prepared());
@@ -181,17 +225,24 @@ public:
ASSERT_EQ(duplication_status::DS_INIT, dup._status);
ASSERT_EQ(duplication_status::DS_INIT, dup._next_status);
- auto dup_ent = dup.to_duplication_entry();
- ASSERT_EQ(0, dup_ent.progress.size());
- ASSERT_EQ(kTestRemoteAppName, dup_ent.remote_app_name);
- ASSERT_EQ(kTestRemoteReplicaCount, dup_ent.remote_replica_count);
+ {
+ const auto &entry = dup.to_partition_level_entry_for_sync();
+ ASSERT_TRUE(entry.progress.empty());
+ ASSERT_EQ(kTestRemoteAppName, entry.remote_app_name);
+ ASSERT_EQ(kTestRemoteReplicaCount, entry.remote_replica_count);
+ }
- for (int i = 0; i < 4; i++) {
+ for (int i = 0; i < 4; ++i) {
dup.init_progress(i, invalid_decree);
}
- for (auto kv : dup_ent.progress) {
- ASSERT_EQ(invalid_decree, kv.second);
+ {
+ const auto &entry = dup.to_partition_level_entry_for_sync();
+ ASSERT_EQ(4, entry.progress.size());
+ for (int partition_index = 0; partition_index < 4;
++partition_index) {
+ ASSERT_TRUE(gutil::ContainsKey(entry.progress,
partition_index));
+ ASSERT_EQ(invalid_decree, gutil::FindOrDie(entry.progress,
partition_index));
+ }
}
dup.start();
diff --git a/src/meta/test/meta_duplication_service_test.cpp
b/src/meta/test/meta_duplication_service_test.cpp
index ec65a41fa..5e0ebe6b1 100644
--- a/src/meta/test/meta_duplication_service_test.cpp
+++ b/src/meta/test/meta_duplication_service_test.cpp
@@ -46,6 +46,7 @@
#include "dsn.layer2_types.h"
#include "duplication_types.h"
#include "gtest/gtest.h"
+#include "gutil/map_util.h"
#include "http/http_server.h"
#include "http/http_status_code.h"
#include "meta/duplication/duplication_info.h"
@@ -233,9 +234,12 @@ public:
ASSERT_EQ(duplication_status::DS_INIT, dup->_status);
ASSERT_EQ(duplication_status::DS_INIT, dup->_next_status);
- auto ent = dup->to_duplication_entry();
- for (int j = 0; j < app->partition_count; j++) {
- ASSERT_EQ(invalid_decree, ent.progress[j]);
+ const auto &entry = dup->to_partition_level_entry_for_sync();
+ ASSERT_EQ(app->partition_count, entry.progress.size());
+ for (int partition_index = 0; partition_index <
app->partition_count;
+ ++partition_index) {
+ ASSERT_TRUE(gutil::ContainsKey(entry.progress,
partition_index));
+ ASSERT_EQ(invalid_decree, gutil::FindOrDie(entry.progress,
partition_index));
}
if (last_dup != 0) {
diff --git a/src/replica/duplication/replica_duplicator_manager.cpp
b/src/replica/duplication/replica_duplicator_manager.cpp
index 724f164d1..5569554d7 100644
--- a/src/replica/duplication/replica_duplicator_manager.cpp
+++ b/src/replica/duplication/replica_duplicator_manager.cpp
@@ -102,9 +102,9 @@
replica_duplicator_manager::get_duplication_confirms_to_update() const
void replica_duplicator_manager::sync_duplication(const duplication_entry &ent)
{
- // state is inconsistent with meta-server
auto it = ent.progress.find(get_gpid().get_partition_index());
if (it == ent.progress.end()) {
+ // Inconsistent with the meta server.
_duplications.erase(ent.dupid);
return;
}
@@ -114,22 +114,24 @@ void replica_duplicator_manager::sync_duplication(const
duplication_entry &ent)
dupid_t dupid = ent.dupid;
duplication_status::type next_status = ent.status;
- replica_duplicator_u_ptr &dup = _duplications[dupid];
- if (dup == nullptr) {
+ auto &dup = _duplications[dupid];
+ if (!dup) {
if (!is_duplication_status_invalid(next_status)) {
dup = std::make_unique<replica_duplicator>(ent, _replica);
} else {
LOG_ERROR_PREFIX("illegal duplication status: {}",
duplication_status_to_string(next_status));
}
- } else {
- // update progress
- duplication_progress newp =
dup->progress().set_confirmed_decree(it->second);
- CHECK_EQ_PREFIX(dup->update_progress(newp), error_s::ok());
- dup->update_status_if_needed(next_status);
- if (ent.__isset.fail_mode) {
- dup->update_fail_mode(ent.fail_mode);
- }
+
+ return;
+ }
+
+ // Update progress.
+
CHECK_EQ_PREFIX(dup->update_progress(dup->progress().set_confirmed_decree(it->second)),
+ error_s::ok());
+ dup->update_status_if_needed(next_status);
+ if (ent.__isset.fail_mode) {
+ dup->update_fail_mode(ent.fail_mode);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]