This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 0e78b6cf0 refactor(FQDN): Update idl/metadata.thrift related code
(#1986)
0e78b6cf0 is described below
commit 0e78b6cf0691d247700fbdddd09ac38880b629e4
Author: Yingchun Lai <[email protected]>
AuthorDate: Fri Apr 26 17:22:19 2024 +0800
refactor(FQDN): Update idl/metadata.thrift related code (#1986)
---
src/common/replication_common.cpp | 4 +-
.../bulk_load/test/replica_bulk_loader_test.cpp | 8 +-
src/replica/replica_check.cpp | 5 +-
src/replica/replica_context.cpp | 3 +-
src/replica/replica_learn.cpp | 358 +++++++++------------
src/replica/replica_stub.cpp | 20 +-
src/replica/split/replica_split_manager.cpp | 73 +++--
7 files changed, 213 insertions(+), 258 deletions(-)
diff --git a/src/common/replication_common.cpp
b/src/common/replication_common.cpp
index e064e28da..9d0a8f103 100644
--- a/src/common/replication_common.cpp
+++ b/src/common/replication_common.cpp
@@ -34,6 +34,7 @@
#include "common/replication_other_types.h"
#include "dsn.layer2_types.h"
#include "fmt/core.h"
+#include "runtime/rpc/dns_resolver.h" // IWYU pragma: keep
#include "runtime/rpc/rpc_address.h"
#include "runtime/service_app.h"
#include "utils/config_api.h"
@@ -170,10 +171,9 @@ int32_t
replication_options::app_mutation_2pc_min_replica_count(int32_t app_max_
/*out*/
replica_configuration &replica_config)
{
replica_config.pid = partition_config.pid;
- replica_config.primary = partition_config.primary;
replica_config.ballot = partition_config.ballot;
replica_config.learner_signature = invalid_signature;
- replica_config.__set_hp_primary(partition_config.hp_primary);
+ SET_OBJ_IP_AND_HOST_PORT(replica_config, primary, partition_config,
primary);
if (node == partition_config.hp_primary) {
replica_config.status = partition_status::PS_PRIMARY;
diff --git a/src/replica/bulk_load/test/replica_bulk_loader_test.cpp
b/src/replica/bulk_load/test/replica_bulk_loader_test.cpp
index 9fb744bd6..6deeda9e4 100644
--- a/src/replica/bulk_load/test/replica_bulk_loader_test.cpp
+++ b/src/replica/bulk_load/test/replica_bulk_loader_test.cpp
@@ -220,7 +220,7 @@ public:
_group_req.meta_bulk_load_status = status;
_group_req.config.status = partition_status::PS_SECONDARY;
_group_req.config.ballot = b;
- SET_IP_AND_HOST_PORT(_group_req, target, PRIMARY, PRIMARY_HP);
+ SET_IP_AND_HOST_PORT_BY_DNS(_group_req, target, PRIMARY_HP);
}
void mock_replica_config(partition_status::type status)
@@ -228,8 +228,7 @@ public:
replica_configuration rconfig;
rconfig.ballot = BALLOT;
rconfig.pid = PID;
- rconfig.primary = PRIMARY;
- rconfig.__set_hp_primary(PRIMARY_HP);
+ SET_IP_AND_HOST_PORT_BY_DNS(rconfig, primary, PRIMARY_HP);
rconfig.status = status;
_replica->set_replica_config(rconfig);
}
@@ -412,9 +411,6 @@ public:
std::string ROOT_PATH = "bulk_load_root";
gpid PID = gpid(1, 0);
ballot BALLOT = 3;
- rpc_address PRIMARY = rpc_address::from_ip_port("127.0.0.2", 34801);
- rpc_address SECONDARY = rpc_address::from_ip_port("127.0.0.3", 34801);
- rpc_address SECONDARY2 = rpc_address::from_ip_port("127.0.0.4", 34801);
const host_port PRIMARY_HP = host_port("localhost", 34801);
const host_port SECONDARY_HP = host_port("localhost", 34801);
const host_port SECONDARY_HP2 = host_port("localhost", 34801);
diff --git a/src/replica/replica_check.cpp b/src/replica/replica_check.cpp
index cff270913..22148c2bd 100644
--- a/src/replica/replica_check.cpp
+++ b/src/replica/replica_check.cpp
@@ -181,10 +181,9 @@ void replica::on_group_check(const group_check_request
&request,
{
_checker.only_one_thread_access();
- LOG_INFO_PREFIX("process group check, primary = {}({}), ballot = {},
status = {}, "
+ LOG_INFO_PREFIX("process group check, primary = {}, ballot = {}, status =
{}, "
"last_committed_decree = {}, confirmed_decree = {}",
- request.config.hp_primary,
- request.config.primary,
+ FMT_HOST_PORT_AND_IP(request.config, primary),
request.config.ballot,
enum_to_string(request.config.status),
request.last_committed_decree,
diff --git a/src/replica/replica_context.cpp b/src/replica/replica_context.cpp
index 30cc14356..a1fda9314 100644
--- a/src/replica/replica_context.cpp
+++ b/src/replica/replica_context.cpp
@@ -124,8 +124,7 @@ void
primary_context::get_replica_config(partition_status::type st,
uint64_t learner_signature /*=
invalid_signature*/)
{
config.pid = membership.pid;
- config.primary = membership.primary;
- config.__set_hp_primary(membership.hp_primary);
+ SET_OBJ_IP_AND_HOST_PORT(config, primary, membership, primary);
config.ballot = membership.ballot;
config.status = st;
config.learner_signature = learner_signature;
diff --git a/src/replica/replica_learn.cpp b/src/replica/replica_learn.cpp
index 9048e7a61..9594824cf 100644
--- a/src/replica/replica_learn.cpp
+++ b/src/replica/replica_learn.cpp
@@ -24,6 +24,7 @@
* THE SOFTWARE.
*/
+#include <fmt/std.h> // IWYU pragma: keep
#include <inttypes.h>
#include <stdio.h>
#include <algorithm>
@@ -37,8 +38,6 @@
#include <utility>
#include <vector>
-#include <fmt/std.h> // IWYU pragma: keep
-
#include "common/fs_manager.h"
#include "common/gpid.h"
#include "common/replication.codes.h"
@@ -57,6 +56,7 @@
#include "replica/replication_app_base.h"
#include "replica_stub.h"
#include "runtime/api_layer1.h"
+#include "runtime/rpc/dns_resolver.h"
#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_host_port.h"
#include "runtime/rpc/rpc_message.h"
@@ -214,12 +214,11 @@ void replica::init_learn(uint64_t signature)
if (_app->last_committed_decree() == 0 &&
_stub->_learn_app_concurrent_count.load() >=
FLAGS_learn_app_max_concurrent_count) {
LOG_WARNING_PREFIX(
- "init_learn[{:#018x}]: learnee = {}({}), learn_duration = {} ms,
need to learn app "
+ "init_learn[{:#018x}]: learnee = {}, learn_duration = {} ms, need
to learn app "
"because app_committed_decree = 0, but
learn_app_concurrent_count({}) >= "
"FLAGS_learn_app_max_concurrent_count({}), skip",
_potential_secondary_states.learning_version,
- _config.hp_primary,
- _config.primary,
+ FMT_HOST_PORT_AND_IP(_config, primary),
_potential_secondary_states.duration_ms(),
_stub->_learn_app_concurrent_count,
FLAGS_learn_app_max_concurrent_count);
@@ -239,28 +238,28 @@ void replica::init_learn(uint64_t signature)
request.signature = _potential_secondary_states.learning_version;
_app->prepare_get_checkpoint(request.app_specific_learn_request);
- LOG_INFO_PREFIX(
- "init_learn[{:#018x}]: learnee = {}({}), learn_duration = {} ms,
max_gced_decree = "
- "{}, local_committed_decree = {}, app_committed_decree = {}, "
- "app_durable_decree = {}, current_learning_status = {},
total_copy_file_count "
- "= {}, total_copy_file_size = {}, total_copy_buffer_size = {}",
- request.signature,
- _config.hp_primary,
- _config.primary,
- _potential_secondary_states.duration_ms(),
- request.max_gced_decree,
- last_committed_decree(),
- _app->last_committed_decree(),
- _app->last_durable_decree(),
- enum_to_string(_potential_secondary_states.learning_status),
- _potential_secondary_states.learning_copy_file_count,
- _potential_secondary_states.learning_copy_file_size,
- _potential_secondary_states.learning_copy_buffer_size);
+ LOG_INFO_PREFIX("init_learn[{:#018x}]: learnee = {}, learn_duration = {}
ms, max_gced_decree = "
+ "{}, local_committed_decree = {}, app_committed_decree =
{}, "
+ "app_durable_decree = {}, current_learning_status = {},
total_copy_file_count "
+ "= {}, total_copy_file_size = {}, total_copy_buffer_size =
{}",
+ request.signature,
+ FMT_HOST_PORT_AND_IP(_config, primary),
+ _potential_secondary_states.duration_ms(),
+ request.max_gced_decree,
+ last_committed_decree(),
+ _app->last_committed_decree(),
+ _app->last_durable_decree(),
+
enum_to_string(_potential_secondary_states.learning_status),
+ _potential_secondary_states.learning_copy_file_count,
+ _potential_secondary_states.learning_copy_file_size,
+ _potential_secondary_states.learning_copy_buffer_size);
dsn::message_ex *msg = dsn::message_ex::create_request(RPC_LEARN, 0,
get_gpid().thread_hash());
dsn::marshall(msg, request);
+ host_port primary;
+ GET_HOST_PORT(_config, primary, primary);
_potential_secondary_states.learning_task = rpc::call(
- _config.primary,
+ dsn::dns_resolver::instance().resolve_address(primary),
msg,
&_tracker,
[ this, req_cap = std::move(request) ](error_code err, learn_response
&& resp) mutable {
@@ -599,14 +598,13 @@ void replica::on_learn_reply(error_code err,
learn_request &&req, learn_response
}
LOG_INFO_PREFIX(
- "on_learn_reply_start[{}]: learnee = {}({}), learn_duration ={} ms,
response_err = "
+ "on_learn_reply_start[{}]: learnee = {}, learn_duration ={} ms,
response_err = "
"{}, remote_committed_decree = {}, prepare_start_decree = {},
learn_type = {} "
"learned_buffer_size = {}, learned_file_count = {},to_decree_included
= "
"{}, learn_start_decree = {}, last_commit_decree = {},
current_learning_status = "
"{} ",
req.signature,
- resp.config.hp_primary,
- resp.config.primary,
+ FMT_HOST_PORT_AND_IP(resp.config, primary),
_potential_secondary_states.duration_ms(),
resp.err,
resp.last_committed_decree,
@@ -624,12 +622,11 @@ void replica::on_learn_reply(error_code err,
learn_request &&req, learn_response
if (resp.err != ERR_OK) {
if (resp.err == ERR_INACTIVE_STATE || resp.err ==
ERR_INCONSISTENT_STATE) {
- LOG_WARNING_PREFIX("on_learn_reply[{:#018x}]: learnee = {}({}),
learnee is updating "
+ LOG_WARNING_PREFIX("on_learn_reply[{:#018x}]: learnee = {},
learnee is updating "
"ballot(inactive state) or
reconciliation(inconsistent state), "
"delay to start another round of learning",
req.signature,
- resp.config.hp_primary,
- resp.config.primary);
+ FMT_HOST_PORT_AND_IP(resp.config, primary));
_potential_secondary_states.learning_round_is_running = false;
_potential_secondary_states.delay_learning_task =
tasking::create_task(LPC_DELAY_LEARN,
@@ -644,34 +641,30 @@ void replica::on_learn_reply(error_code err,
learn_request &&req, learn_response
}
if (resp.config.ballot > get_ballot()) {
- LOG_INFO_PREFIX("on_learn_reply[{:#018x}]: learnee = {}({}), update
configuration because "
+ LOG_INFO_PREFIX("on_learn_reply[{:#018x}]: learnee = {}, update
configuration because "
"ballot have changed",
req.signature,
- resp.config.hp_primary,
- resp.config.primary);
+ FMT_HOST_PORT_AND_IP(resp.config, primary));
CHECK(update_local_configuration(resp.config), "");
}
if (status() != partition_status::PS_POTENTIAL_SECONDARY) {
LOG_ERROR_PREFIX(
- "on_learn_reply[{:#018x}]: learnee = {}({}), current_status = {},
stop learning",
+ "on_learn_reply[{:#018x}]: learnee = {}, current_status = {}, stop
learning",
req.signature,
- resp.config.hp_primary,
- resp.config.primary,
+ FMT_HOST_PORT_AND_IP(resp.config, primary),
enum_to_string(status()));
return;
}
// local state is newer than learnee
if (resp.last_committed_decree < _app->last_committed_decree()) {
- LOG_WARNING_PREFIX(
- "on_learn_reply[{:#018x}]: learnee = {}({}), learner state is
newer than "
- "learnee (primary): {} vs {}, create new app",
- req.signature,
- resp.config.hp_primary,
- resp.config.primary,
- _app->last_committed_decree(),
- resp.last_committed_decree);
+ LOG_WARNING_PREFIX("on_learn_reply[{:#018x}]: learnee = {}, learner
state is newer than "
+ "learnee (primary): {} vs {}, create new app",
+ req.signature,
+ FMT_HOST_PORT_AND_IP(resp.config, primary),
+ _app->last_committed_decree(),
+ resp.last_committed_decree);
METRIC_VAR_INCREMENT(learn_resets);
@@ -679,11 +672,10 @@ void replica::on_learn_reply(error_code err,
learn_request &&req, learn_response
auto err = _app->close(true);
if (err != ERR_OK) {
LOG_ERROR_PREFIX(
- "on_learn_reply[{:#018x}]: learnee = {}({}), close app (with
clear_state=true) "
+ "on_learn_reply[{:#018x}]: learnee = {}, close app (with
clear_state=true) "
"failed, err = {}",
req.signature,
- resp.config.hp_primary,
- resp.config.primary,
+ FMT_HOST_PORT_AND_IP(resp.config, primary),
err);
}
@@ -708,11 +700,10 @@ void replica::on_learn_reply(error_code err,
learn_request &&req, learn_response
err = _app->open_new_internal(this,
_private_log->on_partition_reset(get_gpid(), 0));
if (err != ERR_OK) {
- LOG_ERROR_PREFIX("on_learn_reply[{:#018x}]: learnee = {}({}),
open app (with "
+ LOG_ERROR_PREFIX("on_learn_reply[{:#018x}]: learnee = {}, open
app (with "
"create_new=true) failed, err = {}",
req.signature,
- resp.config.hp_primary,
- resp.config.primary,
+ FMT_HOST_PORT_AND_IP(resp.config, primary),
err);
}
}
@@ -746,11 +737,10 @@ void replica::on_learn_reply(error_code err,
learn_request &&req, learn_response
if (++_stub->_learn_app_concurrent_count >
FLAGS_learn_app_max_concurrent_count) {
--_stub->_learn_app_concurrent_count;
LOG_WARNING_PREFIX(
- "on_learn_reply[{:#018x}]: learnee = {}({}),
learn_app_concurrent_count({}) >= "
+ "on_learn_reply[{:#018x}]: learnee = {},
learn_app_concurrent_count({}) >= "
"FLAGS_learn_app_max_concurrent_count({}), skip this round",
_potential_secondary_states.learning_version,
- _config.hp_primary,
- _config.primary,
+ FMT_HOST_PORT_AND_IP(_config, primary),
_stub->_learn_app_concurrent_count,
FLAGS_learn_app_max_concurrent_count);
_potential_secondary_states.learning_round_is_running = false;
@@ -758,10 +748,9 @@ void replica::on_learn_reply(error_code err, learn_request
&&req, learn_response
} else {
_potential_secondary_states.learn_app_concurrent_count_increased =
true;
LOG_INFO_PREFIX(
- "on_learn_reply[{:#018x}]: learnee = {}({}),
++learn_app_concurrent_count = {}",
+ "on_learn_reply[{:#018x}]: learnee = {},
++learn_app_concurrent_count = {}",
_potential_secondary_states.learning_version,
- _config.hp_primary,
- _config.primary,
+ FMT_HOST_PORT_AND_IP(_config, primary),
_stub->_learn_app_concurrent_count.load());
}
}
@@ -805,11 +794,10 @@ void replica::on_learn_reply(error_code err,
learn_request &&req, learn_response
// reset preparelist
_potential_secondary_states.learning_start_prepare_decree =
resp.prepare_start_decree;
_prepare_list->truncate(_app->last_committed_decree());
- LOG_INFO_PREFIX("on_learn_reply[{:#018x}]: learnee = {}({}), truncate
prepare list, "
+ LOG_INFO_PREFIX("on_learn_reply[{:#018x}]: learnee = {}, truncate
prepare list, "
"local_committed_decree = {}, current_learning_status
= {}",
req.signature,
- resp.config.hp_primary,
- resp.config.primary,
+ FMT_HOST_PORT_AND_IP(resp.config, primary),
_app->last_committed_decree(),
enum_to_string(_potential_secondary_states.learning_status));
@@ -837,14 +825,12 @@ void replica::on_learn_reply(error_code err,
learn_request &&req, learn_response
_prepare_list->get_mutation_by_decree(mu->data.header.decree);
if (existing_mutation != nullptr &&
existing_mutation->data.header.ballot >
mu->data.header.ballot) {
- LOG_INFO_PREFIX(
- "on_learn_reply[{:#018x}]: learnee = {}({}),
mutation({}) exist on "
- "the learner with larger ballot {}",
- req.signature,
- resp.config.hp_primary,
- resp.config.primary,
- mu->name(),
- existing_mutation->data.header.ballot);
+ LOG_INFO_PREFIX("on_learn_reply[{:#018x}]: learnee = {},
mutation({}) exist on "
+ "the learner with larger ballot {}",
+ req.signature,
+ FMT_HOST_PORT_AND_IP(resp.config, primary),
+ mu->name(),
+ existing_mutation->data.header.ballot);
} else {
_prepare_list->prepare(mu,
partition_status::PS_POTENTIAL_SECONDARY);
}
@@ -856,12 +842,11 @@ void replica::on_learn_reply(error_code err,
learn_request &&req, learn_response
}
}
- LOG_INFO_PREFIX("on_learn_reply[{:#018x}]: learnee = {}({}),
learn_duration = {} ms, apply "
+ LOG_INFO_PREFIX("on_learn_reply[{:#018x}]: learnee = {},
learn_duration = {} ms, apply "
"cache done, prepare_cache_range = <{}, {}>,
local_committed_decree = {}, "
"app_committed_decree = {}, current_learning_status =
{}",
req.signature,
- resp.config.hp_primary,
- resp.config.primary,
+ FMT_HOST_PORT_AND_IP(resp.config, primary),
_potential_secondary_states.duration_ms(),
cache_range.first,
cache_range.second,
@@ -903,10 +888,9 @@ void replica::on_learn_reply(error_code err, learn_request
&&req, learn_response
if (!dsn::utils::filesystem::directory_exists(learn_dir)) {
LOG_ERROR_PREFIX(
- "on_learn_reply[{:#018x}]: learnee = {}({}), create replica
learn dir {} failed",
+ "on_learn_reply[{:#018x}]: learnee = {}, create replica learn
dir {} failed",
req.signature,
- resp.config.hp_primary,
- resp.config.primary,
+ FMT_HOST_PORT_AND_IP(resp.config, primary),
learn_dir);
_potential_secondary_states.learn_remote_files_task =
@@ -927,18 +911,18 @@ void replica::on_learn_reply(error_code err,
learn_request &&req, learn_response
}
bool high_priority = (resp.type == learn_type::LT_APP ? false : true);
- LOG_INFO_PREFIX(
- "on_learn_reply[{:#018x}]: learnee = {}({}), learn_duration = {}
ms, start to "
- "copy remote files, copy_file_count = {}, priority = {}",
- req.signature,
- resp.config.hp_primary,
- resp.config.primary,
- _potential_secondary_states.duration_ms(),
- resp.state.files.size(),
- high_priority ? "high" : "low");
+ LOG_INFO_PREFIX("on_learn_reply[{:#018x}]: learnee = {},
learn_duration = {} ms, start to "
+ "copy remote files, copy_file_count = {}, priority =
{}",
+ req.signature,
+ FMT_HOST_PORT_AND_IP(resp.config, primary),
+ _potential_secondary_states.duration_ms(),
+ resp.state.files.size(),
+ high_priority ? "high" : "low");
+ host_port primary;
+ GET_HOST_PORT(resp.config, primary, primary);
_potential_secondary_states.learn_remote_files_task =
_stub->_nfs->copy_remote_files(
- resp.config.hp_primary,
+ primary,
resp.replica_disk_tag,
resp.base_local_dir,
resp.state.files,
@@ -1044,34 +1028,31 @@ void replica::on_copy_remote_state_completed(error_code
err,
decree old_app_committed = _app->last_committed_decree();
decree old_app_durable = _app->last_durable_decree();
- LOG_INFO_PREFIX(
- "on_copy_remote_state_completed[{:#018x}]: learnee = {}({}),
learn_duration = {} "
- "ms, copy remote state done, err = {}, copy_file_count = {},
copy_file_size = "
- "{}, copy_time_used = {} ms, local_committed_decree = {},
app_committed_decree "
- "= {}, app_durable_decree = {}, prepare_start_decree = {}, "
- "current_learning_status = {}",
- req.signature,
- resp.config.hp_primary,
- resp.config.primary,
- _potential_secondary_states.duration_ms(),
- err,
- resp.state.files.size(),
- size,
- _potential_secondary_states.duration_ms() - copy_start_time,
- last_committed_decree(),
- _app->last_committed_decree(),
- _app->last_durable_decree(),
- resp.prepare_start_decree,
- enum_to_string(_potential_secondary_states.learning_status));
+ LOG_INFO_PREFIX("on_copy_remote_state_completed[{:#018x}]: learnee = {},
learn_duration = {} "
+ "ms, copy remote state done, err = {}, copy_file_count =
{}, copy_file_size = "
+ "{}, copy_time_used = {} ms, local_committed_decree = {},
app_committed_decree "
+ "= {}, app_durable_decree = {}, prepare_start_decree = {},
"
+ "current_learning_status = {}",
+ req.signature,
+ FMT_HOST_PORT_AND_IP(resp.config, primary),
+ _potential_secondary_states.duration_ms(),
+ err,
+ resp.state.files.size(),
+ size,
+ _potential_secondary_states.duration_ms() -
copy_start_time,
+ last_committed_decree(),
+ _app->last_committed_decree(),
+ _app->last_durable_decree(),
+ resp.prepare_start_decree,
+
enum_to_string(_potential_secondary_states.learning_status));
if (resp.type == learn_type::LT_APP) {
--_stub->_learn_app_concurrent_count;
_potential_secondary_states.learn_app_concurrent_count_increased =
false;
- LOG_INFO_PREFIX("on_copy_remote_state_completed[{:#018x}]: learnee =
{}({}), "
+ LOG_INFO_PREFIX("on_copy_remote_state_completed[{:#018x}]: learnee =
{}, "
"--learn_app_concurrent_count = {}",
_potential_secondary_states.learning_version,
- _config.hp_primary,
- _config.primary,
+ FMT_HOST_PORT_AND_IP(_config, primary),
_stub->_learn_app_concurrent_count.load());
}
@@ -1115,22 +1096,20 @@ void replica::on_copy_remote_state_completed(error_code
err,
// the learn_start_decree will be set to 0, which makes
learner to learn from
// scratch
CHECK_LE(_app->last_committed_decree(),
resp.last_committed_decree);
- LOG_INFO_PREFIX("on_copy_remote_state_completed[{:#018x}]:
learnee = {}({}), "
+ LOG_INFO_PREFIX("on_copy_remote_state_completed[{:#018x}]:
learnee = {}, "
"learn_duration = {} ms, checkpoint duration =
{} ns, apply "
"checkpoint succeed, app_committed_decree =
{}",
req.signature,
- resp.config.hp_primary,
- resp.config.primary,
+ FMT_HOST_PORT_AND_IP(resp.config, primary),
_potential_secondary_states.duration_ms(),
dsn_now_ns() - start_ts,
_app->last_committed_decree());
} else {
- LOG_ERROR_PREFIX("on_copy_remote_state_completed[{:#018x}]:
learnee = {}({}), "
+ LOG_ERROR_PREFIX("on_copy_remote_state_completed[{:#018x}]:
learnee = {}, "
"learn_duration = {} ms, checkpoint duration
= {} ns, apply "
"checkpoint failed, err = {}",
req.signature,
- resp.config.hp_primary,
- resp.config.primary,
+ FMT_HOST_PORT_AND_IP(resp.config, primary),
_potential_secondary_states.duration_ms(),
dsn_now_ns() - start_ts,
err);
@@ -1142,22 +1121,20 @@ void replica::on_copy_remote_state_completed(error_code
err,
auto start_ts = dsn_now_ns();
err = apply_learned_state_from_private_log(lstate);
if (err == ERR_OK) {
- LOG_INFO_PREFIX("on_copy_remote_state_completed[{:#018x}]:
learnee = {}({}), "
+ LOG_INFO_PREFIX("on_copy_remote_state_completed[{:#018x}]:
learnee = {}, "
"learn_duration = {} ms, apply_log_duration =
{} ns, apply learned "
"state from private log succeed,
app_committed_decree = {}",
req.signature,
- resp.config.hp_primary,
- resp.config.primary,
+ FMT_HOST_PORT_AND_IP(resp.config, primary),
_potential_secondary_states.duration_ms(),
dsn_now_ns() - start_ts,
_app->last_committed_decree());
} else {
- LOG_ERROR_PREFIX("on_copy_remote_state_completed[{:#018x}]:
learnee = {}({}), "
+ LOG_ERROR_PREFIX("on_copy_remote_state_completed[{:#018x}]:
learnee = {}, "
"learn_duration = {} ms, apply_log_duration =
{} ns, apply "
"learned state from private log failed, err =
{}",
req.signature,
- resp.config.hp_primary,
- resp.config.primary,
+ FMT_HOST_PORT_AND_IP(resp.config, primary),
_potential_secondary_states.duration_ms(),
dsn_now_ns() - start_ts,
err);
@@ -1167,28 +1144,26 @@ void replica::on_copy_remote_state_completed(error_code
err,
// reset prepare list to make it catch with app
_prepare_list->reset(_app->last_committed_decree());
- LOG_INFO_PREFIX(
- "on_copy_remote_state_completed[{:#018x}]: learnee = {}({}),
learn_duration = "
- "{} ms, apply checkpoint/log done, err = {}, last_prepared_decree
= ({} => "
- "{}), last_committed_decree = ({} => {}), app_committed_decree =
({} => "
- "{}), app_durable_decree = ({} => {}), remote_committed_decree =
{}, "
- "prepare_start_decree = {}, current_learning_status = {}",
- req.signature,
- resp.config.hp_primary,
- resp.config.primary,
- _potential_secondary_states.duration_ms(),
- err,
- old_prepared,
- last_prepared_decree(),
- old_committed,
- last_committed_decree(),
- old_app_committed,
- _app->last_committed_decree(),
- old_app_durable,
- _app->last_durable_decree(),
- resp.last_committed_decree,
- resp.prepare_start_decree,
- enum_to_string(_potential_secondary_states.learning_status));
+ LOG_INFO_PREFIX("on_copy_remote_state_completed[{:#018x}]: learnee =
{}, learn_duration = "
+ "{} ms, apply checkpoint/log done, err = {},
last_prepared_decree = ({} => "
+ "{}), last_committed_decree = ({} => {}),
app_committed_decree = ({} => "
+ "{}), app_durable_decree = ({} => {}),
remote_committed_decree = {}, "
+ "prepare_start_decree = {}, current_learning_status =
{}",
+ req.signature,
+ FMT_HOST_PORT_AND_IP(resp.config, primary),
+ _potential_secondary_states.duration_ms(),
+ err,
+ old_prepared,
+ last_prepared_decree(),
+ old_committed,
+ last_committed_decree(),
+ old_app_committed,
+ _app->last_committed_decree(),
+ old_app_durable,
+ _app->last_durable_decree(),
+ resp.last_committed_decree,
+ resp.prepare_start_decree,
+
enum_to_string(_potential_secondary_states.learning_status));
}
// if catch-up done, do flush to enable all learned state is durable
@@ -1198,17 +1173,15 @@ void replica::on_copy_remote_state_completed(error_code
err,
_app->last_committed_decree() > _app->last_durable_decree()) {
err = background_sync_checkpoint();
- LOG_INFO_PREFIX(
- "on_copy_remote_state_completed[{:#018x}]: learnee = {}({}),
learn_duration = "
- "{} ms, flush done, err = {}, app_committed_decree = {}, "
- "app_durable_decree = {}",
- req.signature,
- resp.config.hp_primary,
- resp.config.primary,
- _potential_secondary_states.duration_ms(),
- err,
- _app->last_committed_decree(),
- _app->last_durable_decree());
+ LOG_INFO_PREFIX("on_copy_remote_state_completed[{:#018x}]: learnee =
{}, learn_duration = "
+ "{} ms, flush done, err = {}, app_committed_decree =
{}, "
+ "app_durable_decree = {}",
+ req.signature,
+ FMT_HOST_PORT_AND_IP(resp.config, primary),
+ _potential_secondary_states.duration_ms(),
+ err,
+ _app->last_committed_decree(),
+ _app->last_durable_decree());
if (err == ERR_OK) {
CHECK_EQ(_app->last_committed_decree(),
_app->last_durable_decree());
@@ -1235,31 +1208,28 @@ void
replica::on_learn_remote_state_completed(error_code err)
_checker.only_one_thread_access();
if (partition_status::PS_POTENTIAL_SECONDARY != status()) {
- LOG_WARNING_PREFIX("on_learn_remote_state_completed[{:#018x}]: learnee
= {}({}), "
+ LOG_WARNING_PREFIX("on_learn_remote_state_completed[{:#018x}]: learnee
= {}, "
"learn_duration = {} ms, err = {}, the learner
status is not "
"PS_POTENTIAL_SECONDARY, but {}, ignore",
_potential_secondary_states.learning_version,
- _config.hp_primary,
- _config.primary,
+ FMT_HOST_PORT_AND_IP(_config, primary),
_potential_secondary_states.duration_ms(),
err,
enum_to_string(status()));
return;
}
- LOG_INFO_PREFIX(
- "on_learn_remote_state_completed[{:#018x}]: learnee = {}({}),
learn_duration = {} "
- "ms, err = {}, local_committed_decree = {}, app_committed_decree = {},
"
- "app_durable_decree = {}, current_learning_status = {}",
- _potential_secondary_states.learning_version,
- _config.hp_primary,
- _config.primary,
- _potential_secondary_states.duration_ms(),
- err,
- last_committed_decree(),
- _app->last_committed_decree(),
- _app->last_durable_decree(),
- enum_to_string(_potential_secondary_states.learning_status));
+ LOG_INFO_PREFIX("on_learn_remote_state_completed[{:#018x}]: learnee = {},
learn_duration = {} "
+ "ms, err = {}, local_committed_decree = {},
app_committed_decree = {}, "
+ "app_durable_decree = {}, current_learning_status = {}",
+ _potential_secondary_states.learning_version,
+ FMT_HOST_PORT_AND_IP(_config, primary),
+ _potential_secondary_states.duration_ms(),
+ err,
+ last_committed_decree(),
+ _app->last_committed_decree(),
+ _app->last_durable_decree(),
+
enum_to_string(_potential_secondary_states.learning_status));
_potential_secondary_states.learning_round_is_running = false;
@@ -1276,10 +1246,9 @@ void replica::handle_learning_error(error_code err, bool
is_local_error)
_checker.only_one_thread_access();
LOG_ERROR_PREFIX(
- "handle_learning_error[{:#018x}]: learnee = {}({}), learn_duration =
{} ms, err = {}, {}",
+ "handle_learning_error[{:#018x}]: learnee = {}, learn_duration = {}
ms, err = {}, {}",
_potential_secondary_states.learning_version,
- _config.hp_primary,
- _config.primary,
+ FMT_HOST_PORT_AND_IP(_config, primary),
_potential_secondary_states.duration_ms(),
err,
is_local_error ? "local_error" : "remote error");
@@ -1336,12 +1305,11 @@ void replica::notify_learn_completion()
report.node = _stub->primary_address();
report.__set_hp_node(_stub->primary_host_port());
- LOG_INFO_PREFIX("notify_learn_completion[{:#018x}]: learnee = {}({}),
learn_duration = {} ms, "
+ LOG_INFO_PREFIX("notify_learn_completion[{:#018x}]: learnee = {},
learn_duration = {} ms, "
"local_committed_decree = {}, app_committed_decree = {},
app_durable_decree = "
"{}, current_learning_status = {}",
_potential_secondary_states.learning_version,
- _config.hp_primary,
- _config.primary,
+ FMT_HOST_PORT_AND_IP(_config, primary),
_potential_secondary_states.duration_ms(),
last_committed_decree(),
_app->last_committed_decree(),
@@ -1356,8 +1324,10 @@ void replica::notify_learn_completion()
dsn::message_ex::create_request(RPC_LEARN_COMPLETION_NOTIFY, 0,
get_gpid().thread_hash());
dsn::marshall(msg, report);
+ host_port primary;
+ GET_HOST_PORT(_config, primary, primary);
_potential_secondary_states.completion_notify_task =
- rpc::call(_config.primary, msg, &_tracker, [
+ rpc::call(dsn::dns_resolver::instance().resolve_address(primary), msg,
&_tracker, [
this,
report = std::move(report)
](error_code err, learn_notify_response && resp) mutable {
@@ -1430,36 +1400,32 @@ void
replica::on_learn_completion_notification_reply(error_code err,
}
if (resp.signature !=
(int64_t)_potential_secondary_states.learning_version) {
- LOG_ERROR_PREFIX("on_learn_completion_notification_reply[{:#018x}]:
learnee = {}({}), "
+ LOG_ERROR_PREFIX("on_learn_completion_notification_reply[{:#018x}]:
learnee = {}, "
"learn_duration = {} ms, signature not matched,
current signature on "
"primary is [{:#018x}]",
report.learner_signature,
- _config.hp_primary,
- _config.primary,
+ FMT_HOST_PORT_AND_IP(_config, primary),
_potential_secondary_states.duration_ms(),
resp.signature);
handle_learning_error(ERR_INVALID_STATE, false);
return;
}
- LOG_INFO_PREFIX("on_learn_completion_notification_reply[{:#018x}]: learnee
= {}({}), "
+ LOG_INFO_PREFIX("on_learn_completion_notification_reply[{:#018x}]: learnee
= {}, "
"learn_duration = {} ms, response_err = {}",
report.learner_signature,
- _config.hp_primary,
- _config.primary,
+ FMT_HOST_PORT_AND_IP(_config, primary),
_potential_secondary_states.duration_ms(),
resp.err);
if (resp.err != ERR_OK) {
if (resp.err == ERR_INACTIVE_STATE) {
- LOG_WARNING_PREFIX(
- "on_learn_completion_notification_reply[{:#018x}]: learnee =
{}({}), "
- "learn_duration = {} ms, learnee is updating ballot, delay to
start "
- "another round of learning",
- report.learner_signature,
- _config.hp_primary,
- _config.primary,
- _potential_secondary_states.duration_ms());
+
LOG_WARNING_PREFIX("on_learn_completion_notification_reply[{:#018x}]: learnee =
{}, "
+ "learn_duration = {} ms, learnee is updating
ballot, delay to start "
+ "another round of learning",
+ report.learner_signature,
+ FMT_HOST_PORT_AND_IP(_config, primary),
+ _potential_secondary_states.duration_ms());
_potential_secondary_states.learning_round_is_running = false;
_potential_secondary_states.delay_learning_task =
tasking::create_task(
LPC_DELAY_LEARN,
@@ -1477,10 +1443,9 @@ void
replica::on_learn_completion_notification_reply(error_code err,
void replica::on_add_learner(const group_check_request &request)
{
- LOG_INFO_PREFIX("process add learner, primary = {}({}), ballot ={}, status
={}, "
+ LOG_INFO_PREFIX("process add learner, primary = {}, ballot ={}, status
={}, "
"last_committed_decree = {}, duplicating = {}",
- request.config.hp_primary,
- request.config.primary,
+ FMT_HOST_PORT_AND_IP(request.config, primary),
request.config.ballot,
enum_to_string(request.config.status),
request.last_committed_decree,
@@ -1620,14 +1585,13 @@ error_code
replica::apply_learned_state_from_private_log(learn_state &state)
LOG_INFO_PREFIX(
"apply_learned_state_from_private_log[{}]: duplicating={},
step_back={}, "
- "learnee = {}({}), learn_duration = {} ms, apply private log files
done, file_count "
+ "learnee = {}, learn_duration = {} ms, apply private log files done,
file_count "
"={}, first_learn_start_decree ={}, learn_start_decree = {}, "
"app_committed_decree = {}",
_potential_secondary_states.learning_version,
duplicating,
step_back,
- _config.hp_primary,
- _config.primary,
+ FMT_HOST_PORT_AND_IP(_config, primary),
_potential_secondary_states.duration_ms(),
state.files.size(),
_potential_secondary_states.first_learn_start_decree,
@@ -1654,23 +1618,21 @@ error_code replica::apply_learned_state_from_private_log(learn_state &state)
}
if (state.to_decree_included > last_committed_decree()) {
- LOG_INFO_PREFIX("apply_learned_state_from_private_log[{}]: learnee ={}({}), "
+ LOG_INFO_PREFIX("apply_learned_state_from_private_log[{}]: learnee ={}, "
"learned_to_decree_included({}) > last_committed_decree({}), commit to "
"to_decree_included",
_potential_secondary_states.learning_version,
- _config.hp_primary,
- _config.primary,
+ FMT_HOST_PORT_AND_IP(_config, primary),
state.to_decree_included,
last_committed_decree());
plist.commit(state.to_decree_included, COMMIT_TO_DECREE_SOFT);
}
- LOG_INFO_PREFIX(" apply_learned_state_from_private_log[{}]: learnee ={}({}), "
+ LOG_INFO_PREFIX(" apply_learned_state_from_private_log[{}]: learnee ={}, "
"learn_duration ={} ms, apply in-buffer private logs done, "
"replay_count ={}, app_committed_decree = {}",
_potential_secondary_states.learning_version,
- _config.hp_primary,
- _config.primary,
+ FMT_HOST_PORT_AND_IP(_config, primary),
_potential_secondary_states.duration_ms(),
replay_count,
_app->last_committed_decree());
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index fc36bd98a..286364ac5 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -1101,12 +1101,11 @@ void replica_stub::on_group_check(group_check_rpc rpc)
return;
}
- LOG_INFO("{}@{}: received group check, primary = {}({}), ballot = {}, status = {}, "
+ LOG_INFO("{}@{}: received group check, primary = {}, ballot = {}, status = {}, "
"last_committed_decree = {}",
request.config.pid,
_primary_host_port_cache,
- request.config.hp_primary,
- request.config.primary,
+ FMT_HOST_PORT_AND_IP(request.config, primary),
request.config.ballot,
enum_to_string(request.config.status),
request.last_committed_decree);
@@ -1165,20 +1164,18 @@ void replica_stub::on_learn_completion_notification(learn_completion_notificatio
void replica_stub::on_add_learner(const group_check_request &request)
{
if (!is_connected()) {
- LOG_WARNING("{}@{}: received add learner, primary = {}({}), not connected, ignore",
+ LOG_WARNING("{}@{}: received add learner, primary = {}, not connected, ignore",
request.config.pid,
_primary_host_port_cache,
- request.config.hp_primary,
- request.config.primary);
+ FMT_HOST_PORT_AND_IP(request.config, primary));
return;
}
- LOG_INFO("{}@{}: received add learner, primary = {}({}), ballot = {}, status = {}, "
+ LOG_INFO("{}@{}: received add learner, primary = {}, ballot = {}, status = {}, "
"last_committed_decree = {}",
request.config.pid,
_primary_host_port_cache,
- request.config.hp_primary,
- request.config.primary,
+ FMT_HOST_PORT_AND_IP(request.config, primary),
request.config.ballot,
enum_to_string(request.config.status),
request.last_committed_decree);
@@ -2743,12 +2740,11 @@ void replica_stub::on_group_bulk_load(group_bulk_load_rpc rpc)
const group_bulk_load_request &request = rpc.request();
group_bulk_load_response &response = rpc.response();
- LOG_INFO("[{}@{}]: received group bulk load request, primary = {}({}), ballot = {}, "
+ LOG_INFO("[{}@{}]: received group bulk load request, primary = {}, ballot = {}, "
"meta_bulk_load_status = {}",
request.config.pid,
_primary_host_port_cache,
- request.config.hp_primary,
- request.config.primary,
+ FMT_HOST_PORT_AND_IP(request.config, primary),
request.config.ballot,
enum_to_string(request.meta_bulk_load_status));
diff --git a/src/replica/split/replica_split_manager.cpp b/src/replica/split/replica_split_manager.cpp
index 58dee52dc..2fe476468 100644
--- a/src/replica/split/replica_split_manager.cpp
+++ b/src/replica/split/replica_split_manager.cpp
@@ -146,18 +146,19 @@ void replica_split_manager::parent_start_split(
_child_gpid = child_gpid;
_child_init_ballot = get_ballot();
- LOG_INFO_PREFIX("start to add child({}), init_ballot={}, status={}, primary={}({})",
+ LOG_INFO_PREFIX("start to add child({}), init_ballot={}, status={}, primary={}",
_child_gpid,
_child_init_ballot,
enum_to_string(status()),
- request.config.hp_primary,
- request.config.primary);
+ FMT_HOST_PORT_AND_IP(request.config, primary));
+ host_port primary;
+ GET_HOST_PORT(_replica->_config, primary, primary);
tasking::enqueue(LPC_CREATE_CHILD,
tracker(),
std::bind(&replica_stub::create_child_replica,
_stub,
- _replica->_config.hp_primary,
+ primary,
_replica->_app_info,
_child_init_ballot,
_child_gpid,
@@ -184,8 +185,7 @@ void replica_split_manager::child_init_replica(gpid parent_gpid,
// update replica config
_replica->_config.ballot = init_ballot;
- _replica->_config.primary = dsn::dns_resolver::instance().resolve_address(primary_host_port);
- _replica->_config.__set_hp_primary(primary_host_port);
+ SET_IP_AND_HOST_PORT_BY_DNS(_replica->_config, primary, primary_host_port);
_replica->_config.status = partition_status::PS_PARTITION_SPLIT;
// initialize split context
@@ -621,10 +621,9 @@ void replica_split_manager::child_notify_catch_up() // on child partition
request->child = _stub->primary_address();
request->__set_hp_child(_stub->primary_host_port());
- LOG_INFO_PREFIX("send notification to primary parent[{}@{}({})], ballot={}",
+ LOG_INFO_PREFIX("send notification to primary parent[{}@{}], ballot={}",
_replica->_split_states.parent_gpid,
- _replica->_config.hp_primary,
- _replica->_config.primary,
+ FMT_HOST_PORT_AND_IP(_replica->_config, primary),
get_ballot());
notify_catch_up_rpc rpc(std::move(request),
@@ -632,32 +631,36 @@ void replica_split_manager::child_notify_catch_up() // on child partition
/*never timeout*/ 0_ms,
/*partition_hash*/ 0,
_replica->_split_states.parent_gpid.thread_hash());
- rpc.call(_replica->_config.primary, tracker(), [this, rpc](error_code ec) mutable {
- auto response = rpc.response();
- if (ec == ERR_TIMEOUT) {
- LOG_WARNING_PREFIX("notify primary catch up timeout, please wait and retry");
- tasking::enqueue(LPC_PARTITION_SPLIT,
- tracker(),
- std::bind(&replica_split_manager::child_notify_catch_up, this),
- get_gpid().thread_hash(),
- std::chrono::seconds(1));
- return;
- }
- if (ec != ERR_OK || response.err != ERR_OK) {
- error_code err = (ec == ERR_OK) ? response.err : ec;
- LOG_ERROR_PREFIX("failed to notify primary catch up, error={}", err);
- _stub->split_replica_error_handler(
- _replica->_split_states.parent_gpid,
- std::bind(&replica_split_manager::parent_cleanup_split_context,
- std::placeholders::_1));
- child_handle_split_error("notify_primary_split_catch_up failed");
- return;
- }
- LOG_INFO_PREFIX("notify primary parent[{}@{}({})] catch up succeed",
- _replica->_split_states.parent_gpid,
- _replica->_config.hp_primary,
- _replica->_config.primary);
- });
+ host_port primary;
+ GET_HOST_PORT(_replica->_config, primary, primary);
+ rpc.call(dsn::dns_resolver::instance().resolve_address(primary),
+ tracker(),
+ [this, rpc](error_code ec) mutable {
+ auto response = rpc.response();
+ if (ec == ERR_TIMEOUT) {
+ LOG_WARNING_PREFIX("notify primary catch up timeout, please wait and retry");
+ tasking::enqueue(
+ LPC_PARTITION_SPLIT,
+ tracker(),
+ std::bind(&replica_split_manager::child_notify_catch_up, this),
+ get_gpid().thread_hash(),
+ std::chrono::seconds(1));
+ return;
+ }
+ if (ec != ERR_OK || response.err != ERR_OK) {
+ error_code err = (ec == ERR_OK) ? response.err : ec;
+ LOG_ERROR_PREFIX("failed to notify primary catch up, error={}", err);
+ _stub->split_replica_error_handler(
+ _replica->_split_states.parent_gpid,
+ std::bind(&replica_split_manager::parent_cleanup_split_context,
+ std::placeholders::_1));
+ child_handle_split_error("notify_primary_split_catch_up failed");
+ return;
+ }
+ LOG_INFO_PREFIX("notify primary parent[{}@{}] catch up succeed",
+ _replica->_split_states.parent_gpid,
+ FMT_HOST_PORT_AND_IP(_replica->_config, primary));
+ });
}
// ThreadPool: THREAD_POOL_REPLICATION
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]