This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 671186cc9 feat(duplication): allow remote app name to be specified for
duplication (#1960)
671186cc9 is described below
commit 671186cc92dbb2023597c7e1f89c5e2768229fd0
Author: Dan Wang <[email protected]>
AuthorDate: Thu Mar 28 15:39:29 2024 +0800
feat(duplication): allow remote app name to be specified for duplication
(#1960)
Previously a different remote app name could not be specified while adding
a new duplication. However, there are some scenarios that the app should
have a different name from the source cluster. For example, when an app is
migrated from a small cluster to a large cluster, it needs to be renamed
since
many apps on the target cluster where there might be some app with same
name.
Thus it is necessary to specify a custom name for the app in remote cluster.
This PR is to support this feature. To support this feature, an optional
remote
app name is added for some structures of duplication RPCs such as adding a
new duplication and syncing duplications. Remote app name is also added for
meta data on consistent storage(ZK) for duplication. Compatibility is also
considered once remote app name is missing in RPCs and meta data. Pegasus
shell provides an optional parameter for specifying custom remote name that
is allowed to be different from source cluster while adding a new
duplication.
---
idl/duplication.thrift | 19 ++
src/client/replication_ddl_client.cpp | 30 +-
src/client/replication_ddl_client.h | 14 +-
src/common/duplication_common.cpp | 9 +
src/common/duplication_common.h | 1 +
src/meta/duplication/duplication_info.cpp | 17 +-
src/meta/duplication/duplication_info.h | 42 +--
src/meta/duplication/meta_duplication_service.cpp | 233 ++++++++++-----
src/meta/duplication/meta_duplication_service.h | 8 +-
src/meta/test/duplication_info_test.cpp | 83 ++++--
src/meta/test/meta_duplication_service_test.cpp | 321 ++++++++++++---------
src/replica/duplication/duplication_pipeline.cpp | 3 +-
src/replica/duplication/duplication_sync_timer.cpp | 1 +
src/replica/duplication/duplication_sync_timer.h | 2 +
src/replica/duplication/replica_duplicator.cpp | 4 +
src/replica/duplication/replica_duplicator.h | 3 +
.../duplication/replica_duplicator_manager.cpp | 1 +
.../duplication/replica_duplicator_manager.h | 2 +
src/replica/duplication/replica_follower.cpp | 26 +-
.../test/dup_replica_http_service_test.cpp | 3 +-
.../test/duplication_sync_timer_test.cpp | 42 +--
.../duplication/test/replica_duplicator_test.cpp | 34 ++-
.../duplication/test/replica_follower_test.cpp | 74 +++--
src/replica/replica_http_service.cpp | 1 +
src/runtime/rpc/rpc_holder.h | 4 +-
src/shell/commands/duplication.cpp | 115 +++++---
src/shell/main.cpp | 5 +-
src/utils/errors.h | 9 +
src/utils/test/time_utils_test.cpp | 30 +-
29 files changed, 730 insertions(+), 406 deletions(-)
diff --git a/idl/duplication.thrift b/idl/duplication.thrift
index 5bf256696..682273b1f 100644
--- a/idl/duplication.thrift
+++ b/idl/duplication.thrift
@@ -66,6 +66,10 @@ struct duplication_add_request
// - if false, duplication start state=DS_LOG,
// server will replay and send plog mutation to follower cluster derectly
3:optional bool is_duplicating_checkpoint = true;
+
+ // Since v2.6.0.
+ // Specify the app name of remote cluster.
+ 4:optional string remote_app_name;
}
struct duplication_add_response
@@ -77,6 +81,16 @@ struct duplication_add_response
2:i32 appid;
3:i32 dupid;
4:optional string hint;
+
+ // Since v2.6.0.
+ //
+ // If new duplication is created, this would be its remote_app_name;
+ // Otherwise, once the duplication has existed, this would be the
+ // remote_app_name with which the duplication has been created.
+ //
+ // This field could also be used to check if the meta server supports
+ // remote_app_name(i.e. the version of meta server must be >= v2.6.0).
+ 5:optional string remote_app_name;
}
// This request is sent from client to meta.
@@ -110,6 +124,11 @@ struct duplication_entry
5:optional map<i32, i64> progress;
7:optional duplication_fail_mode fail_mode;
+
+ // Since v2.6.0.
+ // For versions >= v2.6.0, this could be specified by client.
+ // For versions < v2.6.0, this must be the same with source app_name.
+ 8:optional string remote_app_name;
}
// This request is sent from client to meta.
diff --git a/src/client/replication_ddl_client.cpp
b/src/client/replication_ddl_client.cpp
index 67ce4c109..70ad75bc7 100644
--- a/src/client/replication_ddl_client.cpp
+++ b/src/client/replication_ddl_client.cpp
@@ -1361,44 +1361,54 @@ dsn::error_code
replication_ddl_client::query_restore(int32_t restore_app_id, bo
return ERR_OK;
}
-error_with<duplication_add_response> replication_ddl_client::add_dup(
- std::string app_name, std::string remote_cluster_name, bool
is_duplicating_checkpoint)
+error_with<duplication_add_response>
+replication_ddl_client::add_dup(const std::string &app_name,
+ const std::string &remote_cluster_name,
+ bool is_duplicating_checkpoint,
+ const std::string &remote_app_name)
{
+ RETURN_EW_NOT_OK_MSG(validate_app_name(remote_app_name, false),
+ duplication_add_response,
+ "invalid remote_app_name: '{}'",
+ remote_app_name);
+
auto req = std::make_unique<duplication_add_request>();
- req->app_name = std::move(app_name);
- req->remote_cluster_name = std::move(remote_cluster_name);
+ req->app_name = app_name;
+ req->remote_cluster_name = remote_cluster_name;
req->is_duplicating_checkpoint = is_duplicating_checkpoint;
+ req->__set_remote_app_name(remote_app_name);
return call_rpc_sync(duplication_add_rpc(std::move(req),
RPC_CM_ADD_DUPLICATION));
}
error_with<duplication_modify_response>
replication_ddl_client::change_dup_status(
- std::string app_name, int dupid, duplication_status::type status)
+ const std::string &app_name, int dupid, duplication_status::type status)
{
auto req = std::make_unique<duplication_modify_request>();
- req->app_name = std::move(app_name);
+ req->app_name = app_name;
req->dupid = dupid;
req->__set_status(status);
return call_rpc_sync(duplication_modify_rpc(std::move(req),
RPC_CM_MODIFY_DUPLICATION));
}
error_with<duplication_modify_response>
replication_ddl_client::update_dup_fail_mode(
- std::string app_name, int dupid, duplication_fail_mode::type fmode)
+ const std::string &app_name, int dupid, duplication_fail_mode::type fmode)
{
if (_duplication_fail_mode_VALUES_TO_NAMES.find(fmode) ==
_duplication_fail_mode_VALUES_TO_NAMES.end()) {
return FMT_ERR(ERR_INVALID_PARAMETERS, "unexpected
duplication_fail_mode {}", fmode);
}
auto req = std::make_unique<duplication_modify_request>();
- req->app_name = std::move(app_name);
+ req->app_name = app_name;
req->dupid = dupid;
req->__set_fail_mode(fmode);
return call_rpc_sync(duplication_modify_rpc(std::move(req),
RPC_CM_MODIFY_DUPLICATION));
}
-error_with<duplication_query_response>
replication_ddl_client::query_dup(std::string app_name)
+error_with<duplication_query_response>
+replication_ddl_client::query_dup(const std::string &app_name)
{
auto req = std::make_unique<duplication_query_request>();
- req->app_name = std::move(app_name);
+ req->app_name = app_name;
return call_rpc_sync(duplication_query_rpc(std::move(req),
RPC_CM_QUERY_DUPLICATION));
}
diff --git a/src/client/replication_ddl_client.h
b/src/client/replication_ddl_client.h
index b596ae17b..53f7538bd 100644
--- a/src/client/replication_ddl_client.h
+++ b/src/client/replication_ddl_client.h
@@ -141,15 +141,17 @@ public:
bool skip_lost_partitions,
const std::string &outfile);
- error_with<duplication_add_response>
- add_dup(std::string app_name, std::string remote_address, bool
is_duplicating_checkpoint);
+ error_with<duplication_add_response> add_dup(const std::string &app_name,
+ const std::string
&remote_address,
+ bool
is_duplicating_checkpoint,
+ const std::string
&remote_app_name);
error_with<duplication_modify_response>
- change_dup_status(std::string app_name, int dupid,
duplication_status::type status);
+ change_dup_status(const std::string &app_name, int dupid,
duplication_status::type status);
error_with<duplication_modify_response>
- update_dup_fail_mode(std::string app_name, int dupid,
duplication_fail_mode::type fmode);
+ update_dup_fail_mode(const std::string &app_name, int dupid,
duplication_fail_mode::type fmode);
- error_with<duplication_query_response> query_dup(std::string app_name);
+ error_with<duplication_query_response> query_dup(const std::string
&app_name);
dsn::error_code do_restore(const std::string &backup_provider_name,
const std::string &cluster_name,
@@ -270,8 +272,6 @@ public:
static error_s validate_app_name(const std::string &app_name, bool
allow_empty_name = false);
private:
- bool static valid_app_char(int c);
-
void end_meta_request(const rpc_response_task_ptr &callback,
uint32_t attempt_count,
const error_code &err,
diff --git a/src/common/duplication_common.cpp
b/src/common/duplication_common.cpp
index e46a7556b..906b5eca5 100644
--- a/src/common/duplication_common.cpp
+++ b/src/common/duplication_common.cpp
@@ -46,6 +46,8 @@ const std::string
duplication_constants::kDuplicationEnvMasterClusterKey /*NOLIN
"duplication.master_cluster";
const std::string duplication_constants::kDuplicationEnvMasterMetasKey
/*NOLINT*/ =
"duplication.master_metas";
+const std::string duplication_constants::kDuplicationEnvMasterAppNameKey
/*NOLINT*/ =
+ "duplication.master_app_name";
/*extern*/ const char *duplication_status_to_string(duplication_status::type
status)
{
@@ -143,6 +145,7 @@ static nlohmann::json duplication_entry_to_json(const
duplication_entry &ent)
{"status", duplication_status_to_string(ent.status)},
{"fail_mode", duplication_fail_mode_to_string(ent.fail_mode)},
};
+
if (ent.__isset.progress) {
nlohmann::json sub_json;
for (const auto &p : ent.progress) {
@@ -150,6 +153,12 @@ static nlohmann::json duplication_entry_to_json(const
duplication_entry &ent)
}
json["progress"] = sub_json;
}
+
+ if (ent.__isset.remote_app_name) {
+ // remote_app_name is supported since v2.6.0, thus it won't be shown
before v2.6.0.
+ json["remote_app_name"] = ent.remote_app_name;
+ }
+
return json;
}
diff --git a/src/common/duplication_common.h b/src/common/duplication_common.h
index fcc43d3b2..0918d978b 100644
--- a/src/common/duplication_common.h
+++ b/src/common/duplication_common.h
@@ -86,6 +86,7 @@ struct duplication_constants
// These will fill into app env and mark one app as a "follower app" and
record master info
const static std::string kDuplicationEnvMasterClusterKey;
const static std::string kDuplicationEnvMasterMetasKey;
+ const static std::string kDuplicationEnvMasterAppNameKey;
};
USER_DEFINED_ENUM_FORMATTER(duplication_fail_mode::type)
diff --git a/src/meta/duplication/duplication_info.cpp
b/src/meta/duplication/duplication_info.cpp
index 173526915..3f080959f 100644
--- a/src/meta/duplication/duplication_info.cpp
+++ b/src/meta/duplication/duplication_info.cpp
@@ -186,9 +186,10 @@ blob duplication_info::to_json_blob() const
{
json_helper copy;
copy.create_timestamp_ms = create_timestamp_ms;
- copy.remote = follower_cluster_name;
+ copy.remote = remote_cluster_name;
copy.status = _next_status;
copy.fail_mode = _next_fail_mode;
+ copy.remote_app_name = remote_app_name;
return json::json_forwarder<json_helper>::encode(copy);
}
@@ -205,13 +206,20 @@ duplication_info_s_ptr
duplication_info::decode_from_blob(dupid_t dup_id,
int32_t app_id,
const std::string
&app_name,
int32_t
partition_count,
- std::string
store_path,
+ const std::string
&store_path,
const blob &json)
{
json_helper info;
if (!json::json_forwarder<json_helper>::decode(json, info)) {
return nullptr;
}
+
+ if (info.remote_app_name.empty()) {
+ // remote_app_name is missing, which means meta data in remote
storage(zk) is
+ // still of old version(< v2.6.0).
+ info.remote_app_name = app_name;
+ }
+
std::vector<host_port> meta_list;
if (!dsn::replication::replica_helper::load_meta_servers(
meta_list, duplication_constants::kClustersSectionName.c_str(),
info.remote.c_str())) {
@@ -223,9 +231,10 @@ duplication_info_s_ptr
duplication_info::decode_from_blob(dupid_t dup_id,
app_name,
partition_count,
info.create_timestamp_ms,
- std::move(info.remote),
+ info.remote,
+ info.remote_app_name,
std::move(meta_list),
- std::move(store_path));
+ store_path);
dup->_status = info.status;
dup->_fail_mode = info.fail_mode;
return dup;
diff --git a/src/meta/duplication/duplication_info.h
b/src/meta/duplication/duplication_info.h
index 029ca1f2f..e010eb4a7 100644
--- a/src/meta/duplication/duplication_info.h
+++ b/src/meta/duplication/duplication_info.h
@@ -52,20 +52,22 @@ class duplication_info
public:
/// \see meta_duplication_service::new_dup_from_init
/// \see duplication_info::decode_from_blob
- duplication_info(dupid_t dupid,
- int32_t appid,
- std::string app_name,
+ duplication_info(dupid_t dup_id,
+ int32_t app_id,
+ const std::string &app_name,
int32_t partition_count,
uint64_t create_now_ms,
- std::string follower_cluster_name,
- std::vector<host_port> &&follower_cluster_metas,
- std::string meta_store_path)
- : id(dupid),
- app_id(appid),
- app_name(std::move(app_name)),
+ const std::string &remote_cluster_name,
+ const std::string &remote_app_name,
+ std::vector<host_port> &&remote_cluster_metas,
+ const std::string &meta_store_path)
+ : id(dup_id),
+ app_id(app_id),
+ app_name(app_name),
partition_count(partition_count),
- follower_cluster_name(std::move(follower_cluster_name)),
- follower_cluster_metas(std::move(follower_cluster_metas)),
+ remote_cluster_name(remote_cluster_name),
+ remote_app_name(remote_app_name),
+ remote_cluster_metas(std::move(remote_cluster_metas)),
store_path(std::move(meta_store_path)),
create_timestamp_ms(create_now_ms),
prefix_for_log(fmt::format("a{}d{}", app_id, id))
@@ -82,7 +84,7 @@ public:
}
LOG_WARNING("you now create duplication[{}[{}.{}]] without duplicating
checkpoint",
id,
- follower_cluster_name,
+ remote_cluster_name,
app_name);
return alter_status(duplication_status::DS_LOG);
}
@@ -137,7 +139,7 @@ public:
int32_t app_id,
const std::string &app_name,
int32_t partition_count,
- std::string store_path,
+ const std::string
&store_path,
const blob &json);
// duplication_query_rpc is handled in THREAD_POOL_META_SERVER,
@@ -150,9 +152,10 @@ public:
duplication_entry entry;
entry.dupid = id;
entry.create_ts = create_timestamp_ms;
- entry.remote = follower_cluster_name;
+ entry.remote = remote_cluster_name;
entry.status = _status;
entry.__set_fail_mode(_fail_mode);
+ entry.__set_remote_app_name(remote_app_name);
entry.__isset.progress = true;
for (const auto &kv : _progress) {
if (!kv.second.is_inited) {
@@ -236,8 +239,12 @@ private:
duplication_status::type status;
int64_t create_timestamp_ms;
duplication_fail_mode::type fail_mode;
+ std::string remote_app_name;
- DEFINE_JSON_SERIALIZATION(remote, status, create_timestamp_ms,
fail_mode);
+ // Since there is no remote_cluster_name for old versions(< v2.6.0),
remote_app_name is
+ // optional. Following deserialization functions could be compatible
with the situations
+ // where remote_app_name is missing.
+ DEFINE_JSON_SERIALIZATION(remote, status, create_timestamp_ms,
fail_mode, remote_app_name);
};
public:
@@ -246,8 +253,9 @@ public:
const std::string app_name;
const int32_t partition_count{0};
- const std::string follower_cluster_name;
- const std::vector<host_port> follower_cluster_metas;
+ const std::string remote_cluster_name;
+ const std::string remote_app_name;
+ const std::vector<host_port> remote_cluster_metas;
const std::string store_path; // store path on meta service =
get_duplication_path(app, dupid)
const uint64_t create_timestamp_ms{0}; // the time when this dup is
created.
const std::string prefix_for_log;
diff --git a/src/meta/duplication/meta_duplication_service.cpp
b/src/meta/duplication/meta_duplication_service.cpp
index 62f03d734..8feaa52db 100644
--- a/src/meta/duplication/meta_duplication_service.cpp
+++ b/src/meta/duplication/meta_duplication_service.cpp
@@ -21,10 +21,12 @@
#include <queue>
#include <type_traits>
+#include "absl/strings/string_view.h"
#include "common//duplication_common.h"
#include "common/common.h"
#include "common/gpid.h"
#include "common/replication.codes.h"
+#include "common/replication_enums.h"
#include "common/replication_other_types.h"
#include "dsn.layer2_types.h"
#include "duplication_types.h"
@@ -41,6 +43,7 @@
#include "runtime/rpc/rpc_message.h"
#include "runtime/rpc/serialization.h"
#include "runtime/task/async_calls.h"
+#include "utils/api_utilities.h"
#include "utils/blob.h"
#include "utils/chrono_literals.h"
#include "utils/error_code.h"
@@ -49,7 +52,6 @@
#include "utils/fmt_logging.h"
#include "utils/ports.h"
#include "utils/string_conv.h"
-#include "absl/strings/string_view.h"
#include "utils/zlocks.h"
namespace dsn {
@@ -69,12 +71,12 @@ void meta_duplication_service::query_duplication_info(const
duplication_query_re
std::shared_ptr<app_state> app = _state->get_app(request.app_name);
if (!app || app->status != app_status::AS_AVAILABLE) {
response.err = ERR_APP_NOT_EXIST;
- } else {
- response.appid = app->app_id;
- for (auto &dup_id_to_info : app->duplications) {
- const duplication_info_s_ptr &dup = dup_id_to_info.second;
- dup->append_if_valid_for_query(*app, response.entry_list);
- }
+ return;
+ }
+
+ response.appid = app->app_id;
+ for (const auto & [ _, dup ] : app->duplications) {
+ dup->append_if_valid_for_query(*app, response.entry_list);
}
}
}
@@ -150,6 +152,30 @@ void
meta_duplication_service::do_modify_duplication(std::shared_ptr<app_state>
});
}
+#define LOG_DUP_HINT_AND_RETURN(resp, level, ...)
\
+ do {
\
+ const std::string _msg(fmt::format(__VA_ARGS__));
\
+ (resp).__set_hint(_msg);
\
+ LOG(level, _msg);
\
+ return;
\
+ } while (0)
+
+#define LOG_DUP_HINT_AND_RETURN_IF_NOT(expr, resp, ec, level, ...)
\
+ do {
\
+ if (dsn_likely(expr)) {
\
+ break;
\
+ }
\
+
\
+ (resp).err = (ec);
\
+ LOG_DUP_HINT_AND_RETURN(resp, level, __VA_ARGS__);
\
+ } while (0)
+
+#define LOG_WARNING_DUP_HINT_AND_RETURN_IF_NOT(expr, resp, ec, ...)
\
+ LOG_DUP_HINT_AND_RETURN_IF_NOT(expr, resp, ec, LOG_LEVEL_WARNING,
__VA_ARGS__)
+
+#define LOG_ERROR_DUP_HINT_AND_RETURN_IF_NOT(expr, resp, ec, ...)
\
+ LOG_DUP_HINT_AND_RETURN_IF_NOT(expr, resp, ec, LOG_LEVEL_ERROR,
__VA_ARGS__)
+
// This call will not recreate if the duplication
// with the same app name and remote end point already exists.
// ThreadPool(WRITE): THREAD_POOL_META_STATE
@@ -158,76 +184,136 @@ void
meta_duplication_service::add_duplication(duplication_add_rpc rpc)
const auto &request = rpc.request();
auto &response = rpc.response();
- LOG_INFO("add duplication for app({}), remote cluster name is {}",
+ std::string remote_app_name;
+ if (request.__isset.remote_app_name) {
+ remote_app_name = request.remote_app_name;
+ } else {
+ // Once remote_app_name is not specified by client, use source
app_name as
+ // remote_app_name to be compatible with old versions(< v2.6.0) of
client.
+ remote_app_name = request.app_name;
+ }
+
+ LOG_INFO("add duplication for app({}), remote cluster name is {}, "
+ "remote app name is {}",
request.app_name,
- request.remote_cluster_name);
+ request.remote_cluster_name,
+ remote_app_name);
- response.err = ERR_OK;
+ LOG_WARNING_DUP_HINT_AND_RETURN_IF_NOT(request.remote_cluster_name !=
+ get_current_cluster_name(),
+ response,
+ ERR_INVALID_PARAMETERS,
+ "illegal operation: adding
duplication to itself");
- if (request.remote_cluster_name == get_current_cluster_name()) {
- response.err = ERR_INVALID_PARAMETERS;
- response.__set_hint("illegal operation: adding duplication to itself");
- return;
- }
auto remote_cluster_id =
get_duplication_cluster_id(request.remote_cluster_name);
- if (!remote_cluster_id.is_ok()) {
- response.err = ERR_INVALID_PARAMETERS;
- response.__set_hint(fmt::format("get_duplication_cluster_id({})
failed, error: {}",
- request.remote_cluster_name,
- remote_cluster_id.get_error()));
- return;
- }
+ LOG_WARNING_DUP_HINT_AND_RETURN_IF_NOT(remote_cluster_id.is_ok(),
+ response,
+ ERR_INVALID_PARAMETERS,
+ "get_duplication_cluster_id({})
failed, error: {}",
+ request.remote_cluster_name,
+ remote_cluster_id.get_error());
std::vector<host_port> meta_list;
- if (!dsn::replication::replica_helper::load_meta_servers(
- meta_list,
- duplication_constants::kClustersSectionName.c_str(),
- request.remote_cluster_name.c_str())) {
- response.err = ERR_INVALID_PARAMETERS;
- response.__set_hint(fmt::format("failed to find cluster[{}] address in
config [{}]",
- request.remote_cluster_name,
-
duplication_constants::kClustersSectionName));
- return;
- }
-
- auto app = _state->get_app(request.app_name);
- if (!app || app->status != app_status::AS_AVAILABLE) {
- response.err = ERR_APP_NOT_EXIST;
- return;
- }
+
LOG_WARNING_DUP_HINT_AND_RETURN_IF_NOT(dsn::replication::replica_helper::load_meta_servers(
+ meta_list,
+
duplication_constants::kClustersSectionName.c_str(),
+
request.remote_cluster_name.c_str()),
+ response,
+ ERR_INVALID_PARAMETERS,
+ "failed to find cluster[{}] address
in config [{}]",
+ request.remote_cluster_name,
+
duplication_constants::kClustersSectionName);
+
+ std::shared_ptr<app_state> app;
duplication_info_s_ptr dup;
- for (const auto &ent : app->duplications) {
- auto it = ent.second;
- if (it->follower_cluster_name == request.remote_cluster_name) {
- dup = ent.second;
- break;
+ {
+ zauto_read_lock l(app_lock());
+
+ app = _state->get_app(request.app_name);
+ // The reason why using !!app rather than just app is that passing
std::shared_ptr into
+ // dsn_likely(i.e. __builtin_expect) would lead to compilation error
"cannot convert
+ // 'std::shared_ptr<dsn::replication::app_state>' to 'long int'".
+ LOG_WARNING_DUP_HINT_AND_RETURN_IF_NOT(
+ !!app, response, ERR_APP_NOT_EXIST, "app {} was not found",
request.app_name);
+ LOG_WARNING_DUP_HINT_AND_RETURN_IF_NOT(app->status ==
app_status::AS_AVAILABLE,
+ response,
+ ERR_APP_NOT_EXIST,
+ "app status was not
AS_AVAILABLE: name={}, "
+ "status={}",
+ request.app_name,
+ enum_to_string(app->status));
+
+ for (const auto & [ _, dup_info ] : app->duplications) {
+ if (dup_info->remote_cluster_name == request.remote_cluster_name) {
+ dup = dup_info;
+ break;
+ }
+ }
+
+ if (dup) {
+ // The duplication for the same app to the same remote cluster has
existed.
+ remote_app_name = dup->remote_app_name;
+ LOG_INFO("no need to add duplication, since it has existed:
app_name={}, "
+ "remote_cluster_name={}, remote_app_name={}",
+ request.app_name,
+ request.remote_cluster_name,
+ remote_app_name);
+ } else {
+ // Check if other apps of this cluster are duplicated to the same
remote app.
+ for (const auto & [ app_name, cur_app_state ] :
_state->_exist_apps) {
+ if (app_name == request.app_name) {
+ // Skip this app since we want to check other apps.
+ continue;
+ }
+
+ for (const auto & [ _, dup_info ] :
cur_app_state->duplications) {
+ LOG_WARNING_DUP_HINT_AND_RETURN_IF_NOT(
+ dup_info->remote_cluster_name !=
request.remote_cluster_name ||
+ dup_info->remote_app_name != remote_app_name,
+ response,
+ ERR_INVALID_PARAMETERS,
+ "illegal operation: another app({}) is also "
+ "duplicated to the same remote app("
+ "cluster={}, app={})",
+ app_name,
+ request.remote_cluster_name,
+ remote_app_name);
+ }
+ }
}
}
+
if (!dup) {
- dup = new_dup_from_init(request.remote_cluster_name,
std::move(meta_list), app);
+ dup = new_dup_from_init(
+ request.remote_cluster_name, remote_app_name,
std::move(meta_list), app);
}
- do_add_duplication(app, dup, rpc);
+
+ do_add_duplication(app, dup, rpc, remote_app_name);
}
// ThreadPool(WRITE): THREAD_POOL_META_STATE
void meta_duplication_service::do_add_duplication(std::shared_ptr<app_state>
&app,
duplication_info_s_ptr &dup,
- duplication_add_rpc &rpc)
+ duplication_add_rpc &rpc,
+ const std::string
&remote_app_name)
{
- const auto err = dup->start(rpc.request().is_duplicating_checkpoint);
- if (dsn_unlikely(err != ERR_OK)) {
- LOG_ERROR("start dup[{}({})] failed: err = {}", app->app_name,
dup->id, err);
- return;
- }
- blob value = dup->to_json_blob();
-
+ const auto &ec = dup->start(rpc.request().is_duplicating_checkpoint);
+ LOG_ERROR_DUP_HINT_AND_RETURN_IF_NOT(ec == ERR_OK,
+ rpc.response(),
+ ec,
+ "start dup[{}({})] failed: err = {}",
+ app->app_name,
+ dup->id,
+ ec);
+
+ auto value = dup->to_json_blob();
std::queue<std::string> nodes({get_duplication_path(*app),
std::to_string(dup->id)});
_meta_svc->get_meta_storage()->create_node_recursively(
- std::move(nodes), std::move(value), [app, this, dup, rpc]() mutable {
+ std::move(nodes), std::move(value), [app, this, dup, rpc,
remote_app_name]() mutable {
LOG_INFO("[{}] add duplication successfully [app_name: {},
follower: {}]",
dup->log_prefix(),
app->app_name,
- dup->follower_cluster_name);
+ dup->remote_cluster_name);
// The duplication starts only after it's been persisted.
dup->persist_status();
@@ -236,6 +322,7 @@ void
meta_duplication_service::do_add_duplication(std::shared_ptr<app_state> &ap
resp.err = ERR_OK;
resp.appid = app->app_id;
resp.dupid = dup->id;
+ resp.__set_remote_app_name(remote_app_name);
zauto_write_lock l(app_lock());
refresh_duplicating_no_lock(app);
@@ -343,7 +430,7 @@ void
meta_duplication_service::create_follower_app_for_duplication(
const std::shared_ptr<duplication_info> &dup, const
std::shared_ptr<app_state> &app)
{
configuration_create_app_request request;
- request.app_name = app->app_name;
+ request.app_name = dup->remote_app_name;
request.options.app_type = app->app_type;
request.options.partition_count = app->partition_count;
request.options.replica_count = app->max_replica_count;
@@ -359,10 +446,12 @@ void
meta_duplication_service::create_follower_app_for_duplication(
get_current_cluster_name());
request.options.envs.emplace(duplication_constants::kDuplicationEnvMasterMetasKey,
_meta_svc->get_meta_list_string());
+
request.options.envs.emplace(duplication_constants::kDuplicationEnvMasterAppNameKey,
+ app->app_name);
host_port meta_servers;
- meta_servers.assign_group(dup->follower_cluster_name.c_str());
- meta_servers.group_host_port()->add_list(dup->follower_cluster_metas);
+ meta_servers.assign_group(dup->remote_cluster_name.c_str());
+ meta_servers.group_host_port()->add_list(dup->remote_cluster_metas);
dsn::message_ex *msg = dsn::message_ex::create_request(RPC_CM_CREATE_APP);
dsn::marshall(msg, request);
@@ -396,7 +485,7 @@ void
meta_duplication_service::create_follower_app_for_duplication(
} else {
LOG_ERROR("created follower app[{}.{}] to trigger duplicate
checkpoint failed: "
"duplication_status = {}, create_err = {},
update_err = {}",
- dup->follower_cluster_name,
+ dup->remote_cluster_name,
dup->app_name,
duplication_status_to_string(dup->status()),
create_err,
@@ -409,8 +498,8 @@ void
meta_duplication_service::check_follower_app_if_create_completed(
const std::shared_ptr<duplication_info> &dup)
{
host_port meta_servers;
- meta_servers.assign_group(dup->follower_cluster_name.c_str());
- meta_servers.group_host_port()->add_list(dup->follower_cluster_metas);
+ meta_servers.assign_group(dup->remote_cluster_name.c_str());
+ meta_servers.group_host_port()->add_list(dup->remote_cluster_metas);
query_cfg_request meta_config_request;
meta_config_request.app_name = dup->app_name;
@@ -484,7 +573,7 @@ void
meta_duplication_service::check_follower_app_if_create_completed(
LOG_ERROR(
"query follower app[{}.{}] replica configuration
completed, result: "
"duplication_status = {}, query_err = {}, update_err
= {}",
- dup->follower_cluster_name,
+ dup->remote_cluster_name,
dup->app_name,
duplication_status_to_string(dup->status()),
query_err,
@@ -527,20 +616,21 @@ void
meta_duplication_service::do_update_partition_confirmed(
}
std::shared_ptr<duplication_info>
-meta_duplication_service::new_dup_from_init(const std::string
&follower_cluster_name,
- std::vector<host_port>
&&follower_cluster_metas,
+meta_duplication_service::new_dup_from_init(const std::string
&remote_cluster_name,
+ const std::string &remote_app_name,
+ std::vector<host_port>
&&remote_cluster_metas,
std::shared_ptr<app_state> &app)
const
{
duplication_info_s_ptr dup;
- // use current time to identify this duplication.
- auto dupid = static_cast<dupid_t>(dsn_now_ms() / 1000);
+ // Use current time to identify this duplication.
+ auto dupid = static_cast<dupid_t>(dsn_now_s());
{
zauto_write_lock l(app_lock());
- // hold write lock here to ensure that dupid is unique
- while (app->duplications.find(dupid) != app->duplications.end())
- dupid++;
+ // Hold write lock here to ensure that dupid is unique.
+ for (; app->duplications.find(dupid) != app->duplications.end();
++dupid) {
+ }
std::string dup_path = get_duplication_path(*app,
std::to_string(dupid));
dup = std::make_shared<duplication_info>(dupid,
@@ -548,8 +638,9 @@ meta_duplication_service::new_dup_from_init(const
std::string &follower_cluster_
app->app_name,
app->partition_count,
dsn_now_ms(),
- follower_cluster_name,
-
std::move(follower_cluster_metas),
+ remote_cluster_name,
+ remote_app_name,
+
std::move(remote_cluster_metas),
std::move(dup_path));
for (int32_t i = 0; i < app->partition_count; i++) {
dup->init_progress(i, invalid_decree);
diff --git a/src/meta/duplication/meta_duplication_service.h
b/src/meta/duplication/meta_duplication_service.h
index 5317d46ec..adb1df7ea 100644
--- a/src/meta/duplication/meta_duplication_service.h
+++ b/src/meta/duplication/meta_duplication_service.h
@@ -80,7 +80,8 @@ public:
private:
void do_add_duplication(std::shared_ptr<app_state> &app,
duplication_info_s_ptr &dup,
- duplication_add_rpc &rpc);
+ duplication_add_rpc &rpc,
+ const std::string &remote_app_name);
void do_modify_duplication(std::shared_ptr<app_state> &app,
duplication_info_s_ptr &dup,
@@ -121,8 +122,9 @@ private:
// Create a new duplication from INIT state.
// Thread-Safe
std::shared_ptr<duplication_info>
- new_dup_from_init(const std::string &follower_cluster_name,
- std::vector<host_port> &&follower_cluster_metas,
+ new_dup_from_init(const std::string &remote_cluster_name,
+ const std::string &remote_app_name,
+ std::vector<host_port> &&remote_cluster_metas,
std::shared_ptr<app_state> &app) const;
// get lock to protect access of app table
diff --git a/src/meta/test/duplication_info_test.cpp
b/src/meta/test/duplication_info_test.cpp
index 9e11be637..05f0dbcdd 100644
--- a/src/meta/test/duplication_info_test.cpp
+++ b/src/meta/test/duplication_info_test.cpp
@@ -37,6 +37,11 @@ namespace replication {
class duplication_info_test : public testing::Test
{
public:
+ static const std::string kTestAppName;
+ static const std::string kTestRemoteClusterName;
+ static const std::string kTestRemoteAppName;
+ static const std::string kTestMetaStorePath;
+
void force_update_status(duplication_info &dup, duplication_status::type
status)
{
dup._status = status;
@@ -47,12 +52,13 @@ public:
duplication_info dup(1,
1,
- "temp",
+ kTestAppName,
2,
0,
- "dsn://slave-cluster/temp",
+ kTestRemoteClusterName,
+ kTestRemoteAppName,
std::vector<host_port>(),
- "/meta_test/101/duplication/1");
+ kTestMetaStorePath);
duplication_confirm_entry entry;
ASSERT_FALSE(dup.alter_progress(0, entry));
@@ -100,18 +106,20 @@ public:
{
duplication_info dup(1,
1,
- "temp",
+ kTestAppName,
4,
0,
- "dsn://slave-cluster/temp",
+ kTestRemoteClusterName,
+ kTestRemoteAppName,
std::vector<host_port>(),
- "/meta_test/101/duplication/1");
+ kTestMetaStorePath);
ASSERT_FALSE(dup.is_altering());
- ASSERT_EQ(dup._status, duplication_status::DS_INIT);
- ASSERT_EQ(dup._next_status, duplication_status::DS_INIT);
+ ASSERT_EQ(duplication_status::DS_INIT, dup._status);
+ ASSERT_EQ(duplication_status::DS_INIT, dup._next_status);
auto dup_ent = dup.to_duplication_entry();
- ASSERT_EQ(dup_ent.progress.size(), 0);
+ ASSERT_EQ(0, dup_ent.progress.size());
+ ASSERT_EQ(kTestRemoteAppName, dup_ent.remote_app_name);
for (int i = 0; i < 4; i++) {
dup.init_progress(i, invalid_decree);
@@ -122,20 +130,21 @@ public:
dup.start();
ASSERT_TRUE(dup.is_altering());
- ASSERT_EQ(dup._status, duplication_status::DS_INIT);
- ASSERT_EQ(dup._next_status, duplication_status::DS_PREPARE);
+ ASSERT_EQ(duplication_status::DS_INIT, dup._status);
+ ASSERT_EQ(duplication_status::DS_PREPARE, dup._next_status);
}
static void test_persist_status()
{
duplication_info dup(1,
1,
- "temp",
+ kTestAppName,
4,
0,
- "dsn://slave-cluster/temp",
+ kTestRemoteClusterName,
+ kTestRemoteAppName,
std::vector<host_port>(),
- "/meta_test/101/duplication/1");
+ kTestMetaStorePath);
dup.start();
dup.persist_status();
@@ -149,12 +158,13 @@ public:
dsn_run_config("config-test.ini", false);
duplication_info dup(1,
1,
- "temp",
+ kTestAppName,
4,
0,
- "slave-cluster",
+ kTestRemoteClusterName,
+ kTestRemoteAppName,
std::vector<host_port>(),
- "/meta_test/101/duplication/1");
+ kTestMetaStorePath);
dup.start();
dup.persist_status();
@@ -164,31 +174,38 @@ public:
duplication_info::json_helper copy;
ASSERT_TRUE(json::json_forwarder<duplication_info::json_helper>::decode(json,
copy));
- ASSERT_EQ(copy.status, duplication_status::DS_APP);
- ASSERT_EQ(copy.create_timestamp_ms, dup.create_timestamp_ms);
- ASSERT_EQ(copy.remote, dup.follower_cluster_name);
+ ASSERT_EQ(duplication_status::DS_APP, copy.status);
+ ASSERT_EQ(dup.create_timestamp_ms, copy.create_timestamp_ms);
+ ASSERT_EQ(dup.remote_cluster_name, copy.remote);
+ ASSERT_EQ(dup.remote_app_name, copy.remote_app_name);
- auto dup_sptr = duplication_info::decode_from_blob(
- 1, 1, "temp", 4, "/meta_test/101/duplication/1", json);
+ auto dup_sptr =
+ duplication_info::decode_from_blob(1, 1, kTestAppName, 4,
kTestMetaStorePath, json);
ASSERT_TRUE(dup_sptr->equals_to(dup)) << *dup_sptr << " " << dup;
blob new_json =
blob::create_from_bytes(boost::replace_all_copy(json.to_string(),
"DS_APP", "DS_FOO"));
ASSERT_FALSE(json::json_forwarder<duplication_info::json_helper>::decode(new_json,
copy));
- ASSERT_EQ(copy.status, duplication_status::DS_REMOVED);
+ ASSERT_EQ(duplication_status::DS_REMOVED, copy.status);
}
};
+const std::string duplication_info_test::kTestAppName = "temp";
+const std::string duplication_info_test::kTestRemoteClusterName =
"slave-cluster";
+const std::string duplication_info_test::kTestRemoteAppName = "remote_temp";
+const std::string duplication_info_test::kTestMetaStorePath =
"/meta_test/101/duplication/1";
+
TEST_F(duplication_info_test, alter_status_when_busy)
{
duplication_info dup(1,
1,
- "temp",
+ kTestAppName,
4,
0,
- "dsn://slave-cluster/temp",
+ kTestRemoteClusterName,
+ kTestRemoteAppName,
std::vector<host_port>(),
- "/meta_test/101/duplication/1");
+ kTestMetaStorePath);
dup.start();
ASSERT_EQ(dup.alter_status(duplication_status::DS_PAUSE), ERR_BUSY);
@@ -255,12 +272,13 @@ TEST_F(duplication_info_test, alter_status)
for (auto tt : tests) {
duplication_info dup(1,
1,
- "temp",
+ kTestAppName,
4,
0,
- "dsn://slave-cluster/temp",
+ kTestRemoteClusterName,
+ kTestRemoteAppName,
std::vector<host_port>(),
- "/meta_test/101/duplication/1");
+ kTestMetaStorePath);
for (const auto from : tt.from_list) {
force_update_status(dup, from);
for (const auto to : tt.to_list) {
@@ -285,12 +303,13 @@ TEST_F(duplication_info_test, is_valid)
{
duplication_info dup(1,
1,
- "temp",
+ kTestAppName,
4,
0,
- "dsn://slave-cluster/temp",
+ kTestRemoteClusterName,
+ kTestRemoteAppName,
std::vector<host_port>(),
- "/meta_test/101/duplication/1");
+ kTestMetaStorePath);
ASSERT_TRUE(dup.is_invalid_status());
dup.start();
diff --git a/src/meta/test/meta_duplication_service_test.cpp
b/src/meta/test/meta_duplication_service_test.cpp
index f1435ed37..40570ce09 100644
--- a/src/meta/test/meta_duplication_service_test.cpp
+++ b/src/meta/test/meta_duplication_service_test.cpp
@@ -71,15 +71,20 @@ namespace replication {
class meta_duplication_service_test : public meta_test_base
{
public:
+ static const std::string kTestAppName;
+ static const std::string kTestRemoteClusterName;
+ static const std::string kTestRemoteAppName;
+
meta_duplication_service_test() {}
duplication_add_response create_dup(const std::string &app_name,
- const std::string &remote_cluster =
"slave-cluster",
- bool freezed = false)
+ const std::string &remote_cluster,
+ const std::string &remote_app_name)
{
auto req = std::make_unique<duplication_add_request>();
req->app_name = app_name;
req->remote_cluster_name = remote_cluster;
+ req->__set_remote_app_name(remote_app_name);
duplication_add_rpc rpc(std::move(req), RPC_CM_ADD_DUPLICATION);
dup_svc().add_duplication(rpc);
@@ -87,6 +92,17 @@ public:
return rpc.response();
}
+ duplication_add_response create_dup(const std::string &app_name,
+ const std::string &remote_cluster)
+ {
+ return create_dup(app_name, remote_cluster, app_name);
+ }
+
+ duplication_add_response create_dup(const std::string &app_name)
+ {
+ return create_dup(app_name, kTestRemoteClusterName);
+ }
+
duplication_query_response query_dup_info(const std::string &app_name)
{
auto req = std::make_unique<duplication_query_request>();
@@ -177,14 +193,13 @@ public:
void test_new_dup_from_init()
{
- std::string test_app = "test-app";
- create_app(test_app);
- auto app = find_app(test_app);
- std::string remote_cluster_address = "dsn://slave-cluster/temp";
+ create_app(kTestAppName);
+ auto app = find_app(kTestAppName);
int last_dup = 0;
for (int i = 0; i < 1000; i++) {
- auto dup = dup_svc().new_dup_from_init(remote_cluster_address, {},
app);
+ auto dup =
+ dup_svc().new_dup_from_init(kTestRemoteClusterName,
kTestRemoteAppName, {}, app);
ASSERT_GT(dup->id, 0);
ASSERT_FALSE(dup->is_altering());
@@ -268,10 +283,8 @@ public:
TearDown();
SetUp();
- std::string test_app = "test-app";
- create_app(test_app);
- auto app = find_app(test_app);
- std::string remote_cluster_address = "dsn://slave-cluster/temp";
+ create_app(kTestAppName);
+ auto app = find_app(kTestAppName);
std::queue<std::string> q_nodes;
for (auto n : nodes) {
@@ -284,7 +297,7 @@ public:
SetUp();
recover_from_meta_state();
- return find_app(test_app);
+ return find_app(kTestAppName);
}
// Corrupted meta data may result from bad write to meta-store.
@@ -292,9 +305,8 @@ public:
// meta data is corrupted.
void test_recover_from_corrupted_meta_data()
{
- std::string test_app = "test-app";
- create_app(test_app);
- auto app = find_app(test_app);
+ create_app(kTestAppName);
+ auto app = find_app(kTestAppName);
// recover from /<app>/dup
app = mock_test_case_and_recover({_ss->get_app_path(*app),
std::string("dup")}, "");
@@ -316,9 +328,9 @@ public:
// recover from /<app>/duplication/<dup_id>/0, but its
confirmed_decree is not valid integer
TearDown();
SetUp();
- create_app(test_app);
- app = find_app(test_app);
- auto test_dup = create_dup(test_app, "slave-cluster", true);
+ create_app(kTestAppName);
+ app = find_app(kTestAppName);
+ auto test_dup = create_dup(kTestAppName, kTestRemoteClusterName);
ASSERT_EQ(test_dup.err, ERR_OK);
duplication_info_s_ptr dup = app->duplications[test_dup.dupid];
_ms->get_meta_storage()->create_node(meta_duplication_service::get_partition_path(dup,
"0"),
@@ -327,7 +339,7 @@ public:
wait_all();
SetUp();
recover_from_meta_state();
- app = find_app(test_app);
+ app = find_app(kTestAppName);
ASSERT_TRUE(app->duplicating);
ASSERT_EQ(app->duplications.size(), 1);
for (int i = 0; i < app->partition_count; i++) {
@@ -337,9 +349,9 @@ public:
// recover from /<app>/duplication/<dup_id>/x, its pid is not valid
integer
TearDown();
SetUp();
- create_app(test_app);
- app = find_app(test_app);
- test_dup = create_dup(test_app, "slave-cluster", true);
+ create_app(kTestAppName);
+ app = find_app(kTestAppName);
+ test_dup = create_dup(kTestAppName, kTestRemoteClusterName);
ASSERT_EQ(test_dup.err, ERR_OK);
dup = app->duplications[test_dup.dupid];
_ms->get_meta_storage()->create_node(meta_duplication_service::get_partition_path(dup,
"x"),
@@ -357,70 +369,88 @@ public:
void test_add_duplication()
{
- std::string test_app = "test-app";
- std::string test_app_invalid_ver = "test-app-invalid-ver";
+ static const std::string kTestSameAppName(kTestAppName + "_same");
+ static const std::string kTestAnotherAppName(kTestAppName +
"_another");
- std::string invalid_remote = "test-invalid-remote";
- std::string ok_remote = "slave-cluster";
-
- std::string cluster_without_address =
"cluster_without_address_for_test";
-
- create_app(test_app);
-
- create_app(test_app_invalid_ver);
- find_app(test_app_invalid_ver)->envs["value_version"] = "0";
+ create_app(kTestAppName);
+ create_app(kTestSameAppName);
+ create_app(kTestAnotherAppName);
struct TestData
{
- std::string app;
+ std::string app_name;
std::string remote;
+ std::string remote_app_name;
error_code wec;
} tests[] = {
- // {test_app_invalid_ver, ok_remote, ERR_INVALID_VERSION},
-
- {test_app, ok_remote, ERR_OK},
-
- {test_app, invalid_remote, ERR_INVALID_PARAMETERS},
-
- {test_app, get_current_cluster_name(), ERR_INVALID_PARAMETERS},
-
- {test_app, cluster_without_address, ERR_INVALID_PARAMETERS},
+ // The general case that duplicating to remote cluster with
specified remote_app_name.
+ {kTestAppName, kTestRemoteClusterName, kTestRemoteAppName, ERR_OK},
+ // A duplication that has been added would be found with its
original remote_app_name.
+ {kTestAppName, kTestRemoteClusterName, kTestRemoteAppName, ERR_OK},
+ // The general case that duplicating to remote cluster with same
remote_app_name.
+ {kTestSameAppName, kTestRemoteClusterName, kTestSameAppName,
ERR_OK},
+ // It is not allowed that remote_cluster_name does not exist in
"duplication-group".
+ {kTestAppName, "test-invalid-remote", kTestRemoteAppName,
ERR_INVALID_PARAMETERS},
+ // Duplicating to local cluster is not allowed.
+ {kTestAppName, get_current_cluster_name(), kTestRemoteAppName,
ERR_INVALID_PARAMETERS},
+ // It is not allowed that remote_cluster_name exists in
"duplication-group" but not
+ // exists in "pegasus.clusters".
+ {kTestAppName,
+ "cluster_without_address_for_test",
+ kTestRemoteAppName,
+ ERR_INVALID_PARAMETERS},
+ // The attempt that duplicates another app to the same remote app
would be blocked.
+ {kTestAnotherAppName,
+ kTestRemoteClusterName,
+ kTestRemoteAppName,
+ ERR_INVALID_PARAMETERS},
+ // The attempt that duplicates another app to the different remote
app would be
+ // ok.
+ {kTestAnotherAppName, kTestRemoteClusterName, kTestAppName,
ERR_OK},
};
- for (auto tt : tests) {
- auto resp = create_dup(tt.app, tt.remote);
- ASSERT_EQ(tt.wec, resp.err);
-
- if (tt.wec == ERR_OK) {
- auto app = find_app(test_app);
- auto dup = app->duplications[resp.dupid];
- ASSERT_TRUE(dup != nullptr);
- ASSERT_EQ(dup->app_id, app->app_id);
- ASSERT_EQ(dup->_status, duplication_status::DS_PREPARE);
- ASSERT_EQ(dup->follower_cluster_name, ok_remote);
- ASSERT_EQ(resp.dupid, dup->id);
- ASSERT_EQ(app->duplicating, true);
+ for (auto test : tests) {
+ auto resp = create_dup(test.app_name, test.remote,
test.remote_app_name);
+ ASSERT_EQ(test.wec, resp.err);
+
+ if (test.wec != ERR_OK) {
+ continue;
}
+
+ auto app = find_app(test.app_name);
+ auto dup = app->duplications[resp.dupid];
+ ASSERT_TRUE(dup != nullptr);
+ ASSERT_EQ(app->app_id, dup->app_id);
+ ASSERT_EQ(duplication_status::DS_PREPARE, dup->_status);
+ ASSERT_EQ(test.remote, dup->remote_cluster_name);
+ ASSERT_EQ(test.remote_app_name, resp.remote_app_name);
+ ASSERT_EQ(test.remote_app_name, dup->remote_app_name);
+ ASSERT_EQ(resp.dupid, dup->id);
+ ASSERT_TRUE(app->duplicating);
}
}
};
+const std::string meta_duplication_service_test::kTestAppName = "test_app";
+const std::string meta_duplication_service_test::kTestRemoteClusterName =
"slave-cluster";
+const std::string meta_duplication_service_test::kTestRemoteAppName =
"remote_test_app";
+
// This test ensures that duplication upon an unavailable app will
// be rejected with ERR_APP_NOT_EXIST.
TEST_F(meta_duplication_service_test, dup_op_upon_unavail_app)
{
- std::string test_app = "test-app";
- std::string test_app_not_exist = "test-app-not-exists";
- std::string test_app_unavail = "test-app-unavail";
+ const std::string test_app_not_exist = "test_app_not_exists";
+ const std::string test_app_unavail = "test_app_unavail";
- create_app(test_app);
- auto app = find_app(test_app);
+ create_app(kTestAppName);
+ auto app = find_app(kTestAppName);
+ ASSERT_EQ(kTestAppName, app->app_name);
create_app(test_app_unavail);
find_app(test_app_unavail)->status = app_status::AS_DROPPED;
- dupid_t test_dup = create_dup(test_app).dupid;
+ dupid_t test_dup = create_dup(kTestAppName).dupid;
struct TestData
{
@@ -431,13 +461,14 @@ TEST_F(meta_duplication_service_test,
dup_op_upon_unavail_app)
{test_app_not_exist, ERR_APP_NOT_EXIST},
{test_app_unavail, ERR_APP_NOT_EXIST},
- {test_app, ERR_OK},
+ {kTestAppName, ERR_OK},
};
- for (auto tt : tests) {
- ASSERT_EQ(query_dup_info(tt.app).err, tt.wec);
- ASSERT_EQ(create_dup(tt.app).err, tt.wec);
- ASSERT_EQ(change_dup_status(tt.app, test_dup,
duplication_status::DS_REMOVED).err, tt.wec);
+ for (auto test : tests) {
+ ASSERT_EQ(test.wec, query_dup_info(test.app).err);
+ ASSERT_EQ(test.wec, create_dup(test.app).err);
+ ASSERT_EQ(test.wec,
+ change_dup_status(test.app, test_dup,
duplication_status::DS_REMOVED).err);
}
}
@@ -447,17 +478,16 @@ TEST_F(meta_duplication_service_test, add_duplication) {
test_add_duplication();
// if there's already one existed.
TEST_F(meta_duplication_service_test, dont_create_if_existed)
{
- std::string test_app = "test-app";
+ create_app(kTestAppName);
+ auto app = find_app(kTestAppName);
+ ASSERT_EQ(kTestAppName, app->app_name);
- create_app(test_app);
- auto app = find_app(test_app);
-
- create_dup(test_app);
- create_dup(test_app);
- dupid_t dupid = create_dup(test_app).dupid;
+ create_dup(kTestAppName);
+ create_dup(kTestAppName);
+ dupid_t dupid = create_dup(kTestAppName).dupid;
{
- auto resp = query_dup_info(test_app);
+ auto resp = query_dup_info(kTestAppName);
ASSERT_EQ(resp.err, ERR_OK);
ASSERT_EQ(resp.entry_list.size(), 1);
@@ -467,15 +497,32 @@ TEST_F(meta_duplication_service_test,
dont_create_if_existed)
}
}
-TEST_F(meta_duplication_service_test, change_duplication_status)
+TEST_F(meta_duplication_service_test, add_dup_with_remote_app_name)
{
- std::string test_app = "test-app";
+ create_app(kTestAppName);
+ auto app = find_app(kTestAppName);
+ ASSERT_EQ(kTestAppName, app->app_name);
- create_app(test_app);
- auto app = find_app(test_app);
- dupid_t test_dup = create_dup(test_app).dupid;
- change_dup_status(test_app, test_dup, duplication_status::DS_APP);
- change_dup_status(test_app, test_dup, duplication_status::DS_LOG);
+ create_dup(kTestAppName, kTestRemoteClusterName, kTestRemoteAppName);
+ const dupid_t dupid = create_dup(kTestAppName).dupid;
+
+ const auto &resp = query_dup_info(kTestAppName);
+ ASSERT_EQ(ERR_OK, resp.err);
+ ASSERT_EQ(1, resp.entry_list.size());
+
+ const auto &duplication_entry = resp.entry_list.back();
+ ASSERT_EQ(dupid, duplication_entry.dupid);
+ ASSERT_EQ(duplication_status::DS_PREPARE, duplication_entry.status);
+ ASSERT_EQ(kTestRemoteAppName, duplication_entry.remote_app_name);
+}
+
+TEST_F(meta_duplication_service_test, change_duplication_status)
+{
+ create_app(kTestAppName);
+ auto app = find_app(kTestAppName);
+ dupid_t test_dup = create_dup(kTestAppName).dupid;
+ change_dup_status(kTestAppName, test_dup, duplication_status::DS_APP);
+ change_dup_status(kTestAppName, test_dup, duplication_status::DS_LOG);
struct TestData
{
@@ -485,18 +532,18 @@ TEST_F(meta_duplication_service_test,
change_duplication_status)
error_code wec;
} tests[] = {
- {test_app, test_dup + 1, duplication_status::DS_REMOVED,
ERR_OBJECT_NOT_FOUND},
+ {kTestAppName, test_dup + 1, duplication_status::DS_REMOVED,
ERR_OBJECT_NOT_FOUND},
// ok test
- {test_app, test_dup, duplication_status::DS_PAUSE, ERR_OK}, //
start->pause
- {test_app, test_dup, duplication_status::DS_PAUSE, ERR_OK}, //
pause->pause
- {test_app, test_dup, duplication_status::DS_LOG, ERR_OK}, //
pause->start
- {test_app, test_dup, duplication_status::DS_LOG, ERR_OK}, //
start->start
+ {kTestAppName, test_dup, duplication_status::DS_PAUSE, ERR_OK}, //
start->pause
+ {kTestAppName, test_dup, duplication_status::DS_PAUSE, ERR_OK}, //
pause->pause
+ {kTestAppName, test_dup, duplication_status::DS_LOG, ERR_OK}, //
pause->start
+ {kTestAppName, test_dup, duplication_status::DS_LOG, ERR_OK}, //
start->start
};
- for (auto tt : tests) {
- auto resp = change_dup_status(tt.app, tt.dupid, tt.status);
- ASSERT_EQ(resp.err, tt.wec);
+ for (auto test : tests) {
+ auto resp = change_dup_status(test.app, test.dupid, test.status);
+ ASSERT_EQ(test.wec, resp.err);
}
}
@@ -505,18 +552,17 @@ TEST_F(meta_duplication_service_test, new_dup_from_init)
{ test_new_dup_from_ini
TEST_F(meta_duplication_service_test, remove_dup)
{
- std::string test_app = "test-app";
- create_app(test_app);
- auto app = find_app(test_app);
+ create_app(kTestAppName);
+ auto app = find_app(kTestAppName);
- auto resp = create_dup(test_app);
+ auto resp = create_dup(kTestAppName);
ASSERT_EQ(ERR_OK, resp.err);
dupid_t dupid1 = resp.dupid;
ASSERT_EQ(app->duplicating, true);
auto dup = app->duplications.find(dupid1)->second;
- auto resp2 = change_dup_status(test_app, dupid1,
duplication_status::DS_REMOVED);
+ auto resp2 = change_dup_status(kTestAppName, dupid1,
duplication_status::DS_REMOVED);
ASSERT_EQ(ERR_OK, resp2.err);
// though this duplication is unreferenced, its status still updated
correctly
ASSERT_EQ(dup->status(), duplication_status::DS_REMOVED);
@@ -544,7 +590,7 @@ TEST_F(meta_duplication_service_test, duplication_sync)
const auto &node = server_nodes[0];
const auto &addr =
dsn::dns_resolver::instance().resolve_address(server_nodes[0]);
- std::string test_app = "test_app_0";
+ const std::string test_app = "test_app_0";
create_app(test_app);
auto app = find_app(test_app);
@@ -559,7 +605,7 @@ TEST_F(meta_duplication_service_test, duplication_sync)
initialize_node_state();
- dupid_t dupid = create_dup(test_app).dupid;
+ const dupid_t dupid = create_dup(test_app).dupid;
auto dup = app->duplications[dupid];
for (int i = 0; i < app->partition_count; i++) {
dup->init_progress(i, invalid_decree);
@@ -586,7 +632,7 @@ TEST_F(meta_duplication_service_test, duplication_sync)
ASSERT_EQ(resp.dup_map[app->app_id][dupid].dupid, dupid);
ASSERT_EQ(resp.dup_map[app->app_id][dupid].status,
duplication_status::DS_PREPARE);
ASSERT_EQ(resp.dup_map[app->app_id][dupid].create_ts,
dup->create_timestamp_ms);
- ASSERT_EQ(resp.dup_map[app->app_id][dupid].remote,
dup->follower_cluster_name);
+ ASSERT_EQ(resp.dup_map[app->app_id][dupid].remote,
dup->remote_cluster_name);
ASSERT_EQ(resp.dup_map[app->app_id][dupid].fail_mode,
dup->fail_mode());
auto progress_map = resp.dup_map[app->app_id][dupid].progress;
@@ -652,44 +698,40 @@ TEST_F(meta_duplication_service_test,
recover_from_meta_state) { test_recover_fr
TEST_F(meta_duplication_service_test, query_duplication_info)
{
- std::string test_app = "test-app";
-
- create_app(test_app);
- auto app = find_app(test_app);
+ create_app(kTestAppName);
+ auto app = find_app(kTestAppName);
- dupid_t test_dup = create_dup(test_app).dupid;
- change_dup_status(test_app, test_dup, duplication_status::DS_PAUSE);
+ dupid_t test_dup = create_dup(kTestAppName).dupid;
+ change_dup_status(kTestAppName, test_dup, duplication_status::DS_PAUSE);
- auto resp = query_dup_info(test_app);
+ auto resp = query_dup_info(kTestAppName);
ASSERT_EQ(resp.err, ERR_OK);
ASSERT_EQ(resp.entry_list.size(), 1);
ASSERT_EQ(resp.entry_list.back().status, duplication_status::DS_PREPARE);
ASSERT_EQ(resp.entry_list.back().dupid, test_dup);
ASSERT_EQ(resp.appid, app->app_id);
- change_dup_status(test_app, test_dup, duplication_status::DS_REMOVED);
- resp = query_dup_info(test_app);
+ change_dup_status(kTestAppName, test_dup, duplication_status::DS_REMOVED);
+ resp = query_dup_info(kTestAppName);
ASSERT_EQ(resp.err, ERR_OK);
ASSERT_EQ(resp.entry_list.size(), 0);
}
TEST_F(meta_duplication_service_test, re_add_duplication)
{
- std::string test_app = "test-app";
+ create_app(kTestAppName);
+ auto app = find_app(kTestAppName);
- create_app(test_app);
- auto app = find_app(test_app);
-
- auto test_dup = create_dup(test_app);
+ auto test_dup = create_dup(kTestAppName);
ASSERT_EQ(test_dup.err, ERR_OK);
ASSERT_TRUE(app->duplications[test_dup.dupid] != nullptr);
- auto resp = change_dup_status(test_app, test_dup.dupid,
duplication_status::DS_REMOVED);
+ auto resp = change_dup_status(kTestAppName, test_dup.dupid,
duplication_status::DS_REMOVED);
ASSERT_EQ(resp.err, ERR_OK);
ASSERT_TRUE(app->duplications.find(test_dup.dupid) ==
app->duplications.end());
sleep(1);
- auto test_dup_2 = create_dup(test_app);
+ auto test_dup_2 = create_dup(kTestAppName);
ASSERT_EQ(test_dup_2.appid, app->app_id);
ASSERT_EQ(test_dup_2.err, ERR_OK);
@@ -697,7 +739,7 @@ TEST_F(meta_duplication_service_test, re_add_duplication)
ASSERT_EQ(app->duplications.size(), 1);
ASSERT_NE(test_dup.dupid, test_dup_2.dupid);
- auto dup_list = query_dup_info(test_app).entry_list;
+ auto dup_list = query_dup_info(kTestAppName).entry_list;
ASSERT_EQ(dup_list.size(), 1);
ASSERT_EQ(dup_list.begin()->status, duplication_status::DS_PREPARE);
ASSERT_EQ(dup_list.begin()->dupid, test_dup_2.dupid);
@@ -706,7 +748,7 @@ TEST_F(meta_duplication_service_test, re_add_duplication)
SetUp();
recover_from_meta_state();
- app = find_app(test_app);
+ app = find_app(kTestAppName);
ASSERT_TRUE(app->duplications.find(test_dup.dupid) ==
app->duplications.end());
ASSERT_EQ(app->duplications.size(), 1);
}
@@ -718,60 +760,58 @@ TEST_F(meta_duplication_service_test,
recover_from_corrupted_meta_data)
TEST_F(meta_duplication_service_test, query_duplication_handler)
{
- std::string test_app = "test-app";
- create_app(test_app);
- create_dup(test_app);
+ create_app(kTestAppName);
+ create_dup(kTestAppName, kTestRemoteClusterName, kTestRemoteAppName);
meta_http_service mhs(_ms.get());
http_request fake_req;
http_response fake_resp;
- fake_req.query_args["name"] = test_app + "not-found";
+ fake_req.query_args["name"] = kTestAppName + "not_found";
mhs.query_duplication_handler(fake_req, fake_resp);
ASSERT_EQ(fake_resp.status_code, http_status_code::kNotFound);
- const auto &duplications = find_app(test_app)->duplications;
- ASSERT_EQ(duplications.size(), 1);
+ const auto &duplications = find_app(kTestAppName)->duplications;
+ ASSERT_EQ(1, duplications.size());
auto dup = duplications.begin()->second;
- fake_req.query_args["name"] = test_app;
+ fake_req.query_args["name"] = kTestAppName;
mhs.query_duplication_handler(fake_req, fake_resp);
- ASSERT_EQ(fake_resp.status_code, http_status_code::kOk);
+ ASSERT_EQ(http_status_code::kOk, fake_resp.status_code);
char ts_buf[32];
utils::time_ms_to_date_time(
static_cast<uint64_t>(dup->create_timestamp_ms), ts_buf,
sizeof(ts_buf));
- ASSERT_EQ(fake_resp.body,
- std::string() + R"({"1":{"create_ts":")" + ts_buf +
R"(","dupid":)" +
+ ASSERT_EQ(std::string() + R"({"1":{"create_ts":")" + ts_buf +
R"(","dupid":)" +
std::to_string(dup->id) +
- R"(,"fail_mode":"FAIL_SLOW")"
-
R"(,"remote":"slave-cluster","status":"DS_PREPARE"},"appid":2})");
+ R"(,"fail_mode":"FAIL_SLOW","remote":"slave-cluster")"
+
R"(,"remote_app_name":"remote_test_app","status":"DS_PREPARE"},"appid":2})",
+ fake_resp.body);
}
TEST_F(meta_duplication_service_test, fail_mode)
{
- std::string test_app = "test-app";
- create_app(test_app);
- auto app = find_app(test_app);
+ create_app(kTestAppName);
+ auto app = find_app(kTestAppName);
- auto dup_add_resp = create_dup(test_app);
+ auto dup_add_resp = create_dup(kTestAppName);
auto dup = app->duplications[dup_add_resp.dupid];
ASSERT_EQ(dup->fail_mode(), duplication_fail_mode::FAIL_SLOW);
ASSERT_EQ(dup->status(), duplication_status::DS_PREPARE);
- auto resp = update_fail_mode(test_app, dup->id,
duplication_fail_mode::FAIL_SKIP);
+ auto resp = update_fail_mode(kTestAppName, dup->id,
duplication_fail_mode::FAIL_SKIP);
ASSERT_EQ(resp.err, ERR_OK);
ASSERT_EQ(dup->fail_mode(), duplication_fail_mode::FAIL_SKIP);
ASSERT_EQ(dup->status(), duplication_status::DS_PREPARE);
// change nothing
- resp = update_fail_mode(test_app, dup->id,
duplication_fail_mode::FAIL_SKIP);
+ resp = update_fail_mode(kTestAppName, dup->id,
duplication_fail_mode::FAIL_SKIP);
ASSERT_EQ(resp.err, ERR_OK);
ASSERT_EQ(dup->fail_mode(), duplication_fail_mode::FAIL_SKIP);
ASSERT_EQ(dup->status(), duplication_status::DS_PREPARE);
// change status but fail mode not changed
- change_dup_status(test_app, dup->id, duplication_status::DS_APP);
- change_dup_status(test_app, dup->id, duplication_status::DS_LOG);
- resp = change_dup_status(test_app, dup->id, duplication_status::DS_PAUSE);
+ change_dup_status(kTestAppName, dup->id, duplication_status::DS_APP);
+ change_dup_status(kTestAppName, dup->id, duplication_status::DS_LOG);
+ resp = change_dup_status(kTestAppName, dup->id,
duplication_status::DS_PAUSE);
ASSERT_EQ(resp.err, ERR_OK);
ASSERT_EQ(dup->fail_mode(), duplication_fail_mode::FAIL_SKIP);
ASSERT_EQ(dup->status(), duplication_status::DS_PAUSE);
@@ -790,7 +830,7 @@ TEST_F(meta_duplication_service_test, fail_mode)
// ensure recovery will not lose fail_mode.
SetUp();
recover_from_meta_state();
- app = find_app(test_app);
+ app = find_app(kTestAppName);
dup = app->duplications[dup->id];
ASSERT_EQ(dup->fail_mode(), duplication_fail_mode::FAIL_SKIP);
}
@@ -823,7 +863,7 @@ TEST_F(meta_duplication_service_test,
create_follower_app_for_duplication)
duplication_status::DS_APP}};
for (const auto &test : test_cases) {
- std::string test_app = test.fail_cfg_name;
+ const auto test_app = test.fail_cfg_name;
create_app(test_app);
auto app = find_app(test_app);
@@ -869,8 +909,7 @@ TEST_F(meta_duplication_service_test,
check_follower_app_if_create_completed)
duplication_status::DS_LOG}};
for (const auto &test : test_cases) {
- std::string test_app =
- fmt::format("{}{}", test.fail_cfg_name[0],
test.fail_cfg_name.size());
+ const auto test_app = fmt::format("{}{}", test.fail_cfg_name[0],
test.fail_cfg_name.size());
create_app(test_app);
auto app = find_app(test_app);
diff --git a/src/replica/duplication/duplication_pipeline.cpp
b/src/replica/duplication/duplication_pipeline.cpp
index 8ce736f51..54abd83cf 100644
--- a/src/replica/duplication/duplication_pipeline.cpp
+++ b/src/replica/duplication/duplication_pipeline.cpp
@@ -23,7 +23,6 @@
#include <string>
#include <utility>
-#include "dsn.layer2_types.h"
#include "load_from_private_log.h"
#include "replica/duplication/replica_duplicator.h"
#include "replica/mutation_log.h"
@@ -122,7 +121,7 @@ ship_mutation::ship_mutation(replica_duplicator *duplicator)
METRIC_VAR_INIT_replica(dup_shipped_bytes)
{
_mutation_duplicator = new_mutation_duplicator(
- duplicator, _duplicator->remote_cluster_name(),
_replica->get_app_info()->app_name);
+ duplicator, duplicator->remote_cluster_name(),
duplicator->remote_app_name());
_mutation_duplicator->set_task_environment(duplicator);
}
diff --git a/src/replica/duplication/duplication_sync_timer.cpp
b/src/replica/duplication/duplication_sync_timer.cpp
index 8b9adce15..98137d21f 100644
--- a/src/replica/duplication/duplication_sync_timer.cpp
+++ b/src/replica/duplication/duplication_sync_timer.cpp
@@ -206,6 +206,7 @@ duplication_sync_timer::get_dup_states(int app_id, /*out*/
bool *app_found)
state.not_confirmed = std::max(decree(0), last_committed_decree -
s.confirmed_decree);
state.not_duplicated = std::max(decree(0), last_committed_decree -
s.last_decree);
state.fail_mode = s.fail_mode;
+ state.remote_app_name = s.remote_app_name;
result.emplace(std::make_pair(s.dupid, state));
}
}
diff --git a/src/replica/duplication/duplication_sync_timer.h
b/src/replica/duplication/duplication_sync_timer.h
index ff1b6c066..48fe2ec95 100644
--- a/src/replica/duplication/duplication_sync_timer.h
+++ b/src/replica/duplication/duplication_sync_timer.h
@@ -18,6 +18,7 @@
#pragma once
#include <map>
+#include <string>
#include <vector>
#include "common//duplication_common.h"
@@ -53,6 +54,7 @@ public:
decree not_duplicated{0};
decree not_confirmed{0};
duplication_fail_mode::type
fail_mode{duplication_fail_mode::FAIL_SLOW};
+ std::string remote_app_name;
};
std::multimap<dupid_t, replica_dup_state> get_dup_states(int app_id,
/*out*/ bool *app_found);
diff --git a/src/replica/duplication/replica_duplicator.cpp
b/src/replica/duplication/replica_duplicator.cpp
index 10e396446..810209651 100644
--- a/src/replica/duplication/replica_duplicator.cpp
+++ b/src/replica/duplication/replica_duplicator.cpp
@@ -52,6 +52,10 @@ replica_duplicator::replica_duplicator(const
duplication_entry &ent, replica *r)
: replica_base(r),
_id(ent.dupid),
_remote_cluster_name(ent.remote),
+ // remote_app_name is missing means meta server is of old version(<
v2.6.0),
+ // in which case source app_name would be used as remote_app_name.
+ _remote_app_name(ent.__isset.remote_app_name ? ent.remote_app_name
+ :
r->get_app_info()->app_name),
_replica(r),
_stub(r->get_replica_stub()),
METRIC_VAR_INIT_replica(dup_confirmed_mutations)
diff --git a/src/replica/duplication/replica_duplicator.h
b/src/replica/duplication/replica_duplicator.h
index 78e649784..ebf4473b9 100644
--- a/src/replica/duplication/replica_duplicator.h
+++ b/src/replica/duplication/replica_duplicator.h
@@ -102,6 +102,8 @@ public:
const std::string &remote_cluster_name() const { return
_remote_cluster_name; }
+ const std::string &remote_app_name() const { return _remote_app_name; }
+
// Thread-safe
duplication_progress progress() const
{
@@ -155,6 +157,7 @@ private:
const dupid_t _id;
const std::string _remote_cluster_name;
+ const std::string _remote_app_name;
replica *_replica;
replica_stub *_stub;
diff --git a/src/replica/duplication/replica_duplicator_manager.cpp
b/src/replica/duplication/replica_duplicator_manager.cpp
index 03ca07112..d60bf57b2 100644
--- a/src/replica/duplication/replica_duplicator_manager.cpp
+++ b/src/replica/duplication/replica_duplicator_manager.cpp
@@ -185,6 +185,7 @@ replica_duplicator_manager::get_dup_states() const
state.last_decree = progress.last_decree;
state.confirmed_decree = progress.confirmed_decree;
state.fail_mode = dup.second->fail_mode();
+ state.remote_app_name = dup.second->remote_app_name();
ret.emplace_back(state);
}
return ret;
diff --git a/src/replica/duplication/replica_duplicator_manager.h
b/src/replica/duplication/replica_duplicator_manager.h
index f2b1593e4..55fc92313 100644
--- a/src/replica/duplication/replica_duplicator_manager.h
+++ b/src/replica/duplication/replica_duplicator_manager.h
@@ -19,6 +19,7 @@
#include <stdint.h>
#include <map>
+#include <string>
#include <utility>
#include <vector>
@@ -88,6 +89,7 @@ public:
decree last_decree{invalid_decree};
decree confirmed_decree{invalid_decree};
duplication_fail_mode::type
fail_mode{duplication_fail_mode::FAIL_SLOW};
+ std::string remote_app_name;
};
std::vector<dup_state> get_dup_states() const;
diff --git a/src/replica/duplication/replica_follower.cpp
b/src/replica/duplication/replica_follower.cpp
index c9c5e317d..71072ab75 100644
--- a/src/replica/duplication/replica_follower.cpp
+++ b/src/replica/duplication/replica_follower.cpp
@@ -60,21 +60,29 @@ void replica_follower::init_master_info()
{
const auto &envs = _replica->get_app_info()->envs;
- if (envs.find(duplication_constants::kDuplicationEnvMasterClusterKey) ==
envs.end() ||
- envs.find(duplication_constants::kDuplicationEnvMasterMetasKey) ==
envs.end()) {
+ const auto &cluster_name =
envs.find(duplication_constants::kDuplicationEnvMasterClusterKey);
+ const auto &metas =
envs.find(duplication_constants::kDuplicationEnvMasterMetasKey);
+ if (cluster_name == envs.end() || metas == envs.end()) {
return;
}
need_duplicate = true;
- _master_cluster_name =
envs.at(duplication_constants::kDuplicationEnvMasterClusterKey);
- _master_app_name = _replica->get_app_info()->app_name;
+ _master_cluster_name = cluster_name->second;
- const auto &meta_list_str =
envs.at(duplication_constants::kDuplicationEnvMasterMetasKey);
- std::vector<std::string> metas;
- dsn::utils::split_args(meta_list_str.c_str(), metas, ',');
- CHECK(!metas.empty(), "master cluster meta list is invalid!");
- for (const auto &meta : metas) {
+ const auto &app_name =
envs.find(duplication_constants::kDuplicationEnvMasterAppNameKey);
+ if (app_name == envs.end()) {
+ // The version of meta server of master cluster is old(< v2.6.0), thus
the app name of
+ // the follower cluster is the same with master cluster.
+ _master_app_name = _replica->get_app_info()->app_name;
+ } else {
+ _master_app_name = app_name->second;
+ }
+
+ std::vector<std::string> master_metas;
+ dsn::utils::split_args(metas->second.c_str(), master_metas, ',');
+ CHECK(!master_metas.empty(), "master cluster meta list is invalid!");
+ for (const auto &meta : master_metas) {
const auto node = host_port::from_string(meta);
CHECK(!node.is_invalid(), "{} is invalid meta host_port", meta);
_master_meta_list.emplace_back(std::move(node));
diff --git a/src/replica/duplication/test/dup_replica_http_service_test.cpp
b/src/replica/duplication/test/dup_replica_http_service_test.cpp
index b8c58479e..43a0a3515 100644
--- a/src/replica/duplication/test/dup_replica_http_service_test.cpp
+++ b/src/replica/duplication/test/dup_replica_http_service_test.cpp
@@ -49,6 +49,7 @@ TEST_P(dup_replica_http_service_test,
query_duplication_handler)
ent.dupid = 1583306653;
ent.progress[pri->get_gpid().get_partition_index()] = 0;
ent.status = duplication_status::DS_PAUSE;
+ ent.__set_remote_app_name("temp");
add_dup(pri, std::make_unique<replica_duplicator>(ent, pri));
replica_http_service http_svc(stub.get());
@@ -73,7 +74,7 @@ TEST_P(dup_replica_http_service_test,
query_duplication_handler)
http_svc.query_duplication_handler(req, resp);
ASSERT_EQ(resp.status_code, http_status_code::kOk);
ASSERT_EQ(
-
R"({"1583306653":{"1.1":{"duplicating":false,"fail_mode":"FAIL_SLOW","not_confirmed_mutations_num":100,"not_duplicated_mutations_num":50}}})",
+
R"({"1583306653":{"1.1":{"duplicating":false,"fail_mode":"FAIL_SLOW","not_confirmed_mutations_num":100,"not_duplicated_mutations_num":50,"remote_app_name":"temp"}}})",
resp.body);
}
diff --git a/src/replica/duplication/test/duplication_sync_timer_test.cpp
b/src/replica/duplication/test/duplication_sync_timer_test.cpp
index 8889598f4..c02629e41 100644
--- a/src/replica/duplication/test/duplication_sync_timer_test.cpp
+++ b/src/replica/duplication/test/duplication_sync_timer_test.cpp
@@ -46,6 +46,9 @@ public:
void test_on_duplication_sync_reply()
{
+ static const std::string kTestRemoteClusterName = "slave-cluster";
+ static const std::string kTestRemoteAppName = "temp";
+
// replica: {app_id:2, partition_id:1, duplications:{}}
stub->add_primary_replica(2, 1);
ASSERT_NE(stub->find_replica(2, 1), nullptr);
@@ -53,7 +56,8 @@ public:
// appid:2 -> dupid:1
duplication_entry ent;
ent.dupid = 1;
- ent.remote = "slave-cluster";
+ ent.remote = kTestRemoteClusterName;
+ ent.__set_remote_app_name(kTestRemoteAppName);
ent.status = duplication_status::DS_PAUSE;
ent.progress[1] = 1000; // partition 1 => confirmed 1000
duplication_sync_response resp;
@@ -65,9 +69,11 @@ public:
stub->find_replica(2,
1)->get_replica_duplicator_manager()._duplications[1].get();
ASSERT_TRUE(dup);
- ASSERT_EQ(dup->_status, duplication_status::DS_PAUSE);
- ASSERT_EQ(dup->_progress.confirmed_decree, 1000);
- ASSERT_EQ(dup_sync->_rpc_task, nullptr);
+ ASSERT_EQ(kTestRemoteClusterName, dup->_remote_cluster_name);
+ ASSERT_EQ(kTestRemoteAppName, dup->_remote_app_name);
+ ASSERT_EQ(duplication_status::DS_PAUSE, dup->_status);
+ ASSERT_EQ(1000, dup->_progress.confirmed_decree);
+ ASSERT_EQ(nullptr, dup_sync->_rpc_task);
}
void test_duplication_sync()
@@ -90,16 +96,16 @@ public:
{
// replica server should not sync to meta when it's
disconnected
dup_sync->run();
- ASSERT_EQ(duplication_sync_rpc::mail_box().size(), 0);
+ ASSERT_EQ(0, duplication_sync_rpc::mail_box().size());
}
{
// never collects confirm points from non-primaries
stub->set_state_connected();
dup_sync->run();
- ASSERT_EQ(duplication_sync_rpc::mail_box().size(), 1);
+ ASSERT_EQ(1, duplication_sync_rpc::mail_box().size());
auto &req = duplication_sync_rpc::mail_box().back().request();
- ASSERT_EQ(req.confirm_list.size(), 0);
+ ASSERT_EQ(0, req.confirm_list.size());
}
}
@@ -115,7 +121,7 @@ public:
ASSERT_EQ(req.node, stub->primary_address());
// ensure confirm list is empty when no progress
- ASSERT_EQ(req.confirm_list.size(), 0);
+ ASSERT_EQ(0, req.confirm_list.size());
// ensure this rpc has timeout set.
auto &rpc = duplication_sync_rpc::mail_box().back();
@@ -142,11 +148,11 @@ public:
ASSERT_TRUE(req.confirm_list.find(gpid(appid, 1)) !=
req.confirm_list.end());
auto dup_list = req.confirm_list[gpid(appid, 1)];
- ASSERT_EQ(dup_list.size(), 1);
+ ASSERT_EQ(1, dup_list.size());
auto dup = dup_list[0];
- ASSERT_EQ(dup.dupid, 1);
- ASSERT_EQ(dup.confirmed_decree, 1500);
+ ASSERT_EQ(1, dup.dupid);
+ ASSERT_EQ(1500, dup.confirmed_decree);
}
}
}
@@ -198,10 +204,10 @@ public:
{
stub->set_state_connected();
dup_sync->run();
- ASSERT_EQ(duplication_sync_rpc::mail_box().size(), 1);
+ ASSERT_EQ(1, duplication_sync_rpc::mail_box().size());
auto &req = duplication_sync_rpc::mail_box().back().request();
- ASSERT_EQ(req.confirm_list.size(), 3);
+ ASSERT_EQ(3, req.confirm_list.size());
ASSERT_TRUE(req.confirm_list.find(gpid(1, 1)) !=
req.confirm_list.end());
ASSERT_TRUE(req.confirm_list.find(gpid(3, 1)) !=
req.confirm_list.end());
@@ -275,7 +281,7 @@ public:
auto r = stub->find_replica(appid, 1);
auto dup = find_dup(r, 1);
- ASSERT_EQ(dup->progress().confirmed_decree, 3);
+ ASSERT_EQ(3, dup->progress().confirmed_decree);
}
}
@@ -300,8 +306,8 @@ public:
dup_sync->update_duplication_map(dup_map);
for (int partition_id = 0; partition_id < 10; partition_id++) {
- ASSERT_NE(find_dup(stub->find_replica(1, partition_id), 2),
nullptr) << partition_id;
- ASSERT_EQ(find_dup(stub->find_replica(1, partition_id), 2)->id(),
2);
+ ASSERT_NE(nullptr, find_dup(stub->find_replica(1, partition_id),
2)) << partition_id;
+ ASSERT_EQ(2, find_dup(stub->find_replica(1, partition_id),
2)->id());
}
// primary -> secondary
@@ -321,7 +327,7 @@ public:
}
dup_sync->update_duplication_map(dup_map);
for (int partition_id = 0; partition_id < 10; partition_id++) {
- ASSERT_EQ(find_dup(stub->find_replica(1, partition_id), 2)->id(),
2);
+ ASSERT_EQ(2, find_dup(stub->find_replica(1, partition_id),
2)->id());
}
// on meta's perspective, only 3 partitions are hosted on this server
@@ -332,7 +338,7 @@ public:
dup_map[appid][ent.dupid] = ent;
dup_sync->update_duplication_map(dup_map);
for (int partition_id = 0; partition_id < 3; partition_id++) {
- ASSERT_EQ(find_dup(stub->find_replica(1, partition_id), 2)->id(),
2);
+ ASSERT_EQ(2, find_dup(stub->find_replica(1, partition_id),
2)->id());
}
for (int partition_id = 3; partition_id < 10; partition_id++) {
ASSERT_TRUE(stub->find_replica(1, partition_id)
diff --git a/src/replica/duplication/test/replica_duplicator_test.cpp
b/src/replica/duplication/test/replica_duplicator_test.cpp
index 20692b0c2..817e3090f 100644
--- a/src/replica/duplication/test/replica_duplicator_test.cpp
+++ b/src/replica/duplication/test/replica_duplicator_test.cpp
@@ -69,25 +69,29 @@ public:
return dup->_start_point_decree;
}
- void test_new_duplicator()
+ void test_new_duplicator(const std::string &remote_app_name, bool
specify_remote_app_name)
{
- dupid_t dupid = 1;
- std::string remote = "remote_address";
- duplication_status::type status = duplication_status::DS_PAUSE;
- int64_t confirmed_decree = 100;
+ const dupid_t dupid = 1;
+ const std::string remote = "remote_address";
+ const duplication_status::type status = duplication_status::DS_PAUSE;
+ const int64_t confirmed_decree = 100;
duplication_entry dup_ent;
dup_ent.dupid = dupid;
dup_ent.remote = remote;
dup_ent.status = status;
dup_ent.progress[_replica->get_gpid().get_partition_index()] =
confirmed_decree;
+ if (specify_remote_app_name) {
+ dup_ent.__set_remote_app_name(remote_app_name);
+ }
auto duplicator = std::make_unique<replica_duplicator>(dup_ent,
_replica.get());
- ASSERT_EQ(duplicator->id(), dupid);
- ASSERT_EQ(duplicator->remote_cluster_name(), remote);
- ASSERT_EQ(duplicator->_status, status);
- ASSERT_EQ(duplicator->progress().confirmed_decree, confirmed_decree);
- ASSERT_EQ(duplicator->progress().last_decree, confirmed_decree);
+ ASSERT_EQ(dupid, duplicator->id());
+ ASSERT_EQ(remote, duplicator->remote_cluster_name());
+ ASSERT_EQ(remote_app_name, duplicator->remote_app_name());
+ ASSERT_EQ(status, duplicator->_status);
+ ASSERT_EQ(confirmed_decree, duplicator->progress().confirmed_decree);
+ ASSERT_EQ(confirmed_decree, duplicator->progress().last_decree);
auto &expected_env = *duplicator;
ASSERT_EQ(duplicator->tracker(), expected_env.__conf.tracker);
@@ -138,7 +142,15 @@ public:
INSTANTIATE_TEST_SUITE_P(, replica_duplicator_test, ::testing::Values(false,
true));
-TEST_P(replica_duplicator_test, new_duplicator) { test_new_duplicator(); }
+TEST_P(replica_duplicator_test, new_duplicator_without_remote_app_name)
+{
+ test_new_duplicator("temp", false);
+}
+
+TEST_P(replica_duplicator_test, new_duplicator_with_remote_app_name)
+{
+ test_new_duplicator("another_test_app", true);
+}
TEST_P(replica_duplicator_test, pause_start_duplication) {
test_pause_start_duplication(); }
diff --git a/src/replica/duplication/test/replica_follower_test.cpp
b/src/replica/duplication/test/replica_follower_test.cpp
index 4ebc8f026..5affa27d9 100644
--- a/src/replica/duplication/test/replica_follower_test.cpp
+++ b/src/replica/duplication/test/replica_follower_test.cpp
@@ -46,10 +46,13 @@ namespace replication {
class replica_follower_test : public duplication_test_base
{
public:
+ static const std::string kTestMasterClusterName;
+ static const std::string kTestMasterAppName;
+
replica_follower_test()
{
_app_info.app_id = 2;
- _app_info.app_name = "follower";
+ _app_info.app_name = kTestMasterAppName;
_app_info.app_type = replication_options::kReplicaAppType;
_app_info.is_stateful = true;
_app_info.max_replica_count = 3;
@@ -110,40 +113,58 @@ public:
stub->_nfs->start();
}
+ void test_init_master_info(const std::string &expected_master_app_name)
+ {
+
_app_info.envs.emplace(duplication_constants::kDuplicationEnvMasterClusterKey,
+ kTestMasterClusterName);
+
_app_info.envs.emplace(duplication_constants::kDuplicationEnvMasterMetasKey,
+
"127.0.0.1:34801,127.0.0.1:34802,127.0.0.1:34803");
+ update_mock_replica(_app_info);
+
+ auto follower = _mock_replica->get_replica_follower();
+ ASSERT_EQ(expected_master_app_name, follower->get_master_app_name());
+ ASSERT_EQ(follower->get_master_cluster_name(), kTestMasterClusterName);
+ ASSERT_TRUE(follower->is_need_duplicate());
+ ASSERT_TRUE(_mock_replica->is_duplication_follower());
+ std::vector<std::string> test_ip{"127.0.0.1:34801", "127.0.0.1:34802",
"127.0.0.1:34803"};
+ for (int i = 0; i < follower->get_master_meta_list().size(); i++) {
+ ASSERT_EQ(test_ip[i],
std::string(follower->get_master_meta_list()[i].to_string()));
+ }
+
+ _app_info.envs.clear();
+ update_mock_replica(_app_info);
+ follower = _mock_replica->get_replica_follower();
+ ASSERT_FALSE(follower->is_need_duplicate());
+ ASSERT_FALSE(_mock_replica->is_duplication_follower());
+ }
+
public:
dsn::app_info _app_info;
mock_replica_ptr _mock_replica;
};
+const std::string replica_follower_test::kTestMasterClusterName = "master";
+const std::string replica_follower_test::kTestMasterAppName = "follower";
+
INSTANTIATE_TEST_SUITE_P(, replica_follower_test, ::testing::Values(false,
true));
-TEST_P(replica_follower_test, test_init_master_info)
+TEST_P(replica_follower_test, test_init_master_info_without_master_app_env)
{
-
_app_info.envs.emplace(duplication_constants::kDuplicationEnvMasterClusterKey,
"master");
-
_app_info.envs.emplace(duplication_constants::kDuplicationEnvMasterMetasKey,
- "127.0.0.1:34801,127.0.0.1:34802,127.0.0.1:34803");
- update_mock_replica(_app_info);
-
- auto follower = _mock_replica->get_replica_follower();
- ASSERT_EQ(follower->get_master_app_name(), "follower");
- ASSERT_EQ(follower->get_master_cluster_name(), "master");
- ASSERT_TRUE(follower->is_need_duplicate());
- ASSERT_TRUE(_mock_replica->is_duplication_follower());
- std::vector<std::string> test_ip{"127.0.0.1:34801", "127.0.0.1:34802",
"127.0.0.1:34803"};
- for (int i = 0; i < follower->get_master_meta_list().size(); i++) {
-
ASSERT_EQ(std::string(follower->get_master_meta_list()[i].to_string()),
test_ip[i]);
- }
+ test_init_master_info(kTestMasterAppName);
+}
- _app_info.envs.clear();
- update_mock_replica(_app_info);
- follower = _mock_replica->get_replica_follower();
- ASSERT_FALSE(follower->is_need_duplicate());
- ASSERT_FALSE(_mock_replica->is_duplication_follower());
+TEST_P(replica_follower_test, test_init_master_info_with_master_app_env)
+{
+ static const std::string kTestAnotherMasterAppName("another_follower");
+
_app_info.envs.emplace(duplication_constants::kDuplicationEnvMasterAppNameKey,
+ kTestAnotherMasterAppName);
+ test_init_master_info(kTestAnotherMasterAppName);
}
TEST_P(replica_follower_test, test_duplicate_checkpoint)
{
-
_app_info.envs.emplace(duplication_constants::kDuplicationEnvMasterClusterKey,
"master");
+
_app_info.envs.emplace(duplication_constants::kDuplicationEnvMasterClusterKey,
+ kTestMasterClusterName);
_app_info.envs.emplace(duplication_constants::kDuplicationEnvMasterMetasKey,
"127.0.0.1:34801,127.0.0.1:34802,127.0.0.1:34803");
update_mock_replica(_app_info);
@@ -163,7 +184,8 @@ TEST_P(replica_follower_test, test_duplicate_checkpoint)
TEST_P(replica_follower_test,
test_async_duplicate_checkpoint_from_master_replica)
{
-
_app_info.envs.emplace(duplication_constants::kDuplicationEnvMasterClusterKey,
"master");
+
_app_info.envs.emplace(duplication_constants::kDuplicationEnvMasterClusterKey,
+ kTestMasterClusterName);
_app_info.envs.emplace(duplication_constants::kDuplicationEnvMasterMetasKey,
"127.0.0.1:34801,127.0.0.1:34802,127.0.0.1:34803");
update_mock_replica(_app_info);
@@ -185,7 +207,8 @@ TEST_P(replica_follower_test,
test_async_duplicate_checkpoint_from_master_replic
TEST_P(replica_follower_test, test_update_master_replica_config)
{
-
_app_info.envs.emplace(duplication_constants::kDuplicationEnvMasterClusterKey,
"master");
+
_app_info.envs.emplace(duplication_constants::kDuplicationEnvMasterClusterKey,
+ kTestMasterClusterName);
_app_info.envs.emplace(duplication_constants::kDuplicationEnvMasterMetasKey,
"127.0.0.1:34801,127.0.0.1:34802,127.0.0.1:34803");
update_mock_replica(_app_info);
@@ -243,7 +266,8 @@ TEST_P(replica_follower_test,
test_update_master_replica_config)
TEST_P(replica_follower_test, test_nfs_copy_checkpoint)
{
-
_app_info.envs.emplace(duplication_constants::kDuplicationEnvMasterClusterKey,
"master");
+
_app_info.envs.emplace(duplication_constants::kDuplicationEnvMasterClusterKey,
+ kTestMasterClusterName);
_app_info.envs.emplace(duplication_constants::kDuplicationEnvMasterMetasKey,
"127.0.0.1:34801,127.0.0.1:34802,127.0.0.1:34803");
update_mock_replica(_app_info);
diff --git a/src/replica/replica_http_service.cpp
b/src/replica/replica_http_service.cpp
index ceffc6683..429ab233c 100644
--- a/src/replica/replica_http_service.cpp
+++ b/src/replica/replica_http_service.cpp
@@ -76,6 +76,7 @@ void replica_http_service::query_duplication_handler(const
http_request &req, ht
{"not_confirmed_mutations_num", s.second.not_confirmed},
{"not_duplicated_mutations_num", s.second.not_duplicated},
{"fail_mode", duplication_fail_mode_to_string(s.second.fail_mode)},
+ {"remote_app_name", s.second.remote_app_name},
};
}
resp.status_code = http_status_code::kOk;
diff --git a/src/runtime/rpc/rpc_holder.h b/src/runtime/rpc/rpc_holder.h
index eefb4c564..2aba60891 100644
--- a/src/runtime/rpc/rpc_holder.h
+++ b/src/runtime/rpc/rpc_holder.h
@@ -97,7 +97,7 @@ public:
std::chrono::milliseconds timeout = 0_ms,
uint64_t partition_hash = 0,
int thread_hash = 0)
- : _i(new internal(req, code, timeout, partition_hash, thread_hash))
+ : _i(new internal(std::move(req), code, timeout, partition_hash,
thread_hash))
{
}
@@ -285,7 +285,7 @@ private:
unmarshall(req, *thrift_request);
}
- internal(std::unique_ptr<TRequest> &req,
+ internal(std::unique_ptr<TRequest> req,
task_code code,
std::chrono::milliseconds timeout,
uint64_t partition_hash,
diff --git a/src/shell/commands/duplication.cpp
b/src/shell/commands/duplication.cpp
index 97bff425b..f974ad078 100644
--- a/src/shell/commands/duplication.cpp
+++ b/src/shell/commands/duplication.cpp
@@ -20,6 +20,7 @@
#include <fmt/core.h>
#include <stdint.h>
#include <stdio.h>
+#include <initializer_list>
#include <iostream>
#include <memory>
#include <set>
@@ -47,10 +48,10 @@ using dsn::replication::duplication_status;
bool add_dup(command_executor *e, shell_context *sc, arguments args)
{
- // add_dup <app_name> <remote_cluster_name> [-f|--freeze]
+ // add_dup <app_name> <remote_cluster_name> [-s|--sst]
[-a|--remote_app_name str]
argh::parser cmd(args.argc, args.argv);
- if (cmd.pos_args().size() > 3) {
+ if (cmd.pos_args().size() > 4) {
fmt::print(stderr, "too many params\n");
return false;
}
@@ -80,35 +81,54 @@ bool add_dup(command_executor *e, shell_context *sc,
arguments args)
return true;
}
+ // Check if the boolean option is specified.
bool is_duplicating_checkpoint = cmd[{"-s", "--sst"}];
- auto err_resp =
- sc->ddl_client->add_dup(app_name, remote_cluster_name,
is_duplicating_checkpoint);
- dsn::error_s err = err_resp.get_error();
+
+ // Read the app name of the remote cluster, if any.
+ // Otherwise, use app_name as the remote_app_name.
+ std::string remote_app_name(cmd({"-a", "--remote_app_name"},
app_name).str());
+
+ auto err_resp = sc->ddl_client->add_dup(
+ app_name, remote_cluster_name, is_duplicating_checkpoint,
remote_app_name);
+ auto err = err_resp.get_error();
std::string hint;
- if (err.is_ok()) {
+ if (err) {
err = dsn::error_s::make(err_resp.get_value().err);
hint = err_resp.get_value().hint;
}
- if (!err.is_ok()) {
+
+ if (!err) {
fmt::print(stderr,
"adding duplication failed [app: {}, remote: {},
checkpoint: {}, error: {}]\n",
app_name,
remote_cluster_name,
is_duplicating_checkpoint,
- err.description());
+ err);
+
if (!hint.empty()) {
fmt::print(stderr, "detail:\n {}\n", hint);
}
+
+ return true;
+ }
+
+ const auto &resp = err_resp.get_value();
+ fmt::print("adding duplication succeed [app: {}, remote: {}, appid: {},
dupid: "
+ "{}], checkpoint: {}",
+ app_name,
+ remote_cluster_name,
+ resp.appid,
+ resp.dupid,
+ is_duplicating_checkpoint);
+
+ if (resp.__isset.remote_app_name) {
+ fmt::print(", remote_app_name: {}\n", remote_app_name);
} else {
- const auto &resp = err_resp.get_value();
- fmt::print("adding duplication succeed [app: {}, remote: {}, appid:
{}, dupid: "
- "{}], checkpoint: {}\n",
- app_name,
- remote_cluster_name,
- resp.appid,
- resp.dupid,
- is_duplicating_checkpoint);
+ fmt::print("\nWARNING: meta server does NOT support specifying
remote_app_name, "
+ "remote_app_name might has been specified with {}\n",
+ app_name);
}
+
return true;
}
@@ -145,44 +165,49 @@ bool query_dup(command_executor *e, shell_context *sc,
arguments args)
}
std::string app_name = cmd(1).str();
+ // Check if the boolean option is specified.
bool detail = cmd[{"-d", "--detail"}];
auto err_resp = sc->ddl_client->query_dup(app_name);
dsn::error_s err = err_resp.get_error();
- if (err.is_ok()) {
+ if (err) {
err = dsn::error_s::make(err_resp.get_value().err);
}
- if (!err.is_ok()) {
- fmt::print(stderr,
- "querying duplications of app [{}] failed, error={}\n",
- app_name,
- err.description());
- } else if (detail) {
+ if (!err) {
+ fmt::print(stderr, "querying duplications of app [{}] failed,
error={}\n", app_name, err);
+
+ return true;
+ }
+
+ if (detail) {
fmt::print("duplications of app [{}] in detail:\n", app_name);
fmt::print("{}\n\n",
duplication_query_response_to_string(err_resp.get_value()));
- } else {
- const auto &resp = err_resp.get_value();
- fmt::print("duplications of app [{}] are listed as below:\n",
app_name);
-
- dsn::utils::table_printer printer;
- printer.add_title("dup_id");
- printer.add_column("status");
- printer.add_column("remote cluster");
- printer.add_column("create time");
-
- char create_time[25];
- for (auto info : resp.entry_list) {
- dsn::utils::time_ms_to_date_time(info.create_ts, create_time,
sizeof(create_time));
-
- printer.add_row(info.dupid);
- printer.append_data(duplication_status_to_string(info.status));
- printer.append_data(info.remote);
- printer.append_data(create_time);
-
- printer.output(std::cout);
- std::cout << std::endl;
- }
+
+ return true;
+ }
+
+ const auto &resp = err_resp.get_value();
+ fmt::print("duplications of app [{}] are listed as below:\n", app_name);
+
+ dsn::utils::table_printer printer;
+ printer.add_title("dup_id");
+ printer.add_column("status");
+ printer.add_column("remote cluster");
+ printer.add_column("create time");
+
+ for (auto info : resp.entry_list) {
+ std::string create_time;
+ dsn::utils::time_ms_to_string(info.create_ts, create_time);
+
+ printer.add_row(info.dupid);
+ printer.append_data(duplication_status_to_string(info.status));
+ printer.append_data(info.remote);
+ printer.append_data(create_time);
+
+ printer.output(std::cout);
+ std::cout << std::endl;
}
+
return true;
}
diff --git a/src/shell/main.cpp b/src/shell/main.cpp
index 72084da96..6e2909c3a 100644
--- a/src/shell/main.cpp
+++ b/src/shell/main.cpp
@@ -473,7 +473,10 @@ static command_executor commands[] = {
"[-s|--skip_prompt] [-o|--output file_name]",
ddd_diagnose,
},
- {"add_dup", "add duplication", "<app_name> <remote_cluster_name>
[-s|--sst]", add_dup},
+ {"add_dup",
+ "add duplication",
+ "<app_name> <remote_cluster_name> [-s|--sst] [-a|--remote_app_name str]",
+ add_dup},
{"query_dup", "query duplication info", "<app_name> [-d|--detail]",
query_dup},
{"remove_dup", "remove duplication", "<app_name> <dup_id>", remove_dup},
{"start_dup", "start duplication", "<app_name> <dup_id>", start_dup},
diff --git a/src/utils/errors.h b/src/utils/errors.h
index 17ee51092..c611e1bef 100644
--- a/src/utils/errors.h
+++ b/src/utils/errors.h
@@ -251,6 +251,15 @@ USER_DEFINED_STRUCTURE_FORMATTER(::dsn::error_s);
}
\
} while (false)
+#define RETURN_EW_NOT_OK_MSG(s, T, ...)
\
+ do {
\
+ ::dsn::error_s _s = (s);
\
+ if (dsn_unlikely(!_s)) {
\
+ fmt::print(stderr, "{}: {}\n", _s.description(),
fmt::format(__VA_ARGS__)); \
+ return dsn::error_with<T>(std::move(_s));
\
+ }
\
+ } while (false)
+
#define CHECK_OK(s, ...)
\
do {
\
const ::dsn::error_s &_s = (s);
\
diff --git a/src/utils/test/time_utils_test.cpp
b/src/utils/test/time_utils_test.cpp
index 74c0c8db4..35838aaf7 100644
--- a/src/utils/test/time_utils_test.cpp
+++ b/src/utils/test/time_utils_test.cpp
@@ -87,15 +87,31 @@ TEST(time_utils, get_current_physical_time_ns)
ASSERT_LT(get_current_physical_time_ns() - ts_ns, 1e7); // < 10 ms
}
-TEST(time_utils, time_ms_to_string)
+template <typename T>
+void test_time_ms_to_string(T &str)
+{
+ time_ms_to_string(1605091506136, str);
+
+ std::string actual_str(str);
+
+ // Time differ between time zones.
+ //
+ // The real time 2020-11-11 18:45:06.136 (UTC+8)
+ // so it must be 2020-11-1x xx:45:06.136.
+ ASSERT_EQ(std::string("2020-11-1"), actual_str.substr(0, 9));
+ ASSERT_EQ(std::string(":45:06.136"), actual_str.substr(13, 10));
+}
+
+TEST(time_utils, time_ms_to_buf)
{
char buf[64] = {0};
- time_ms_to_string(1605091506136, buf);
- // time differ between time zones,
- // the real time 2020-11-11 18:45:06.136 (UTC+8)
- // so it must be 2020-11-1x xx:45:06.136
- ASSERT_EQ(std::string(buf).substr(0, 9), "2020-11-1");
- ASSERT_EQ(std::string(buf).substr(13, 10), ":45:06.136");
+ test_time_ms_to_string(buf);
+}
+
+TEST(time_utils, time_ms_to_str)
+{
+ std::string str;
+ test_time_ms_to_string(str);
}
} // namespace utils
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]