acelyc111 commented on code in PR #1960:
URL:
https://github.com/apache/incubator-pegasus/pull/1960#discussion_r1539033696
##########
idl/duplication.thrift:
##########
@@ -66,6 +66,9 @@ struct duplication_add_request
// - if false, duplication start state=DS_LOG,
// server will replay and send plog mutation to follower cluster derectly
3:optional bool is_duplicating_checkpoint = true;
+
+ // Specify the app name of remote cluster.
+ 4:optional string remote_app_name;
Review Comment:
For the newly added fields, describe since which version (i.e., 2.6) they
were added, as is done in `struct app_info`.
##########
src/shell/commands/duplication.cpp:
##########
@@ -80,35 +81,55 @@ bool add_dup(command_executor *e, shell_context *sc,
arguments args)
return true;
}
+ // Check if the boolean option is specified.
bool is_duplicating_checkpoint = cmd[{"-s", "--sst"}];
- auto err_resp =
- sc->ddl_client->add_dup(app_name, remote_cluster_name,
is_duplicating_checkpoint);
- dsn::error_s err = err_resp.get_error();
+
+ // Read the app name of the remote cluster, if any.
+ // Otherwise, use app_name as the remote_app_name.
+ std::string remote_app_name(cmd({"-a", "--remote_app_name"},
app_name).str());
+
+ auto err_resp = sc->ddl_client->add_dup(
+ app_name, remote_cluster_name, is_duplicating_checkpoint,
remote_app_name);
+ auto err = err_resp.get_error();
std::string hint;
- if (err.is_ok()) {
+ if (err) {
err = dsn::error_s::make(err_resp.get_value().err);
hint = err_resp.get_value().hint;
}
- if (!err.is_ok()) {
+
+ if (!err) {
fmt::print(stderr,
"adding duplication failed [app: {}, remote: {},
checkpoint: {}, error: {}]\n",
app_name,
remote_cluster_name,
is_duplicating_checkpoint,
- err.description());
+ err);
+
if (!hint.empty()) {
fmt::print(stderr, "detail:\n {}\n", hint);
}
+
+ return true;
+ }
+
+ const auto &resp = err_resp.get_value();
+ fmt::print("adding duplication succeed [app: {}, remote: {}, appid: {},
dupid: "
+ "{}], checkpoint: {}",
+ app_name,
+ remote_cluster_name,
+ resp.appid,
+ resp.dupid,
+ is_duplicating_checkpoint);
+
+ if (resp.__isset.remote_app_name) {
+ fmt::print(", remote_app_name: {}\n", remote_app_name);
} else {
- const auto &resp = err_resp.get_value();
- fmt::print("adding duplication succeed [app: {}, remote: {}, appid:
{}, dupid: "
- "{}], checkpoint: {}\n",
- app_name,
- remote_cluster_name,
- resp.appid,
- resp.dupid,
- is_duplicating_checkpoint);
+ fmt::print("\nWARNING: meta server does NOT support specifying
remote_app_name, "
+ "remote_app_name might has been specified with {}\n",
+ remote_app_name,
+ app_name);
Review Comment:
Why specify 2 arguments? The format string has only one `{}` placeholder.
##########
idl/duplication.thrift:
##########
@@ -66,6 +66,9 @@ struct duplication_add_request
// - if false, duplication start state=DS_LOG,
// server will replay and send plog mutation to follower cluster derectly
3:optional bool is_duplicating_checkpoint = true;
+
+ // Specify the app name of remote cluster.
+ 4:optional string remote_app_name;
Review Comment:
Is it safe to use the app **name**? What happens if the app is renamed in the
remote cluster, or if app names are even swapped in the remote cluster?
##########
src/meta/duplication/duplication_info.h:
##########
@@ -53,19 +53,21 @@ class duplication_info
/// \see meta_duplication_service::new_dup_from_init
/// \see duplication_info::decode_from_blob
duplication_info(dupid_t dupid,
Review Comment:
Rename it to `dup_id` for consistency?
##########
src/meta/duplication/meta_duplication_service.cpp:
##########
@@ -158,24 +173,34 @@ void
meta_duplication_service::add_duplication(duplication_add_rpc rpc)
const auto &request = rpc.request();
auto &response = rpc.response();
- LOG_INFO("add duplication for app({}), remote cluster name is {}",
- request.app_name,
- request.remote_cluster_name);
+ std::string remote_app_name;
+ if (request.__isset.remote_app_name) {
+ remote_app_name = request.remote_app_name;
+ } else {
+ // Once remote_app_name is not specified by client, use source app_name
+ // as remote_app_name to be compatible with old versions of client.
+ remote_app_name = request.app_name;
+ }
- response.err = ERR_OK;
+ LOG_INFO("add duplication for app({}), remote cluster name is {}"
+ "remote app name is {}",
+ request.app_name,
+ request.remote_cluster_name,
+ remote_app_name);
if (request.remote_cluster_name == get_current_cluster_name()) {
response.err = ERR_INVALID_PARAMETERS;
- response.__set_hint("illegal operation: adding duplication to itself");
- return;
+ LOG_WARNING_DUP_HINT_AND_RETURN(response,
+ "illegal operation: adding duplication
to itself");
}
Review Comment:
How about defining the macro as:
LOG_WARNING_DUP_HINT_AND_RETURN_IF_NOT(exp, resp, ...)
The error code is always `ERR_INVALID_PARAMETERS`, so I think it can be
omitted.
##########
src/common/duplication_common.cpp:
##########
@@ -143,13 +145,20 @@ static nlohmann::json duplication_entry_to_json(const
duplication_entry &ent)
{"status", duplication_status_to_string(ent.status)},
{"fail_mode", duplication_fail_mode_to_string(ent.fail_mode)},
};
+
if (ent.__isset.progress) {
nlohmann::json sub_json;
for (const auto &p : ent.progress) {
sub_json[std::to_string(p.first)] = p.second;
}
json["progress"] = sub_json;
}
+
+ if (ent.__isset.remote_app_name) {
+ // remote_app_name is shown only if it was specified explicitly for
new versions.
Review Comment:
Better to mention since which version (2.6).
##########
src/meta/duplication/meta_duplication_service.cpp:
##########
@@ -69,12 +70,13 @@ void meta_duplication_service::query_duplication_info(const
duplication_query_re
std::shared_ptr<app_state> app = _state->get_app(request.app_name);
if (!app || app->status != app_status::AS_AVAILABLE) {
response.err = ERR_APP_NOT_EXIST;
- } else {
- response.appid = app->app_id;
- for (auto &dup_id_to_info : app->duplications) {
- const duplication_info_s_ptr &dup = dup_id_to_info.second;
- dup->append_if_valid_for_query(*app, response.entry_list);
- }
+ return;
+ }
+
+ response.appid = app->app_id;
+ for (auto &dup_id_to_info : app->duplications) {
+ const duplication_info_s_ptr &dup = dup_id_to_info.second;
Review Comment:
```suggestion
for (const auto &[_, dup] : app->duplications) {
```
##########
src/shell/commands/duplication.cpp:
##########
@@ -145,44 +166,49 @@ bool query_dup(command_executor *e, shell_context *sc,
arguments args)
}
std::string app_name = cmd(1).str();
+ // Check if the boolean option is specified.
bool detail = cmd[{"-d", "--detail"}];
auto err_resp = sc->ddl_client->query_dup(app_name);
dsn::error_s err = err_resp.get_error();
- if (err.is_ok()) {
+ if (err) {
err = dsn::error_s::make(err_resp.get_value().err);
}
- if (!err.is_ok()) {
- fmt::print(stderr,
- "querying duplications of app [{}] failed, error={}\n",
- app_name,
- err.description());
- } else if (detail) {
+ if (!err) {
+ fmt::print(stderr, "querying duplications of app [{}] failed,
error={}\n", app_name, err);
+
+ return true;
+ }
+
+ if (detail) {
fmt::print("duplications of app [{}] in detail:\n", app_name);
fmt::print("{}\n\n",
duplication_query_response_to_string(err_resp.get_value()));
- } else {
- const auto &resp = err_resp.get_value();
- fmt::print("duplications of app [{}] are listed as below:\n",
app_name);
-
- dsn::utils::table_printer printer;
- printer.add_title("dup_id");
- printer.add_column("status");
- printer.add_column("remote cluster");
- printer.add_column("create time");
-
- char create_time[25];
- for (auto info : resp.entry_list) {
- dsn::utils::time_ms_to_date_time(info.create_ts, create_time,
sizeof(create_time));
-
- printer.add_row(info.dupid);
- printer.append_data(duplication_status_to_string(info.status));
- printer.append_data(info.remote);
- printer.append_data(create_time);
-
- printer.output(std::cout);
- std::cout << std::endl;
- }
+
+ return true;
+ }
+
+ const auto &resp = err_resp.get_value();
+ fmt::print("duplications of app [{}] are listed as below:\n", app_name);
+
+ dsn::utils::table_printer printer;
+ printer.add_title("dup_id");
+ printer.add_column("status");
+ printer.add_column("remote cluster");
+ printer.add_column("create time");
+
+ char create_time[25];
+ for (auto info : resp.entry_list) {
+ dsn::utils::time_ms_to_date_time(info.create_ts, create_time,
sizeof(create_time));
Review Comment:
How about using the `void time_ms_to_string(uint64_t ts_ms, std::string &str)`
version to avoid an unexpected array overflow?
##########
src/meta/duplication/meta_duplication_service.cpp:
##########
@@ -150,6 +152,19 @@ void
meta_duplication_service::do_modify_duplication(std::shared_ptr<app_state>
});
}
+#define LOG_DUP_HINT_AND_RETURN(resp, level, ...)
\
+ do {
\
+ std::string _msg(fmt::format(__VA_ARGS__));
\
Review Comment:
```suggestion
const std::string _msg(fmt::format(__VA_ARGS__));
\
```
##########
src/meta/duplication/meta_duplication_service.cpp:
##########
@@ -184,50 +209,97 @@ void
meta_duplication_service::add_duplication(duplication_add_rpc rpc)
duplication_constants::kClustersSectionName.c_str(),
request.remote_cluster_name.c_str())) {
response.err = ERR_INVALID_PARAMETERS;
- response.__set_hint(fmt::format("failed to find cluster[{}] address in
config [{}]",
+ LOG_WARNING_DUP_HINT_AND_RETURN(response,
+ "failed to find cluster[{}] address in
config [{}]",
request.remote_cluster_name,
-
duplication_constants::kClustersSectionName));
- return;
+
duplication_constants::kClustersSectionName);
}
- auto app = _state->get_app(request.app_name);
- if (!app || app->status != app_status::AS_AVAILABLE) {
- response.err = ERR_APP_NOT_EXIST;
- return;
- }
+ std::shared_ptr<app_state> app;
duplication_info_s_ptr dup;
- for (const auto &ent : app->duplications) {
- auto it = ent.second;
- if (it->follower_cluster_name == request.remote_cluster_name) {
- dup = ent.second;
- break;
+ {
+ zauto_read_lock l(app_lock());
+
+ app = _state->get_app(request.app_name);
+ if (!app || app->status != app_status::AS_AVAILABLE) {
+ response.err = ERR_APP_NOT_EXIST;
+ LOG_WARNING("app {} was not found", request.app_name);
+ return;
+ }
+
+ for (const auto &ent : app->duplications) {
+ if (ent.second->remote_cluster_name ==
request.remote_cluster_name) {
+ dup = ent.second;
+ break;
+ }
+ }
+
+ if (dup) {
+ // The duplication for the same app to the same remote cluster has
existed.
+ remote_app_name = dup->remote_app_name;
+ LOG_INFO("no need to add duplication, since it has existed:
app_name={}, "
+ "remote_cluster_name={}, remote_app_name={}",
+ request.app_name,
+ request.remote_cluster_name,
+ remote_app_name);
+ } else {
+ // Check if other apps of this cluster are duplicated to the same
remote app.
+ for (const auto &kv : _state->_exist_apps) {
Review Comment:
```suggestion
for (const auto &[app_name, cur_app_state] :
_state->_exist_apps) {
```
##########
src/meta/duplication/meta_duplication_service.cpp:
##########
@@ -184,50 +209,97 @@ void
meta_duplication_service::add_duplication(duplication_add_rpc rpc)
duplication_constants::kClustersSectionName.c_str(),
request.remote_cluster_name.c_str())) {
response.err = ERR_INVALID_PARAMETERS;
- response.__set_hint(fmt::format("failed to find cluster[{}] address in
config [{}]",
+ LOG_WARNING_DUP_HINT_AND_RETURN(response,
+ "failed to find cluster[{}] address in
config [{}]",
request.remote_cluster_name,
-
duplication_constants::kClustersSectionName));
- return;
+
duplication_constants::kClustersSectionName);
}
- auto app = _state->get_app(request.app_name);
- if (!app || app->status != app_status::AS_AVAILABLE) {
- response.err = ERR_APP_NOT_EXIST;
- return;
- }
+ std::shared_ptr<app_state> app;
duplication_info_s_ptr dup;
- for (const auto &ent : app->duplications) {
- auto it = ent.second;
- if (it->follower_cluster_name == request.remote_cluster_name) {
- dup = ent.second;
- break;
+ {
+ zauto_read_lock l(app_lock());
+
+ app = _state->get_app(request.app_name);
+ if (!app || app->status != app_status::AS_AVAILABLE) {
+ response.err = ERR_APP_NOT_EXIST;
+ LOG_WARNING("app {} was not found", request.app_name);
+ return;
+ }
+
+ for (const auto &ent : app->duplications) {
+ if (ent.second->remote_cluster_name ==
request.remote_cluster_name) {
+ dup = ent.second;
+ break;
+ }
+ }
+
+ if (dup) {
+ // The duplication for the same app to the same remote cluster has
existed.
+ remote_app_name = dup->remote_app_name;
+ LOG_INFO("no need to add duplication, since it has existed:
app_name={}, "
+ "remote_cluster_name={}, remote_app_name={}",
+ request.app_name,
+ request.remote_cluster_name,
+ remote_app_name);
+ } else {
+ // Check if other apps of this cluster are duplicated to the same
remote app.
+ for (const auto &kv : _state->_exist_apps) {
+ if (kv.first == request.app_name) {
+ // Skip this app since we want ot check other apps.
+ continue;
+ }
+
+ for (const auto &ent : kv.second->duplications) {
+ if (ent.second->remote_cluster_name !=
request.remote_cluster_name) {
+ continue;
+ }
+
+ if (ent.second->remote_app_name != remote_app_name) {
+ continue;
+ }
+
+ response.err = ERR_INVALID_PARAMETERS;
+ LOG_WARNING_DUP_HINT_AND_RETURN(response,
+ "illegal operation:
another app({}) is also "
+ "duplicated to the same
remote app("
+ "cluster={}, app={})",
+ kv.first,
+
request.remote_cluster_name,
+ remote_app_name);
+ }
+ }
}
}
+
if (!dup) {
- dup = new_dup_from_init(request.remote_cluster_name,
std::move(meta_list), app);
+ dup = new_dup_from_init(
+ request.remote_cluster_name, remote_app_name,
std::move(meta_list), app);
}
- do_add_duplication(app, dup, rpc);
+
+ do_add_duplication(app, dup, rpc, remote_app_name);
}
// ThreadPool(WRITE): THREAD_POOL_META_STATE
void meta_duplication_service::do_add_duplication(std::shared_ptr<app_state>
&app,
duplication_info_s_ptr &dup,
- duplication_add_rpc &rpc)
+ duplication_add_rpc &rpc,
+ const std::string
&remote_app_name)
{
- const auto err = dup->start(rpc.request().is_duplicating_checkpoint);
+ const auto &err = dup->start(rpc.request().is_duplicating_checkpoint);
if (dsn_unlikely(err != ERR_OK)) {
- LOG_ERROR("start dup[{}({})] failed: err = {}", app->app_name,
dup->id, err);
- return;
+ LOG_ERROR_DUP_HINT_AND_RETURN(
Review Comment:
Ditto.
The missing `return` keyword is a little confusing, although it's in the
macro name.
##########
src/meta/duplication/meta_duplication_service.cpp:
##########
@@ -184,50 +209,97 @@ void
meta_duplication_service::add_duplication(duplication_add_rpc rpc)
duplication_constants::kClustersSectionName.c_str(),
request.remote_cluster_name.c_str())) {
response.err = ERR_INVALID_PARAMETERS;
- response.__set_hint(fmt::format("failed to find cluster[{}] address in
config [{}]",
+ LOG_WARNING_DUP_HINT_AND_RETURN(response,
+ "failed to find cluster[{}] address in
config [{}]",
request.remote_cluster_name,
-
duplication_constants::kClustersSectionName));
- return;
+
duplication_constants::kClustersSectionName);
}
- auto app = _state->get_app(request.app_name);
- if (!app || app->status != app_status::AS_AVAILABLE) {
- response.err = ERR_APP_NOT_EXIST;
- return;
- }
+ std::shared_ptr<app_state> app;
duplication_info_s_ptr dup;
- for (const auto &ent : app->duplications) {
- auto it = ent.second;
- if (it->follower_cluster_name == request.remote_cluster_name) {
- dup = ent.second;
- break;
+ {
+ zauto_read_lock l(app_lock());
+
+ app = _state->get_app(request.app_name);
+ if (!app || app->status != app_status::AS_AVAILABLE) {
+ response.err = ERR_APP_NOT_EXIST;
+ LOG_WARNING("app {} was not found", request.app_name);
+ return;
+ }
+
+ for (const auto &ent : app->duplications) {
+ if (ent.second->remote_cluster_name ==
request.remote_cluster_name) {
+ dup = ent.second;
+ break;
+ }
+ }
+
+ if (dup) {
+ // The duplication for the same app to the same remote cluster has
existed.
+ remote_app_name = dup->remote_app_name;
+ LOG_INFO("no need to add duplication, since it has existed:
app_name={}, "
+ "remote_cluster_name={}, remote_app_name={}",
+ request.app_name,
+ request.remote_cluster_name,
+ remote_app_name);
+ } else {
+ // Check if other apps of this cluster are duplicated to the same
remote app.
+ for (const auto &kv : _state->_exist_apps) {
+ if (kv.first == request.app_name) {
+ // Skip this app since we want ot check other apps.
Review Comment:
```suggestion
// Skip this app since we want to check other apps.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]