This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 9efe8d6b5 fix(duplication): fix unexpected ERR_NOT_ENOUGH_MEMBER while 
checking if the remote table with only one replica is ready for duplication 
(#2093)
9efe8d6b5 is described below

commit 9efe8d6b5463c05f1dba5617f656ac645d59ab4f
Author: Dan Wang <[email protected]>
AuthorDate: Thu Aug 15 11:01:15 2024 +0800

    fix(duplication): fix unexpected ERR_NOT_ENOUGH_MEMBER while checking if 
the remote table with only one replica is ready for duplication (#2093)
    
    https://github.com/apache/incubator-pegasus/issues/2092
    
    When the remote table is created with only one replica for full duplication
    (copying checkpoints), the duplication would get stuck in `DS_APP` status
    forever due to `ERR_NOT_ENOUGH_MEMBER`. The reason is that the check
    failed whenever there was no secondary replica. However, a single-replica
    table never has any secondary replica.
---
 src/meta/duplication/duplication_info.cpp         |   1 +
 src/meta/duplication/meta_duplication_service.cpp | 267 +++++++++++++---------
 src/meta/test/meta_duplication_service_test.cpp   |  91 +++++++-
 src/meta/test/meta_test_base.cpp                  |   6 +-
 src/meta/test/meta_test_base.h                    |   9 +-
 5 files changed, 256 insertions(+), 118 deletions(-)

diff --git a/src/meta/duplication/duplication_info.cpp 
b/src/meta/duplication/duplication_info.cpp
index 4036cf293..7f7bb62b6 100644
--- a/src/meta/duplication/duplication_info.cpp
+++ b/src/meta/duplication/duplication_info.cpp
@@ -173,6 +173,7 @@ void duplication_info::persist_status()
 
     _is_altering = false;
     _status = _next_status;
+    // The next status is not known yet, so set it to DS_INIT temporarily.
     _next_status = duplication_status::DS_INIT;
     _fail_mode = _next_fail_mode;
 }
diff --git a/src/meta/duplication/meta_duplication_service.cpp 
b/src/meta/duplication/meta_duplication_service.cpp
index a2adf2946..76472a04d 100644
--- a/src/meta/duplication/meta_duplication_service.cpp
+++ b/src/meta/duplication/meta_duplication_service.cpp
@@ -15,13 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
+// IWYU pragma: no_include <ext/alloc_traits.h>
 #include <fmt/core.h>
+#include <stddef.h>
 #include <algorithm>
 #include <cstdint>
+#include <functional>
 #include <queue>
+#include <string_view>
 #include <type_traits>
 
-#include <string_view>
 #include "common//duplication_common.h"
 #include "common/common.h"
 #include "common/gpid.h"
@@ -54,6 +57,7 @@
 #include "utils/fmt_logging.h"
 #include "utils/ports.h"
 #include "utils/string_conv.h"
+#include "utils/strings.h"
 #include "utils/zlocks.h"
 
 DSN_DECLARE_bool(dup_ignore_other_cluster_ids);
@@ -336,10 +340,12 @@ void 
meta_duplication_service::do_add_duplication(std::shared_ptr<app_state> &ap
     std::queue<std::string> nodes({get_duplication_path(*app), 
std::to_string(dup->id)});
     _meta_svc->get_meta_storage()->create_node_recursively(
         std::move(nodes), std::move(value), [app, this, dup, rpc, resp_err]() 
mutable {
-            LOG_INFO("[{}] add duplication successfully [app_name: {}, 
remote_cluster_name: {}]",
+            LOG_INFO("[{}] add duplication successfully [app_name: {}, 
remote_cluster_name: {}, "
+                     "remote_app_name: {}]",
                      dup->log_prefix(),
                      app->app_name,
-                     dup->remote_cluster_name);
+                     dup->remote_cluster_name,
+                     dup->remote_app_name);
 
             // The duplication starts only after it's been persisted.
             dup->persist_status();
@@ -484,45 +490,103 @@ void 
meta_duplication_service::create_follower_app_for_duplication(
 
     dsn::message_ex *msg = dsn::message_ex::create_request(RPC_CM_CREATE_APP);
     dsn::marshall(msg, request);
-    rpc::call(dsn::dns_resolver::instance().resolve_address(meta_servers),
-              msg,
-              _meta_svc->tracker(),
-              [=](error_code err, configuration_create_app_response &&resp) 
mutable {
-                  FAIL_POINT_INJECT_NOT_RETURN_F("update_app_request_ok",
-                                                 [&](std::string_view s) -> 
void { err = ERR_OK; });
-                  error_code create_err = err == ERR_OK ? resp.err : err;
-                  error_code update_err = ERR_NO_NEED_OPERATE;
-
-                  FAIL_POINT_INJECT_NOT_RETURN_F(
-                      "persist_dup_status_failed",
-                      [&](std::string_view s) -> void { create_err = ERR_OK; 
});
-                  if (create_err == ERR_OK) {
-                      update_err = 
dup->alter_status(duplication_status::DS_APP);
-                  }
-
-                  FAIL_POINT_INJECT_F("persist_dup_status_failed",
-                                      [&](std::string_view s) -> void { 
return; });
-                  if (update_err == ERR_OK) {
-                      blob value = dup->to_json_blob();
-                      // Note: this function is `async`, it may not be 
persisted completed
-                      // after executing, now using `_is_altering` to judge 
whether `updating` or
-                      // `completed`, if `_is_altering`, dup->alter_status() 
will return `ERR_BUSY`
-                      
_meta_svc->get_meta_storage()->set_data(std::string(dup->store_path),
-                                                              std::move(value),
-                                                              [=]() { 
dup->persist_status(); });
-                  } else {
-                      LOG_ERROR(
-                          "created follower app[{}.{}] to trigger duplicate 
checkpoint failed: "
+    rpc::call(
+        dsn::dns_resolver::instance().resolve_address(meta_servers),
+        msg,
+        _meta_svc->tracker(),
+        [dup, this](error_code err, configuration_create_app_response &&resp) 
mutable {
+            FAIL_POINT_INJECT_NOT_RETURN_F("update_app_request_ok",
+                                           [&err](std::string_view) -> void { 
err = ERR_OK; });
+
+            error_code create_err = err == ERR_OK ? resp.err : err;
+            FAIL_POINT_INJECT_NOT_RETURN_F(
+                "persist_dup_status_failed",
+                [&create_err](std::string_view) -> void { create_err = ERR_OK; 
});
+
+            error_code update_err = ERR_NO_NEED_OPERATE;
+            if (create_err == ERR_OK) {
+                update_err = dup->alter_status(duplication_status::DS_APP);
+            }
+
+            FAIL_POINT_INJECT_F("persist_dup_status_failed",
+                                [](std::string_view) -> void { return; });
+
+            if (update_err == ERR_OK) {
+                blob value = dup->to_json_blob();
+                // Note: this function is `async`, it may not be persisted 
completed
+                // after executing, now using `_is_altering` to judge whether 
`updating` or
+                // `completed`, if `_is_altering`, dup->alter_status() will 
return `ERR_BUSY`
+                
_meta_svc->get_meta_storage()->set_data(std::string(dup->store_path),
+                                                        std::move(value),
+                                                        [dup]() { 
dup->persist_status(); });
+            } else {
+                LOG_ERROR("create follower app[{}.{}] to trigger duplicate 
checkpoint failed: "
                           "duplication_status = {}, create_err = {}, 
update_err = {}",
                           dup->remote_cluster_name,
-                          dup->app_name,
+                          dup->remote_app_name,
                           duplication_status_to_string(dup->status()),
                           create_err,
                           update_err);
-                  }
-              });
+            }
+        });
+}
+
+namespace {
+
+// The format of `replica_state_str` is "<has_primary>,<valid_secondaries>,<invalid_secondaries>":
+//
+//         <has_primary>:   bool, true if the address of the primary replica is valid,
+//                          otherwise false.
+//   <valid_secondaries>:   uint32_t, the number of secondaries whose addresses are valid.
+// <invalid_secondaries>:   uint32_t, the number of secondaries whose addresses are invalid.
+void mock_create_app(std::string_view replica_state_str,
+                     const std::shared_ptr<duplication_info> &dup,
+                     dsn::query_cfg_response &resp,
+                     dsn::error_code &err)
+{
+    std::vector<std::string> strs;
+    utils::split_args(replica_state_str.data(), strs, ',');
+    CHECK_EQ(strs.size(), 3);
+
+    bool has_primary = 0;
+    CHECK_TRUE(buf2bool(strs[0], has_primary));
+
+    uint32_t valid_secondaries = 0;
+    CHECK_TRUE(buf2uint32(strs[1], valid_secondaries));
+
+    uint32_t invalid_secondaries = 0;
+    CHECK_TRUE(buf2uint32(strs[2], invalid_secondaries));
+
+    std::vector<host_port> nodes;
+    if (has_primary) {
+        nodes.emplace_back("localhost", 34801);
+    } else {
+        nodes.emplace_back();
+    }
+    for (uint32_t i = 0; i < valid_secondaries; ++i) {
+        nodes.emplace_back("localhost", static_cast<uint16_t>(34802 + i));
+    }
+    for (uint32_t i = 0; i < invalid_secondaries; ++i) {
+        nodes.emplace_back();
+    }
+
+    for (int32_t i = 0; i < dup->partition_count; ++i) {
+        partition_configuration pc;
+        pc.max_replica_count = dup->remote_replica_count;
+
+        SET_IP_AND_HOST_PORT_BY_DNS(pc, primary, nodes[0]);
+        for (size_t j = 1; j < nodes.size(); ++j) {
+            ADD_IP_AND_HOST_PORT_BY_DNS(pc, secondaries, nodes[j]);
+        }
+
+        resp.partitions.push_back(std::move(pc));
+    }
+
+    err = ERR_OK;
 }
 
+} // anonymous namespace
+
 void meta_duplication_service::check_follower_app_if_create_completed(
     const std::shared_ptr<duplication_info> &dup)
 {
@@ -535,79 +599,78 @@ void 
meta_duplication_service::check_follower_app_if_create_completed(
 
     dsn::message_ex *msg = 
dsn::message_ex::create_request(RPC_CM_QUERY_PARTITION_CONFIG_BY_INDEX);
     dsn::marshall(msg, meta_config_request);
-    rpc::call(dsn::dns_resolver::instance().resolve_address(meta_servers),
-              msg,
-              _meta_svc->tracker(),
-              [=](error_code err, query_cfg_response &&resp) mutable {
-                  FAIL_POINT_INJECT_NOT_RETURN_F("create_app_ok", 
[&](std::string_view s) -> void {
-                      err = ERR_OK;
-                      int count = dup->partition_count;
-                      while (count-- > 0) {
-                          const host_port primary("localhost", 34801);
-                          const host_port secondary1("localhost", 34802);
-                          const host_port secondary2("localhost", 34803);
-
-                          partition_configuration pc;
-                          SET_IP_AND_HOST_PORT_BY_DNS(pc, primary, primary);
-                          SET_IPS_AND_HOST_PORTS_BY_DNS(pc, secondaries, 
secondary1, secondary2);
-                          resp.partitions.emplace_back(pc);
-                      }
-                  });
-
-                  // - ERR_INCONSISTENT_STATE: partition count of response 
isn't equal with local
-                  // - ERR_INACTIVE_STATE: the follower table hasn't been 
healthy
-                  error_code query_err = err == ERR_OK ? resp.err : err;
-                  if (query_err == ERR_OK) {
-                      if (resp.partitions.size() != dup->partition_count) {
-                          query_err = ERR_INCONSISTENT_STATE;
-                      } else {
-                          for (const auto &pc : resp.partitions) {
-                              if (!pc.hp_primary) {
-                                  query_err = ERR_INACTIVE_STATE;
-                                  break;
-                              }
-
-                              if (pc.hp_secondaries.empty()) {
-                                  query_err = ERR_NOT_ENOUGH_MEMBER;
-                                  break;
-                              }
-
-                              for (const auto &secondary : pc.hp_secondaries) {
-                                  if (!secondary) {
-                                      query_err = ERR_INACTIVE_STATE;
-                                      break;
-                                  }
-                              }
-                          }
-                      }
-                  }
-
-                  error_code update_err = ERR_NO_NEED_OPERATE;
-                  if (query_err == ERR_OK) {
-                      update_err = 
dup->alter_status(duplication_status::DS_LOG);
-                  }
-
-                  FAIL_POINT_INJECT_F("persist_dup_status_failed",
-                                      [&](std::string_view s) -> void { 
return; });
-                  if (update_err == ERR_OK) {
-                      blob value = dup->to_json_blob();
-                      // Note: this function is `async`, it may not be 
persisted completed
-                      // after executing, now using `_is_altering` to judge 
whether `updating` or
-                      // `completed`, if `_is_altering`, dup->alter_status() 
will return `ERR_BUSY`
-                      
_meta_svc->get_meta_storage()->set_data(std::string(dup->store_path),
-                                                              std::move(value),
-                                                              [dup]() { 
dup->persist_status(); });
-                  } else {
-                      LOG_ERROR(
-                          "query follower app[{}.{}] replica configuration 
completed, result: "
+    rpc::call(
+        dsn::dns_resolver::instance().resolve_address(meta_servers),
+        msg,
+        _meta_svc->tracker(),
+        [dup, this](error_code err, query_cfg_response &&resp) mutable {
+            FAIL_POINT_INJECT_NOT_RETURN_F(
+                "create_app_ok",
+                std::bind(
+                    mock_create_app, std::placeholders::_1, dup, 
std::ref(resp), std::ref(err)));
+
+            // - ERR_INCONSISTENT_STATE: partition count of response isn't 
equal with local
+            // - ERR_INACTIVE_STATE: the follower table hasn't been healthy
+            error_code query_err = err == ERR_OK ? resp.err : err;
+            if (query_err == ERR_OK) {
+                if (resp.partitions.size() != dup->partition_count) {
+                    query_err = ERR_INCONSISTENT_STATE;
+                } else {
+                    for (const auto &pc : resp.partitions) {
+                        if (!pc.hp_primary) {
+                            // Fail once the primary replica is unavailable.
+                            query_err = ERR_INACTIVE_STATE;
+                            break;
+                        }
+
+                        // If the replica count is more than 1, at least one secondary replica
+                        // is required.
+                        if (1 + pc.hp_secondaries.size() < 
pc.max_replica_count &&
+                            pc.hp_secondaries.empty()) {
+                            query_err = ERR_NOT_ENOUGH_MEMBER;
+                            break;
+                        }
+
+                        for (const auto &secondary : pc.hp_secondaries) {
+                            if (!secondary) {
+                                // Fail once any secondary replica is 
unavailable.
+                                query_err = ERR_INACTIVE_STATE;
+                                break;
+                            }
+                        }
+                        if (query_err != ERR_OK) {
+                            break;
+                        }
+                    }
+                }
+            }
+
+            error_code update_err = ERR_NO_NEED_OPERATE;
+            if (query_err == ERR_OK) {
+                update_err = dup->alter_status(duplication_status::DS_LOG);
+            }
+
+            FAIL_POINT_INJECT_F("persist_dup_status_failed",
+                                [](std::string_view) -> void { return; });
+
+            if (update_err == ERR_OK) {
+                blob value = dup->to_json_blob();
+                // Note: this function is `async`, it may not be persisted 
completed
+                // after executing, now using `_is_altering` to judge whether 
`updating` or
+                // `completed`, if `_is_altering`, dup->alter_status() will 
return `ERR_BUSY`
+                
_meta_svc->get_meta_storage()->set_data(std::string(dup->store_path),
+                                                        std::move(value),
+                                                        [dup]() { 
dup->persist_status(); });
+            } else {
+                LOG_ERROR("query follower app[{}.{}] replica configuration 
completed, result: "
                           "duplication_status = {}, query_err = {}, update_err 
= {}",
                           dup->remote_cluster_name,
                           dup->remote_app_name,
                           duplication_status_to_string(dup->status()),
                           query_err,
                           update_err);
-                  }
-              });
+            }
+        });
 }
 
 void meta_duplication_service::do_update_partition_confirmed(
diff --git a/src/meta/test/meta_duplication_service_test.cpp 
b/src/meta/test/meta_duplication_service_test.cpp
index 54a58af87..8fbcddf09 100644
--- a/src/meta/test/meta_duplication_service_test.cpp
+++ b/src/meta/test/meta_duplication_service_test.cpp
@@ -119,6 +119,12 @@ public:
         return create_dup(app_name, remote_cluster, app_name, 
remote_replica_count);
     }
 
+    duplication_add_response create_dup(const std::string &app_name,
+                                        const int32_t remote_replica_count)
+    {
+        return create_dup(app_name, kTestRemoteClusterName, 
remote_replica_count);
+    }
+
     duplication_add_response create_dup(const std::string &app_name)
     {
         return create_dup(app_name, kTestRemoteClusterName, 
kTestRemoteReplicaCount);
@@ -963,39 +969,99 @@ TEST_F(meta_duplication_service_test, 
check_follower_app_if_create_completed)
 {
     struct test_case
     {
+        int32_t remote_replica_count;
         std::vector<std::string> fail_cfg_name;
         std::vector<std::string> fail_cfg_action;
         bool is_altering;
         duplication_status::type cur_status;
         duplication_status::type next_status;
-    } test_cases[] = {{{"create_app_ok"},
-                       {"void()"},
+    } test_cases[] = {// 3 remote replicas with both primary and secondaries 
valid.
+                      {3,
+                       {"create_app_ok"},
+                       {"void(true,2,0)"},
                        false,
                        duplication_status::DS_LOG,
                        duplication_status::DS_INIT},
-                      // the case just `palace holder`, actually
-                      // `check_follower_app_if_create_completed` is failed by 
default in unit test
-                      {{"create_app_failed"},
+                      // 3 remote replicas with primary invalid and all 
secondaries valid.
+                      {3,
+                       {"create_app_ok"},
+                       {"void(false,2,0)"},
+                       false,
+                       duplication_status::DS_APP,
+                       duplication_status::DS_INIT},
+                      // 3 remote replicas with primary valid and only one 
secondary present
+                      // and valid.
+                      {3,
+                       {"create_app_ok"},
+                       {"void(true,1,0)"},
+                       false,
+                       duplication_status::DS_LOG,
+                       duplication_status::DS_INIT},
+                      // 3 remote replicas with primary valid and one 
secondary invalid.
+                      {3,
+                       {"create_app_ok"},
+                       {"void(true,1,1)"},
+                       false,
+                       duplication_status::DS_APP,
+                       duplication_status::DS_INIT},
+                      // 3 remote replicas with primary valid and only one 
secondary present
+                      // and invalid.
+                      {3,
+                       {"create_app_ok"},
+                       {"void(true,0,1)"},
+                       false,
+                       duplication_status::DS_APP,
+                       duplication_status::DS_INIT},
+                      // 3 remote replicas with primary valid and both 
secondaries absent.
+                      {3,
+                       {"create_app_ok"},
+                       {"void(true,0,0)"},
+                       false,
+                       duplication_status::DS_APP,
+                       duplication_status::DS_INIT},
+                      // 1 remote replica with primary valid.
+                      {1,
+                       {"create_app_ok"},
+                       {"void(true,0,0)"},
+                       false,
+                       duplication_status::DS_LOG,
+                       duplication_status::DS_INIT},
+                      // 1 remote replica with primary invalid.
+                      {1,
+                       {"create_app_ok"},
+                       {"void(false,0,0)"},
+                       false,
+                       duplication_status::DS_APP,
+                       duplication_status::DS_INIT},
+                      // The case is just a "placeholder"; actually
+                      // `check_follower_app_if_create_completed` would fail by default
+                      // in unit tests.
+                      {3,
+                       {"create_app_failed"},
                        {"off()"},
                        false,
                        duplication_status::DS_APP,
                        duplication_status::DS_INIT},
-                      {{"create_app_ok", "persist_dup_status_failed"},
-                       {"void()", "return()"},
+                      {3,
+                       {"create_app_ok", "persist_dup_status_failed"},
+                       {"void(true,2,0)", "return()"},
                        true,
                        duplication_status::DS_APP,
                        duplication_status::DS_LOG}};
 
+    size_t i = 0;
     for (const auto &test : test_cases) {
-        const auto test_app = fmt::format("{}{}", test.fail_cfg_name[0], 
test.fail_cfg_name.size());
-        create_app(test_app);
-        auto app = find_app(test_app);
+        const auto &app_name = 
fmt::format("check_follower_app_if_create_completed_test_{}", i++);
+        create_app(app_name);
 
-        auto dup_add_resp = create_dup(test_app);
+        auto app = find_app(app_name);
+        auto dup_add_resp = create_dup(app_name, test.remote_replica_count);
         auto dup = app->duplications[dup_add_resp.dupid];
+
         // 'check_follower_app_if_create_completed' must execute under 
duplication_status::DS_APP,
-        // so force update it
+        // so force update it.
         force_update_dup_status(dup, duplication_status::DS_APP);
+
         fail::setup();
         for (int i = 0; i < test.fail_cfg_name.size(); i++) {
             fail::cfg(test.fail_cfg_name[i], test.fail_cfg_action[i]);
@@ -1003,6 +1069,7 @@ TEST_F(meta_duplication_service_test, 
check_follower_app_if_create_completed)
         check_follower_app_if_create_completed(dup);
         wait_all();
         fail::teardown();
+
         ASSERT_EQ(dup->is_altering(), test.is_altering);
         ASSERT_EQ(next_status(dup), test.next_status);
         ASSERT_EQ(dup->status(), test.cur_status);
diff --git a/src/meta/test/meta_test_base.cpp b/src/meta/test/meta_test_base.cpp
index 965939d56..49cf729d1 100644
--- a/src/meta/test/meta_test_base.cpp
+++ b/src/meta/test/meta_test_base.cpp
@@ -182,14 +182,16 @@ std::vector<host_port> 
meta_test_base::ensure_enough_alive_nodes(int min_node_co
     return nodes;
 }
 
-void meta_test_base::create_app(const std::string &name, uint32_t 
partition_count)
+void meta_test_base::create_app(const std::string &name,
+                                int32_t partition_count,
+                                int32_t replica_count)
 {
     configuration_create_app_request req;
     configuration_create_app_response resp;
     req.app_name = name;
     req.options.app_type = "simple_kv";
     req.options.partition_count = partition_count;
-    req.options.replica_count = 3;
+    req.options.replica_count = replica_count;
     req.options.success_if_exist = false;
     req.options.is_stateful = true;
     req.options.envs["value_version"] = "1";
diff --git a/src/meta/test/meta_test_base.h b/src/meta/test/meta_test_base.h
index 2f27e883a..25f3eed42 100644
--- a/src/meta/test/meta_test_base.h
+++ b/src/meta/test/meta_test_base.h
@@ -57,8 +57,13 @@ public:
 
     std::vector<host_port> ensure_enough_alive_nodes(int min_node_count);
 
-    // create an app for test with specified name and specified partition count
-    void create_app(const std::string &name, uint32_t partition_count);
+    // Create an app for test with specified name, partition count and replica 
count.
+    void create_app(const std::string &name, int32_t partition_count, int32_t 
replica_count);
+
+    void create_app(const std::string &name, int32_t partition_count)
+    {
+        create_app(name, partition_count, 3);
+    }
 
     void create_app(const std::string &name) { create_app(name, 8); }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to