This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 9ad827786 refactor(FQDN): Update src/common/consensus.thrift related
code (#1989)
9ad827786 is described below
commit 9ad82778624cdc6a46ee5488df00eaa361bdada8
Author: Yingchun Lai <[email protected]>
AuthorDate: Mon Apr 29 10:51:19 2024 +0800
refactor(FQDN): Update src/common/consensus.thrift related code (#1989)
---
.../duplication/test/replica_follower_test.cpp | 4 +-
src/replica/replica_check.cpp | 13 +-
src/replica/replica_chkpt.cpp | 5 +-
src/replica/replica_learn.cpp | 141 +++++++++------------
src/replica/split/replica_split_manager.cpp | 5 +-
src/replica/split/test/replica_split_test.cpp | 9 +-
6 files changed, 73 insertions(+), 104 deletions(-)
diff --git a/src/replica/duplication/test/replica_follower_test.cpp
b/src/replica/duplication/test/replica_follower_test.cpp
index 34585a982..15728386e 100644
--- a/src/replica/duplication/test/replica_follower_test.cpp
+++ b/src/replica/duplication/test/replica_follower_test.cpp
@@ -32,7 +32,6 @@
#include "nfs/nfs_node.h"
#include "replica/duplication/replica_follower.h"
#include "replica/test/mock_utils.h"
-#include "runtime/rpc/dns_resolver.h"
#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_host_port.h"
#include "runtime/task/task_tracker.h"
@@ -278,8 +277,7 @@ TEST_P(replica_follower_test, test_nfs_copy_checkpoint)
auto resp = learn_response();
const host_port learnee("localhost", 34801);
- resp.learnee = dsn::dns_resolver::instance().resolve_address(learnee);
- resp.__set_hp_learnee(learnee);
+ SET_IP_AND_HOST_PORT_BY_DNS(resp, learnee, learnee);
std::string dest = utils::filesystem::path_combine(
_mock_replica->dir(),
duplication_constants::kDuplicationCheckpointRootDir);
diff --git a/src/replica/replica_check.cpp b/src/replica/replica_check.cpp
index 22148c2bd..c4a86dec5 100644
--- a/src/replica/replica_check.cpp
+++ b/src/replica/replica_check.cpp
@@ -122,16 +122,16 @@ void replica::broadcast_group_check()
}
for (auto it = _primary_states.statuses.begin(); it !=
_primary_states.statuses.end(); ++it) {
- if (it->first == _stub->primary_host_port())
+ if (it->first == _stub->primary_host_port()) {
continue;
+ }
auto hp = it->first;
- auto addr = dsn::dns_resolver::instance().resolve_address(hp);
std::shared_ptr<group_check_request> request(new group_check_request);
request->app = _app_info;
- request->node = addr;
- request->__set_hp_node(hp);
+ const auto addr = dsn::dns_resolver::instance().resolve_address(hp);
+ SET_IP_AND_HOST_PORT(*request, node, addr, hp);
_primary_states.get_replica_config(it->second, request->config);
request->last_committed_decree = last_committed_decree();
request->__set_confirmed_decree(_duplication_mgr->min_confirmed_decree());
@@ -226,8 +226,7 @@ void replica::on_group_check(const group_check_request
&request,
}
response.pid = get_gpid();
- response.node = _stub->primary_address();
- response.__set_hp_node(_stub->primary_host_port());
+ SET_IP_AND_HOST_PORT(response, node, _stub->primary_address(),
_stub->primary_host_port());
response.err = ERR_OK;
if (status() == partition_status::PS_ERROR) {
response.err = ERR_INVALID_STATE;
@@ -253,7 +252,7 @@ void replica::on_group_check_reply(error_code err,
}
auto r = _primary_states.group_check_pending_replies.erase(hp_node);
- CHECK_EQ_MSG(r, 1, "invalid node address, address = {}({})", hp_node,
req->node);
+ CHECK_EQ_MSG(r, 1, "invalid node: {}", FMT_HOST_PORT_AND_IP(*req, node));
if (err != ERR_OK || resp->err != ERR_OK) {
if (ERR_OK == err) {
diff --git a/src/replica/replica_chkpt.cpp b/src/replica/replica_chkpt.cpp
index fc2b3a4dc..27346aa17 100644
--- a/src/replica/replica_chkpt.cpp
+++ b/src/replica/replica_chkpt.cpp
@@ -49,6 +49,7 @@
#include "runtime/api_layer1.h"
#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_holder.h"
+#include "runtime/rpc/rpc_host_port.h"
#include "runtime/task/async_calls.h"
#include "runtime/task/task.h"
#include "split/replica_split_manager.h"
@@ -261,8 +262,8 @@ void replica::on_query_last_checkpoint(/*out*/
learn_response &response)
// for example: base_local_dir = "./data" + "checkpoint.1024" =
"./data/checkpoint.1024"
response.base_local_dir = utils::filesystem::path_combine(
_app->data_dir(),
checkpoint_folder(response.state.to_decree_included));
- response.learnee = _stub->primary_address();
- response.__set_hp_learnee(_stub->primary_host_port());
+ SET_IP_AND_HOST_PORT(
+ response, learnee, _stub->primary_address(),
_stub->primary_host_port());
for (auto &file : response.state.files) {
// response.state.files contain file absolute path, for example:
// "./data/checkpoint.1024/1.sst" use `substr` to get the file
name: 1.sst
diff --git a/src/replica/replica_learn.cpp b/src/replica/replica_learn.cpp
index 9594824cf..bb8414ce4 100644
--- a/src/replica/replica_learn.cpp
+++ b/src/replica/replica_learn.cpp
@@ -233,8 +233,7 @@ void replica::init_learn(uint64_t signature)
request.__set_max_gced_decree(get_max_gced_decree_for_learn());
request.last_committed_decree_in_app = _app->last_committed_decree();
request.last_committed_decree_in_prepare_list =
_prepare_list->last_committed_decree();
- request.learner = _stub->primary_address();
- request.__set_hp_learner(_stub->primary_host_port());
+ SET_IP_AND_HOST_PORT(request, learner, _stub->primary_address(),
_stub->primary_host_port());
request.signature = _potential_secondary_states.learning_version;
_app->prepare_get_checkpoint(request.app_specific_learn_request);
@@ -399,15 +398,13 @@ void replica::on_learn(dsn::message_ex *msg, const
learn_request &request)
// TODO: learner machine has been down for a long time, and DDD MUST
happened before
// which leads to state lost. Now the lost state is back, what shall we do?
if (request.last_committed_decree_in_app > last_prepared_decree()) {
- LOG_ERROR_PREFIX(
- "on_learn[{:#018x}]: learner = {}({}), learner state is newer than
learnee, "
- "learner_app_committed_decree = {}, local_committed_decree = {},
learn "
- "from scratch",
- request.signature,
- hp_learner,
- request.learner,
- request.last_committed_decree_in_app,
- local_committed_decree);
+ LOG_ERROR_PREFIX("on_learn[{:#018x}]: learner = {}, learner state is
newer than learnee, "
+ "learner_app_committed_decree = {},
local_committed_decree = {}, learn "
+ "from scratch",
+ request.signature,
+ FMT_HOST_PORT_AND_IP(request, learner),
+ request.last_committed_decree_in_app,
+ local_committed_decree);
*(decree *)&request.last_committed_decree_in_app = 0;
}
@@ -416,29 +413,25 @@ void replica::on_learn(dsn::message_ex *msg, const
learn_request &request)
// this happens when the new primary does not commit the previously
prepared mutations
// yet, which it should do, so let's help it now.
else if (request.last_committed_decree_in_app > local_committed_decree) {
- LOG_ERROR_PREFIX(
- "on_learn[{:#018x}]: learner = {}({}), learner's
last_committed_decree_in_app "
- "is newer than learnee, learner_app_committed_decree = {}, "
- "local_committed_decree = {}, commit local soft",
- request.signature,
- hp_learner,
- request.learner,
- request.last_committed_decree_in_app,
- local_committed_decree);
+ LOG_ERROR_PREFIX("on_learn[{:#018x}]: learner = {}, learner's
last_committed_decree_in_app "
+ "is newer than learnee, learner_app_committed_decree
= {}, "
+ "local_committed_decree = {}, commit local soft",
+ request.signature,
+ FMT_HOST_PORT_AND_IP(request, learner),
+ request.last_committed_decree_in_app,
+ local_committed_decree);
// we shouldn't commit mutations hard coz these mutations may
preparing on another learner
_prepare_list->commit(request.last_committed_decree_in_app,
COMMIT_TO_DECREE_SOFT);
local_committed_decree = last_committed_decree();
if (request.last_committed_decree_in_app > local_committed_decree) {
- LOG_ERROR_PREFIX(
- "on_learn[{:#018x}]: try to commit primary to {}, still less
than "
- "learner({}({}))'s committed decree({}), wait mutations to be
commitable",
- request.signature,
- local_committed_decree,
- hp_learner,
- request.learner,
- request.last_committed_decree_in_app);
+ LOG_ERROR_PREFIX("on_learn[{:#018x}]: try to commit primary to {},
still less than "
+ "learner({})'s committed decree({}), wait
mutations to be commitable",
+ request.signature,
+ local_committed_decree,
+ FMT_HOST_PORT_AND_IP(request, learner),
+ request.last_committed_decree_in_app);
response.err = ERR_INCONSISTENT_STATE;
reply(msg, response);
return;
@@ -451,13 +444,12 @@ void replica::on_learn(dsn::message_ex *msg, const
learn_request &request)
response.state.__set_learn_start_decree(learn_start_decree);
bool delayed_replay_prepare_list = false;
- LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}({}),
remote_committed_decree = {}, "
+ LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}, remote_committed_decree
= {}, "
"remote_app_committed_decree = {}, local_committed_decree
= {}, "
"app_committed_decree = {}, app_durable_decree = {}, "
"prepare_min_decree = {}, prepare_list_count = {},
learn_start_decree = {}",
request.signature,
- hp_learner,
- request.learner,
+ FMT_HOST_PORT_AND_IP(request, learner),
request.last_committed_decree_in_prepare_list,
request.last_committed_decree_in_app,
local_committed_decree,
@@ -466,9 +458,7 @@ void replica::on_learn(dsn::message_ex *msg, const
learn_request &request)
_prepare_list->min_decree(),
_prepare_list->count(),
learn_start_decree);
-
- response.learnee = _stub->primary_address();
- response.__set_hp_learnee(_stub->primary_host_port());
+ SET_IP_AND_HOST_PORT(response, learnee, _stub->primary_address(),
_stub->primary_host_port());
response.prepare_start_decree = invalid_decree;
response.last_committed_decree = local_committed_decree;
response.err = ERR_OK;
@@ -482,36 +472,32 @@ void replica::on_learn(dsn::message_ex *msg, const
learn_request &request)
delayed_replay_prepare_list);
if (!should_learn_cache) {
if (learn_start_decree > _app->last_durable_decree()) {
- LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}({}), choose to
learn private logs, "
+ LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}, choose to learn
private logs, "
"because learn_start_decree({}) >
_app->last_durable_decree({})",
request.signature,
- hp_learner,
- request.learner,
+ FMT_HOST_PORT_AND_IP(request, learner),
learn_start_decree,
_app->last_durable_decree());
_private_log->get_learn_state(get_gpid(), learn_start_decree,
response.state);
response.type = learn_type::LT_LOG;
} else if (_private_log->get_learn_state(get_gpid(),
learn_start_decree, response.state)) {
- LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}({}), choose to
learn private logs, "
+ LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}, choose to learn
private logs, "
"because mutation_log::get_learn_state() returns
true",
request.signature,
- hp_learner,
- request.learner);
+ FMT_HOST_PORT_AND_IP(request, learner));
response.type = learn_type::LT_LOG;
} else if (learn_start_decree < request.last_committed_decree_in_app +
1) {
- LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}({}), choose to
learn private logs, "
+ LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}, choose to learn
private logs, "
"because learn_start_decree steps back for
duplication",
request.signature,
- hp_learner,
- request.learner);
+ FMT_HOST_PORT_AND_IP(request, learner));
response.type = learn_type::LT_LOG;
} else {
- LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}({}), choose to
learn app, beacuse "
+ LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}, choose to learn
app, beacuse "
"learn_start_decree({}) <=
_app->last_durable_decree({}), and "
"mutation_log::get_learn_state() returns false",
request.signature,
- hp_learner,
- request.learner,
+ FMT_HOST_PORT_AND_IP(request, learner),
learn_start_decree,
_app->last_durable_decree());
response.type = learn_type::LT_APP;
@@ -523,11 +509,10 @@ void replica::on_learn(dsn::message_ex *msg, const
learn_request &request)
if (response.state.files.size() > 0) {
auto &last_file = response.state.files.back();
if (last_file == learner_state.last_learn_log_file) {
- LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}({}),
learn the same file {} "
+ LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}, learn
the same file {} "
"repeatedly, hint to switch file",
request.signature,
- hp_learner,
- request.learner,
+ FMT_HOST_PORT_AND_IP(request, learner),
last_file);
_private_log->hint_switch_file();
} else {
@@ -536,12 +521,11 @@ void replica::on_learn(dsn::message_ex *msg, const
learn_request &request)
}
// it is safe to commit to last_committed_decree() now
response.state.to_decree_included = last_committed_decree();
- LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}({}), learn
private logs succeed, "
+ LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}, learn private
logs succeed, "
"learned_meta_size = {}, learned_file_count = {},
to_decree_included = "
"{}",
request.signature,
- hp_learner,
- request.learner,
+ FMT_HOST_PORT_AND_IP(request, learner),
response.state.meta.length(),
response.state.files.size(),
response.state.to_decree_included);
@@ -552,20 +536,18 @@ void replica::on_learn(dsn::message_ex *msg, const
learn_request &request)
if (err != ERR_OK) {
response.err = ERR_GET_LEARN_STATE_FAILED;
LOG_ERROR_PREFIX(
- "on_learn[{:#018x}]: learner = {}({}), get app checkpoint
failed, error = {}",
+ "on_learn[{:#018x}]: learner = {}, get app checkpoint
failed, error = {}",
request.signature,
- hp_learner,
- request.learner,
+ FMT_HOST_PORT_AND_IP(request, learner),
err);
} else {
response.base_local_dir = _app->data_dir();
response.__set_replica_disk_tag(_dir_node->tag);
LOG_INFO_PREFIX(
- "on_learn[{:#018x}]: learner = {}({}), get app learn state
succeed, "
+ "on_learn[{:#018x}]: learner = {}, get app learn state
succeed, "
"learned_meta_size = {}, learned_file_count = {},
learned_to_decree = {}",
request.signature,
- hp_learner,
- request.learner,
+ FMT_HOST_PORT_AND_IP(request, learner),
response.state.meta.length(),
response.state.files.size(),
response.state.to_decree_included);
@@ -981,7 +963,7 @@ bool replica::prepare_cached_learn_state(const
learn_request &request,
LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}, set
prepare_start_decree = {}",
request.signature,
- request.learner,
+ FMT_HOST_PORT_AND_IP(request, learner),
local_committed_decree + 1);
}
@@ -1007,7 +989,7 @@ bool replica::prepare_cached_learn_state(const
learn_request &request,
"learn_start_decree = {}, prepare_start_decree = {},
learn_mutation_count "
"= {}, learn_data_size = {}",
request.signature,
- request.learner,
+ FMT_HOST_PORT_AND_IP(request, learner),
learn_start_decree,
response.prepare_start_decree,
count,
@@ -1302,8 +1284,7 @@ void replica::notify_learn_completion()
report.last_committed_decree_in_prepare_list = last_committed_decree();
report.learner_signature = _potential_secondary_states.learning_version;
report.learner_status_ = _potential_secondary_states.learning_status;
- report.node = _stub->primary_address();
- report.__set_hp_node(_stub->primary_host_port());
+ SET_IP_AND_HOST_PORT(report, node, _stub->primary_address(),
_stub->primary_host_port());
LOG_INFO_PREFIX("notify_learn_completion[{:#018x}]: learnee = {},
learn_duration = {} ms, "
"local_committed_decree = {}, app_committed_decree = {},
app_durable_decree = "
@@ -1344,41 +1325,35 @@ void replica::on_learn_completion_notification(const
group_check_response &repor
GET_HOST_PORT(report, node, hp_node);
LOG_INFO_PREFIX(
- "on_learn_completion_notification[{:#018x}]: learner = {}({}),
learning_status = {}",
+ "on_learn_completion_notification[{:#018x}]: learner = {},
learning_status = {}",
report.learner_signature,
- hp_node,
- report.node,
+ FMT_HOST_PORT_AND_IP(report, node),
enum_to_string(report.learner_status_));
if (status() != partition_status::PS_PRIMARY) {
response.err = (partition_status::PS_INACTIVE == status() &&
_inactive_is_transient)
? ERR_INACTIVE_STATE
: ERR_INVALID_STATE;
- LOG_ERROR_PREFIX(
- "on_learn_completion_notification[{:#018x}]: learner = {}({}),
this replica "
- "is not primary, but {}, reply {}",
- report.learner_signature,
- hp_node,
- report.node,
- enum_to_string(status()),
- response.err);
+ LOG_ERROR_PREFIX("on_learn_completion_notification[{:#018x}]: learner
= {}, this replica "
+ "is not primary, but {}, reply {}",
+ report.learner_signature,
+ FMT_HOST_PORT_AND_IP(report, node),
+ enum_to_string(status()),
+ response.err);
} else if (report.learner_status_ != learner_status::LearningSucceeded) {
response.err = ERR_INVALID_STATE;
- LOG_ERROR_PREFIX(
- "on_learn_completion_notification[{:#018x}]: learner = {}({}),
learner_status "
- "is not LearningSucceeded, but {}, reply ERR_INVALID_STATE",
- report.learner_signature,
- hp_node,
- report.node,
- enum_to_string(report.learner_status_));
+ LOG_ERROR_PREFIX("on_learn_completion_notification[{:#018x}]: learner
= {}, learner_status "
+ "is not LearningSucceeded, but {}, reply
ERR_INVALID_STATE",
+ report.learner_signature,
+ FMT_HOST_PORT_AND_IP(report, node),
+ enum_to_string(report.learner_status_));
} else {
response.err = handle_learning_succeeded_on_primary(hp_node,
report.learner_signature);
if (response.err != ERR_OK) {
- LOG_ERROR_PREFIX("on_learn_completion_notification[{:#018x}]:
learner = {}({}), handle "
+ LOG_ERROR_PREFIX("on_learn_completion_notification[{:#018x}]:
learner = {}, handle "
"learning succeeded on primary failed, reply {}",
report.learner_signature,
- hp_node,
- report.node,
+ FMT_HOST_PORT_AND_IP(report, node),
response.err);
}
}
diff --git a/src/replica/split/replica_split_manager.cpp
b/src/replica/split/replica_split_manager.cpp
index 955dbea23..9844c8ee6 100644
--- a/src/replica/split/replica_split_manager.cpp
+++ b/src/replica/split/replica_split_manager.cpp
@@ -1496,8 +1496,9 @@ void
replica_split_manager::primary_parent_handle_stop_split(
return;
}
- _replica->_primary_states.split_stopped_secondary.insert(
- req->__isset.hp_node ? req->hp_node :
host_port::from_address(req->node));
+ host_port secondary;
+ GET_HOST_PORT(*req, node, secondary);
+
_replica->_primary_states.split_stopped_secondary.emplace(std::move(secondary));
auto count = 0;
for (auto &iter : _replica->_primary_states.statuses) {
if (iter.second == partition_status::PS_SECONDARY &&
diff --git a/src/replica/split/test/replica_split_test.cpp
b/src/replica/split/test/replica_split_test.cpp
index d6f1ee5b4..61f801078 100644
--- a/src/replica/split/test/replica_split_test.cpp
+++ b/src/replica/split/test/replica_split_test.cpp
@@ -40,7 +40,6 @@
#include "replica/split/replica_split_manager.h"
#include "replica/test/mock_utils.h"
#include "replica/test/replica_test_base.h"
-#include "runtime/rpc/dns_resolver.h"
#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_host_port.h"
#include "runtime/task/task.h"
@@ -396,8 +395,7 @@ public:
req.app = _parent_replica->_app_info;
req.config.ballot = INIT_BALLOT;
req.config.status = partition_status::PS_SECONDARY;
- req.node = SECONDARY_ADDR;
- req.__set_hp_node(SECONDARY);
+ SET_IP_AND_HOST_PORT_BY_DNS(req, node, SECONDARY);
if (meta_split_status == split_status::PAUSING ||
meta_split_status == split_status::CANCELING) {
req.__set_meta_split_status(meta_split_status);
@@ -429,8 +427,7 @@ public:
std::shared_ptr<group_check_request> req =
std::make_shared<group_check_request>();
std::shared_ptr<group_check_response> resp =
std::make_shared<group_check_response>();
- req->node = SECONDARY_ADDR;
- req->__set_hp_node(SECONDARY);
+ SET_IP_AND_HOST_PORT_BY_DNS(*req, node, SECONDARY);
if (meta_split_status != split_status::NOT_SPLIT) {
req->__set_meta_split_status(meta_split_status);
}
@@ -532,9 +529,7 @@ public:
const host_port PRIMARY = host_port("localhost", 18230);
const host_port SECONDARY = host_port("localhost", 10058);
- const rpc_address SECONDARY_ADDR =
dsn::dns_resolver::instance().resolve_address(SECONDARY);
const host_port SECONDARY2 = host_port("localhost", 10805);
- const rpc_address SECONDARY_ADDR2 =
dsn::dns_resolver::instance().resolve_address(SECONDARY2);
const gpid PARENT_GPID = gpid(APP_ID, 1);
const gpid CHILD_GPID = gpid(APP_ID, 9);
const ballot INIT_BALLOT = 3;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]