This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 9efe8d6b5 fix(duplication): fix unexpected ERR_NOT_ENOUGH_MEMBER while
checking if the remote table with only one replica is ready for duplication
(#2093)
9efe8d6b5 is described below
commit 9efe8d6b5463c05f1dba5617f656ac645d59ab4f
Author: Dan Wang <[email protected]>
AuthorDate: Thu Aug 15 11:01:15 2024 +0800
fix(duplication): fix unexpected ERR_NOT_ENOUGH_MEMBER while checking if
the remote table with only one replica is ready for duplication (#2093)
https://github.com/apache/incubator-pegasus/issues/2092
When the remote table is created with only one replica for the full
duplication
(copying checkpoints), the duplication would get stuck in `DS_APP` status
forever due to `ERR_NOT_ENOUGH_MEMBER`. The reason is that the check
would fail once there is no secondary replica. However, for single-replica
tables,
there is certainly no secondary replica.
---
src/meta/duplication/duplication_info.cpp | 1 +
src/meta/duplication/meta_duplication_service.cpp | 267 +++++++++++++---------
src/meta/test/meta_duplication_service_test.cpp | 91 +++++++-
src/meta/test/meta_test_base.cpp | 6 +-
src/meta/test/meta_test_base.h | 9 +-
5 files changed, 256 insertions(+), 118 deletions(-)
diff --git a/src/meta/duplication/duplication_info.cpp
b/src/meta/duplication/duplication_info.cpp
index 4036cf293..7f7bb62b6 100644
--- a/src/meta/duplication/duplication_info.cpp
+++ b/src/meta/duplication/duplication_info.cpp
@@ -173,6 +173,7 @@ void duplication_info::persist_status()
_is_altering = false;
_status = _next_status;
+ // Now we don't know exactly what the next status is, thus set DS_INIT
temporarily.
_next_status = duplication_status::DS_INIT;
_fail_mode = _next_fail_mode;
}
diff --git a/src/meta/duplication/meta_duplication_service.cpp
b/src/meta/duplication/meta_duplication_service.cpp
index a2adf2946..76472a04d 100644
--- a/src/meta/duplication/meta_duplication_service.cpp
+++ b/src/meta/duplication/meta_duplication_service.cpp
@@ -15,13 +15,16 @@
// specific language governing permissions and limitations
// under the License.
+// IWYU pragma: no_include <ext/alloc_traits.h>
#include <fmt/core.h>
+#include <stddef.h>
#include <algorithm>
#include <cstdint>
+#include <functional>
#include <queue>
+#include <string_view>
#include <type_traits>
-#include <string_view>
#include "common//duplication_common.h"
#include "common/common.h"
#include "common/gpid.h"
@@ -54,6 +57,7 @@
#include "utils/fmt_logging.h"
#include "utils/ports.h"
#include "utils/string_conv.h"
+#include "utils/strings.h"
#include "utils/zlocks.h"
DSN_DECLARE_bool(dup_ignore_other_cluster_ids);
@@ -336,10 +340,12 @@ void
meta_duplication_service::do_add_duplication(std::shared_ptr<app_state> &ap
std::queue<std::string> nodes({get_duplication_path(*app),
std::to_string(dup->id)});
_meta_svc->get_meta_storage()->create_node_recursively(
std::move(nodes), std::move(value), [app, this, dup, rpc, resp_err]()
mutable {
- LOG_INFO("[{}] add duplication successfully [app_name: {},
remote_cluster_name: {}]",
+ LOG_INFO("[{}] add duplication successfully [app_name: {},
remote_cluster_name: {}, "
+ "remote_app_name: {}]",
dup->log_prefix(),
app->app_name,
- dup->remote_cluster_name);
+ dup->remote_cluster_name,
+ dup->remote_app_name);
// The duplication starts only after it's been persisted.
dup->persist_status();
@@ -484,45 +490,103 @@ void
meta_duplication_service::create_follower_app_for_duplication(
dsn::message_ex *msg = dsn::message_ex::create_request(RPC_CM_CREATE_APP);
dsn::marshall(msg, request);
- rpc::call(dsn::dns_resolver::instance().resolve_address(meta_servers),
- msg,
- _meta_svc->tracker(),
- [=](error_code err, configuration_create_app_response &&resp)
mutable {
- FAIL_POINT_INJECT_NOT_RETURN_F("update_app_request_ok",
- [&](std::string_view s) ->
void { err = ERR_OK; });
- error_code create_err = err == ERR_OK ? resp.err : err;
- error_code update_err = ERR_NO_NEED_OPERATE;
-
- FAIL_POINT_INJECT_NOT_RETURN_F(
- "persist_dup_status_failed",
- [&](std::string_view s) -> void { create_err = ERR_OK;
});
- if (create_err == ERR_OK) {
- update_err =
dup->alter_status(duplication_status::DS_APP);
- }
-
- FAIL_POINT_INJECT_F("persist_dup_status_failed",
- [&](std::string_view s) -> void {
return; });
- if (update_err == ERR_OK) {
- blob value = dup->to_json_blob();
- // Note: this function is `async`, it may not be
persisted completed
- // after executing, now using `_is_altering` to judge
whether `updating` or
- // `completed`, if `_is_altering`, dup->alter_status()
will return `ERR_BUSY`
-
_meta_svc->get_meta_storage()->set_data(std::string(dup->store_path),
- std::move(value),
- [=]() {
dup->persist_status(); });
- } else {
- LOG_ERROR(
- "created follower app[{}.{}] to trigger duplicate
checkpoint failed: "
+ rpc::call(
+ dsn::dns_resolver::instance().resolve_address(meta_servers),
+ msg,
+ _meta_svc->tracker(),
+ [dup, this](error_code err, configuration_create_app_response &&resp)
mutable {
+ FAIL_POINT_INJECT_NOT_RETURN_F("update_app_request_ok",
+ [&err](std::string_view) -> void {
err = ERR_OK; });
+
+ error_code create_err = err == ERR_OK ? resp.err : err;
+ FAIL_POINT_INJECT_NOT_RETURN_F(
+ "persist_dup_status_failed",
+ [&create_err](std::string_view) -> void { create_err = ERR_OK;
});
+
+ error_code update_err = ERR_NO_NEED_OPERATE;
+ if (create_err == ERR_OK) {
+ update_err = dup->alter_status(duplication_status::DS_APP);
+ }
+
+ FAIL_POINT_INJECT_F("persist_dup_status_failed",
+ [](std::string_view) -> void { return; });
+
+ if (update_err == ERR_OK) {
+ blob value = dup->to_json_blob();
+ // Note: this function is `async`, it may not be persisted
completed
+ // after executing, now using `_is_altering` to judge whether
`updating` or
+ // `completed`, if `_is_altering`, dup->alter_status() will
return `ERR_BUSY`
+
_meta_svc->get_meta_storage()->set_data(std::string(dup->store_path),
+ std::move(value),
+ [dup]() {
dup->persist_status(); });
+ } else {
+ LOG_ERROR("create follower app[{}.{}] to trigger duplicate
checkpoint failed: "
"duplication_status = {}, create_err = {},
update_err = {}",
dup->remote_cluster_name,
- dup->app_name,
+ dup->remote_app_name,
duplication_status_to_string(dup->status()),
create_err,
update_err);
- }
- });
+ }
+ });
+}
+
+namespace {
+
+// The format of `replica_state_str` is
"<has_primary>,<valid_secondaries>,<invalid_secondaries>":
+//
+// <has_primary>: bool, true means the address of the primary replica
is valid,
+// otherwise false.
+// <valid_secondaries>: uint32_t, the number of secondaries whose addresses
are valid.
+// <invalid_secondaries>: uint32_t, the number of secondaries whose addresses
are invalid.
+void mock_create_app(std::string_view replica_state_str,
+ const std::shared_ptr<duplication_info> &dup,
+ dsn::query_cfg_response &resp,
+ dsn::error_code &err)
+{
+ std::vector<std::string> strs;
+ utils::split_args(replica_state_str.data(), strs, ',');
+ CHECK_EQ(strs.size(), 3);
+
+ bool has_primary = 0;
+ CHECK_TRUE(buf2bool(strs[0], has_primary));
+
+ uint32_t valid_secondaries = 0;
+ CHECK_TRUE(buf2uint32(strs[1], valid_secondaries));
+
+ uint32_t invalid_secondaries = 0;
+ CHECK_TRUE(buf2uint32(strs[2], invalid_secondaries));
+
+ std::vector<host_port> nodes;
+ if (has_primary) {
+ nodes.emplace_back("localhost", 34801);
+ } else {
+ nodes.emplace_back();
+ }
+ for (uint32_t i = 0; i < valid_secondaries; ++i) {
+ nodes.emplace_back("localhost", static_cast<uint16_t>(34802 + i));
+ }
+ for (uint32_t i = 0; i < invalid_secondaries; ++i) {
+ nodes.emplace_back();
+ }
+
+ for (int32_t i = 0; i < dup->partition_count; ++i) {
+ partition_configuration pc;
+ pc.max_replica_count = dup->remote_replica_count;
+
+ SET_IP_AND_HOST_PORT_BY_DNS(pc, primary, nodes[0]);
+ for (size_t j = 1; j < nodes.size(); ++j) {
+ ADD_IP_AND_HOST_PORT_BY_DNS(pc, secondaries, nodes[j]);
+ }
+
+ resp.partitions.push_back(std::move(pc));
+ }
+
+ err = ERR_OK;
}
+} // anonymous namespace
+
void meta_duplication_service::check_follower_app_if_create_completed(
const std::shared_ptr<duplication_info> &dup)
{
@@ -535,79 +599,78 @@ void
meta_duplication_service::check_follower_app_if_create_completed(
dsn::message_ex *msg =
dsn::message_ex::create_request(RPC_CM_QUERY_PARTITION_CONFIG_BY_INDEX);
dsn::marshall(msg, meta_config_request);
- rpc::call(dsn::dns_resolver::instance().resolve_address(meta_servers),
- msg,
- _meta_svc->tracker(),
- [=](error_code err, query_cfg_response &&resp) mutable {
- FAIL_POINT_INJECT_NOT_RETURN_F("create_app_ok",
[&](std::string_view s) -> void {
- err = ERR_OK;
- int count = dup->partition_count;
- while (count-- > 0) {
- const host_port primary("localhost", 34801);
- const host_port secondary1("localhost", 34802);
- const host_port secondary2("localhost", 34803);
-
- partition_configuration pc;
- SET_IP_AND_HOST_PORT_BY_DNS(pc, primary, primary);
- SET_IPS_AND_HOST_PORTS_BY_DNS(pc, secondaries,
secondary1, secondary2);
- resp.partitions.emplace_back(pc);
- }
- });
-
- // - ERR_INCONSISTENT_STATE: partition count of response
isn't equal with local
- // - ERR_INACTIVE_STATE: the follower table hasn't been
healthy
- error_code query_err = err == ERR_OK ? resp.err : err;
- if (query_err == ERR_OK) {
- if (resp.partitions.size() != dup->partition_count) {
- query_err = ERR_INCONSISTENT_STATE;
- } else {
- for (const auto &pc : resp.partitions) {
- if (!pc.hp_primary) {
- query_err = ERR_INACTIVE_STATE;
- break;
- }
-
- if (pc.hp_secondaries.empty()) {
- query_err = ERR_NOT_ENOUGH_MEMBER;
- break;
- }
-
- for (const auto &secondary : pc.hp_secondaries) {
- if (!secondary) {
- query_err = ERR_INACTIVE_STATE;
- break;
- }
- }
- }
- }
- }
-
- error_code update_err = ERR_NO_NEED_OPERATE;
- if (query_err == ERR_OK) {
- update_err =
dup->alter_status(duplication_status::DS_LOG);
- }
-
- FAIL_POINT_INJECT_F("persist_dup_status_failed",
- [&](std::string_view s) -> void {
return; });
- if (update_err == ERR_OK) {
- blob value = dup->to_json_blob();
- // Note: this function is `async`, it may not be
persisted completed
- // after executing, now using `_is_altering` to judge
whether `updating` or
- // `completed`, if `_is_altering`, dup->alter_status()
will return `ERR_BUSY`
-
_meta_svc->get_meta_storage()->set_data(std::string(dup->store_path),
- std::move(value),
- [dup]() {
dup->persist_status(); });
- } else {
- LOG_ERROR(
- "query follower app[{}.{}] replica configuration
completed, result: "
+ rpc::call(
+ dsn::dns_resolver::instance().resolve_address(meta_servers),
+ msg,
+ _meta_svc->tracker(),
+ [dup, this](error_code err, query_cfg_response &&resp) mutable {
+ FAIL_POINT_INJECT_NOT_RETURN_F(
+ "create_app_ok",
+ std::bind(
+ mock_create_app, std::placeholders::_1, dup,
std::ref(resp), std::ref(err)));
+
+ // - ERR_INCONSISTENT_STATE: partition count of response isn't
equal with local
+ // - ERR_INACTIVE_STATE: the follower table hasn't been healthy
+ error_code query_err = err == ERR_OK ? resp.err : err;
+ if (query_err == ERR_OK) {
+ if (resp.partitions.size() != dup->partition_count) {
+ query_err = ERR_INCONSISTENT_STATE;
+ } else {
+ for (const auto &pc : resp.partitions) {
+ if (!pc.hp_primary) {
+ // Fail once the primary replica is unavailable.
+ query_err = ERR_INACTIVE_STATE;
+ break;
+ }
+
+ // Once replica count is more than 1, at least one
secondary replica
+ // is required.
+ if (1 + pc.hp_secondaries.size() <
pc.max_replica_count &&
+ pc.hp_secondaries.empty()) {
+ query_err = ERR_NOT_ENOUGH_MEMBER;
+ break;
+ }
+
+ for (const auto &secondary : pc.hp_secondaries) {
+ if (!secondary) {
+ // Fail once any secondary replica is
unavailable.
+ query_err = ERR_INACTIVE_STATE;
+ break;
+ }
+ }
+ if (query_err != ERR_OK) {
+ break;
+ }
+ }
+ }
+ }
+
+ error_code update_err = ERR_NO_NEED_OPERATE;
+ if (query_err == ERR_OK) {
+ update_err = dup->alter_status(duplication_status::DS_LOG);
+ }
+
+ FAIL_POINT_INJECT_F("persist_dup_status_failed",
+ [](std::string_view) -> void { return; });
+
+ if (update_err == ERR_OK) {
+ blob value = dup->to_json_blob();
+ // Note: this function is `async`, it may not be persisted
completed
+ // after executing, now using `_is_altering` to judge whether
`updating` or
+ // `completed`, if `_is_altering`, dup->alter_status() will
return `ERR_BUSY`
+
_meta_svc->get_meta_storage()->set_data(std::string(dup->store_path),
+ std::move(value),
+ [dup]() {
dup->persist_status(); });
+ } else {
+ LOG_ERROR("query follower app[{}.{}] replica configuration
completed, result: "
"duplication_status = {}, query_err = {}, update_err
= {}",
dup->remote_cluster_name,
dup->remote_app_name,
duplication_status_to_string(dup->status()),
query_err,
update_err);
- }
- });
+ }
+ });
}
void meta_duplication_service::do_update_partition_confirmed(
diff --git a/src/meta/test/meta_duplication_service_test.cpp
b/src/meta/test/meta_duplication_service_test.cpp
index 54a58af87..8fbcddf09 100644
--- a/src/meta/test/meta_duplication_service_test.cpp
+++ b/src/meta/test/meta_duplication_service_test.cpp
@@ -119,6 +119,12 @@ public:
return create_dup(app_name, remote_cluster, app_name,
remote_replica_count);
}
+ duplication_add_response create_dup(const std::string &app_name,
+ const int32_t remote_replica_count)
+ {
+ return create_dup(app_name, kTestRemoteClusterName,
remote_replica_count);
+ }
+
duplication_add_response create_dup(const std::string &app_name)
{
return create_dup(app_name, kTestRemoteClusterName,
kTestRemoteReplicaCount);
@@ -963,39 +969,99 @@ TEST_F(meta_duplication_service_test,
check_follower_app_if_create_completed)
{
struct test_case
{
+ int32_t remote_replica_count;
std::vector<std::string> fail_cfg_name;
std::vector<std::string> fail_cfg_action;
bool is_altering;
duplication_status::type cur_status;
duplication_status::type next_status;
- } test_cases[] = {{{"create_app_ok"},
- {"void()"},
+ } test_cases[] = {// 3 remote replicas with both primary and secondaries
valid.
+ {3,
+ {"create_app_ok"},
+ {"void(true,2,0)"},
false,
duplication_status::DS_LOG,
duplication_status::DS_INIT},
- // the case just `palace holder`, actually
- // `check_follower_app_if_create_completed` is failed by
default in unit test
- {{"create_app_failed"},
+ // 3 remote replicas with primary invalid and all
secondaries valid.
+ {3,
+ {"create_app_ok"},
+ {"void(false,2,0)"},
+ false,
+ duplication_status::DS_APP,
+ duplication_status::DS_INIT},
+ // 3 remote replicas with primary valid and only one
secondary present
+ // and valid.
+ {3,
+ {"create_app_ok"},
+ {"void(true,1,0)"},
+ false,
+ duplication_status::DS_LOG,
+ duplication_status::DS_INIT},
+ // 3 remote replicas with primary valid and one
secondary invalid.
+ {3,
+ {"create_app_ok"},
+ {"void(true,1,1)"},
+ false,
+ duplication_status::DS_APP,
+ duplication_status::DS_INIT},
+ // 3 remote replicas with primary valid and only one
secondary present
+ // and invalid.
+ {3,
+ {"create_app_ok"},
+ {"void(true,0,1)"},
+ false,
+ duplication_status::DS_APP,
+ duplication_status::DS_INIT},
+ // 3 remote replicas with primary valid and both
secondaries absent.
+ {3,
+ {"create_app_ok"},
+ {"void(true,0,0)"},
+ false,
+ duplication_status::DS_APP,
+ duplication_status::DS_INIT},
+ // 1 remote replica with primary valid.
+ {1,
+ {"create_app_ok"},
+ {"void(true,0,0)"},
+ false,
+ duplication_status::DS_LOG,
+ duplication_status::DS_INIT},
+ // 1 remote replica with primary invalid.
+ {1,
+ {"create_app_ok"},
+ {"void(false,0,0)"},
+ false,
+ duplication_status::DS_APP,
+ duplication_status::DS_INIT},
+ // The case is just a "placeholder", actually
+ // `check_follower_app_if_create_completed` would fail
by default
+ // in unit test.
+ {3,
+ {"create_app_failed"},
{"off()"},
false,
duplication_status::DS_APP,
duplication_status::DS_INIT},
- {{"create_app_ok", "persist_dup_status_failed"},
- {"void()", "return()"},
+ {3,
+ {"create_app_ok", "persist_dup_status_failed"},
+ {"void(true,2,0)", "return()"},
true,
duplication_status::DS_APP,
duplication_status::DS_LOG}};
+ size_t i = 0;
for (const auto &test : test_cases) {
- const auto test_app = fmt::format("{}{}", test.fail_cfg_name[0],
test.fail_cfg_name.size());
- create_app(test_app);
- auto app = find_app(test_app);
+ const auto &app_name =
fmt::format("check_follower_app_if_create_completed_test_{}", i++);
+ create_app(app_name);
- auto dup_add_resp = create_dup(test_app);
+ auto app = find_app(app_name);
+ auto dup_add_resp = create_dup(app_name, test.remote_replica_count);
auto dup = app->duplications[dup_add_resp.dupid];
+
// 'check_follower_app_if_create_completed' must execute under
duplication_status::DS_APP,
- // so force update it
+ // so force update it.
force_update_dup_status(dup, duplication_status::DS_APP);
+
fail::setup();
for (int i = 0; i < test.fail_cfg_name.size(); i++) {
fail::cfg(test.fail_cfg_name[i], test.fail_cfg_action[i]);
@@ -1003,6 +1069,7 @@ TEST_F(meta_duplication_service_test,
check_follower_app_if_create_completed)
check_follower_app_if_create_completed(dup);
wait_all();
fail::teardown();
+
ASSERT_EQ(dup->is_altering(), test.is_altering);
ASSERT_EQ(next_status(dup), test.next_status);
ASSERT_EQ(dup->status(), test.cur_status);
diff --git a/src/meta/test/meta_test_base.cpp b/src/meta/test/meta_test_base.cpp
index 965939d56..49cf729d1 100644
--- a/src/meta/test/meta_test_base.cpp
+++ b/src/meta/test/meta_test_base.cpp
@@ -182,14 +182,16 @@ std::vector<host_port>
meta_test_base::ensure_enough_alive_nodes(int min_node_co
return nodes;
}
-void meta_test_base::create_app(const std::string &name, uint32_t
partition_count)
+void meta_test_base::create_app(const std::string &name,
+ int32_t partition_count,
+ int32_t replica_count)
{
configuration_create_app_request req;
configuration_create_app_response resp;
req.app_name = name;
req.options.app_type = "simple_kv";
req.options.partition_count = partition_count;
- req.options.replica_count = 3;
+ req.options.replica_count = replica_count;
req.options.success_if_exist = false;
req.options.is_stateful = true;
req.options.envs["value_version"] = "1";
diff --git a/src/meta/test/meta_test_base.h b/src/meta/test/meta_test_base.h
index 2f27e883a..25f3eed42 100644
--- a/src/meta/test/meta_test_base.h
+++ b/src/meta/test/meta_test_base.h
@@ -57,8 +57,13 @@ public:
std::vector<host_port> ensure_enough_alive_nodes(int min_node_count);
- // create an app for test with specified name and specified partition count
- void create_app(const std::string &name, uint32_t partition_count);
+ // Create an app for test with specified name, partition count and replica
count.
+ void create_app(const std::string &name, int32_t partition_count, int32_t
replica_count);
+
+ void create_app(const std::string &name, int32_t partition_count)
+ {
+ create_app(name, partition_count, 3);
+ }
void create_app(const std::string &name) { create_app(name, 8); }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]