This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new ce267cbd2 refactor(FQDN): further refactoring of
idl/dsn.layer2.thrift v2 (#2217)
ce267cbd2 is described below
commit ce267cbd2ea480a09345eb358143dcbc79b43760
Author: Samunroyu <[email protected]>
AuthorDate: Wed May 21 23:48:20 2025 +0800
refactor(FQDN): further refactoring of idl/dsn.layer2.thrift v2 (#2217)
This is a further refactor of idl/dsn.layer2.thrift, building on
https://github.com/apache/incubator-pegasus/pull/2049.
The main changes affect the partition_configuration structure.
---
src/meta/backup_engine.cpp | 2 +-
src/meta/duplication/meta_duplication_service.cpp | 11 +++++++----
src/meta/greedy_load_balancer.cpp | 9 +++++++--
src/meta/meta_backup_service.cpp | 2 +-
src/meta/meta_bulk_load_service.cpp | 3 ++-
src/meta/server_state_restore.cpp | 10 +++++++++-
src/replica/bulk_load/replica_bulk_loader.cpp | 16 ++++++++++++----
src/replica/replica_context.cpp | 16 ++++++++++++----
src/replica/replica_stub.cpp | 8 ++++++--
9 files changed, 57 insertions(+), 20 deletions(-)
diff --git a/src/meta/backup_engine.cpp b/src/meta/backup_engine.cpp
index 7197f4e48..be5347a2d 100644
--- a/src/meta/backup_engine.cpp
+++ b/src/meta/backup_engine.cpp
@@ -182,7 +182,7 @@ void backup_engine::backup_app_partition(const gpid &pid)
_is_backup_failed = true;
return;
}
- partition_primary = app->pcs[pid.get_partition_index()].hp_primary;
+ GET_HOST_PORT(app->pcs[pid.get_partition_index()], primary,
partition_primary);
}
if (!partition_primary) {
diff --git a/src/meta/duplication/meta_duplication_service.cpp
b/src/meta/duplication/meta_duplication_service.cpp
index f1d61a9db..971a4beb4 100644
--- a/src/meta/duplication/meta_duplication_service.cpp
+++ b/src/meta/duplication/meta_duplication_service.cpp
@@ -764,7 +764,9 @@ void
meta_duplication_service::check_follower_app_if_create_completed(
query_err = ERR_INCONSISTENT_STATE;
} else {
for (const auto &pc : resp.partitions) {
- if (!pc.hp_primary) {
+ host_port primary;
+ GET_HOST_PORT(pc, primary, primary);
+ if (!primary) {
// Fail once the primary replica is unavailable.
query_err = ERR_INACTIVE_STATE;
break;
@@ -772,13 +774,14 @@ void
meta_duplication_service::check_follower_app_if_create_completed(
// Once replica count is more than 1, at least one
secondary replica
// is required.
- if (1 + pc.hp_secondaries.size() <
pc.max_replica_count &&
- pc.hp_secondaries.empty()) {
+ std::vector<host_port> secondaries;
+ GET_HOST_PORTS(pc, secondaries, secondaries);
+ if (1 + secondaries.size() < pc.max_replica_count &&
secondaries.empty()) {
query_err = ERR_NOT_ENOUGH_MEMBER;
break;
}
- for (const auto &secondary : pc.hp_secondaries) {
+ for (const auto &secondary : secondaries) {
if (!secondary) {
// Fail once any secondary replica is
unavailable.
query_err = ERR_INACTIVE_STATE;
diff --git a/src/meta/greedy_load_balancer.cpp
b/src/meta/greedy_load_balancer.cpp
index 0b3017eda..2e3304358 100644
--- a/src/meta/greedy_load_balancer.cpp
+++ b/src/meta/greedy_load_balancer.cpp
@@ -147,8 +147,13 @@ bool
greedy_load_balancer::all_replica_infos_collected(const node_state &ns)
{
const auto &n = ns.host_port();
return ns.for_each_partition([this, n](const dsn::gpid &pid) {
- config_context &cc = *get_config_context(*(t_global_view->apps), pid);
- if (cc.find_from_serving(n) == cc.serving.end()) {
+ config_context *ctx = get_config_context(*(t_global_view->apps), pid);
+ if (ctx == nullptr) {
+ LOG_INFO("get_config_context return nullptr for gpid({})", pid);
+ return false;
+ }
+
+ if (ctx->find_from_serving(n) == ctx->serving.end()) {
LOG_INFO("meta server hasn't collected gpid({})'s info of {}",
pid, n);
return false;
}
diff --git a/src/meta/meta_backup_service.cpp b/src/meta/meta_backup_service.cpp
index 5dd49a6c0..10b641916 100644
--- a/src/meta/meta_backup_service.cpp
+++ b/src/meta/meta_backup_service.cpp
@@ -532,7 +532,7 @@ void policy_context::start_backup_partition_unlocked(gpid
pid)
pid, cold_backup_constant::PROGRESS_FINISHED,
dsn::host_port());
return;
}
- partition_primary = app->pcs[pid.get_partition_index()].hp_primary;
+ GET_HOST_PORT(app->pcs[pid.get_partition_index()], primary,
partition_primary);
}
if (!partition_primary) {
LOG_WARNING("{}: partition {} doesn't have a primary now, retry to
backup it later",
diff --git a/src/meta/meta_bulk_load_service.cpp
b/src/meta/meta_bulk_load_service.cpp
index e88f696f9..ba7a2556b 100644
--- a/src/meta/meta_bulk_load_service.cpp
+++ b/src/meta/meta_bulk_load_service.cpp
@@ -475,7 +475,8 @@ void
bulk_load_service::on_partition_bulk_load_reply(error_code err,
const std::string &app_name = request.app_name;
const gpid &pid = request.pid;
const auto &primary_addr = request.primary;
- const auto &primary_hp = request.hp_primary;
+ host_port primary_hp;
+ GET_HOST_PORT(request, primary, primary_hp);
if (err != ERR_OK) {
LOG_ERROR("app({}), partition({}) failed to receive bulk load response
from node({}), "
diff --git a/src/meta/server_state_restore.cpp
b/src/meta/server_state_restore.cpp
index 746a49b55..976aa7b5f 100644
--- a/src/meta/server_state_restore.cpp
+++ b/src/meta/server_state_restore.cpp
@@ -251,10 +251,18 @@ void
server_state::on_query_restore_status(configuration_query_restore_rpc rpc)
for (int32_t i = 0; i < app->partition_count; i++) {
const auto &r_state = app->helpers->restore_states[i];
const auto &pc = app->pcs[i];
- if (pc.hp_primary || !pc.hp_secondaries.empty()) {
+ host_port primary;
+ GET_HOST_PORT(pc, primary, primary);
+ if (primary) {
// already have primary, restore succeed
continue;
}
+
+ std::vector<host_port> secondaries;
+ GET_HOST_PORTS(pc, secondaries, secondaries);
+ if (!secondaries.empty()) {
+ continue;
+ }
if (r_state.progress < response.restore_progress[i]) {
response.restore_progress[i] = r_state.progress;
}
diff --git a/src/replica/bulk_load/replica_bulk_loader.cpp
b/src/replica/bulk_load/replica_bulk_loader.cpp
index 05d77e859..3ed17b74a 100644
--- a/src/replica/bulk_load/replica_bulk_loader.cpp
+++ b/src/replica/bulk_load/replica_bulk_loader.cpp
@@ -936,10 +936,12 @@ void
replica_bulk_loader::report_group_download_progress(/*out*/ bulk_load_respo
primary_state.__set_download_progress(_download_progress.load());
primary_state.__set_download_status(_download_status.load());
}
+ host_port primary;
+ GET_HOST_PORT(_replica->_primary_states.pc, primary, primary);
SET_VALUE_FROM_IP_AND_HOST_PORT(response,
group_bulk_load_state,
_replica->_primary_states.pc.primary,
- _replica->_primary_states.pc.hp_primary,
+ primary,
primary_state);
LOG_INFO_PREFIX("primary = {}, download progress = {}%, status = {}",
FMT_HOST_PORT_AND_IP(_replica->_primary_states.pc,
primary),
@@ -978,10 +980,12 @@ void
replica_bulk_loader::report_group_ingestion_status(/*out*/ bulk_load_respon
partition_bulk_load_state primary_state;
primary_state.__set_ingest_status(_replica->_app->get_ingestion_status());
+ host_port primary;
+ GET_HOST_PORT(_replica->_primary_states.pc, primary, primary);
SET_VALUE_FROM_IP_AND_HOST_PORT(response,
group_bulk_load_state,
_replica->_primary_states.pc.primary,
- _replica->_primary_states.pc.hp_primary,
+ primary,
primary_state);
LOG_INFO_PREFIX("primary = {}, ingestion status = {}",
FMT_HOST_PORT_AND_IP(_replica->_primary_states.pc,
primary),
@@ -1025,10 +1029,12 @@ void
replica_bulk_loader::report_group_cleaned_up(bulk_load_response &response)
partition_bulk_load_state primary_state;
primary_state.__set_is_cleaned_up(is_cleaned_up());
+ host_port primary;
+ GET_HOST_PORT(_replica->_primary_states.pc, primary, primary);
SET_VALUE_FROM_IP_AND_HOST_PORT(response,
group_bulk_load_state,
_replica->_primary_states.pc.primary,
- _replica->_primary_states.pc.hp_primary,
+ primary,
primary_state);
LOG_INFO_PREFIX("primary = {}, bulk load states cleaned_up = {}",
FMT_HOST_PORT_AND_IP(_replica->_primary_states.pc,
primary),
@@ -1064,10 +1070,12 @@ void
replica_bulk_loader::report_group_is_paused(bulk_load_response &response)
partition_bulk_load_state primary_state;
primary_state.__set_is_paused(_status == bulk_load_status::BLS_PAUSED);
+ host_port primary;
+ GET_HOST_PORT(_replica->_primary_states.pc, primary, primary);
SET_VALUE_FROM_IP_AND_HOST_PORT(response,
group_bulk_load_state,
_replica->_primary_states.pc.primary,
- _replica->_primary_states.pc.hp_primary,
+ primary,
primary_state);
LOG_INFO_PREFIX("primary = {}, bulk_load is_paused = {}",
FMT_HOST_PORT_AND_IP(_replica->_primary_states.pc,
primary),
diff --git a/src/replica/replica_context.cpp b/src/replica/replica_context.cpp
index c2b4a2fa6..a4016a5f6 100644
--- a/src/replica/replica_context.cpp
+++ b/src/replica/replica_context.cpp
@@ -134,10 +134,18 @@ void
primary_context::get_replica_config(partition_status::type st,
bool primary_context::check_exist(const ::dsn::host_port &node,
partition_status::type st)
{
switch (st) {
- case partition_status::PS_PRIMARY:
- return pc.hp_primary == node;
- case partition_status::PS_SECONDARY:
- return utils::contains(pc.hp_secondaries, node);
+ case partition_status::PS_PRIMARY: {
+ DCHECK(pc.__isset.hp_primary, "");
+ host_port primary;
+ GET_HOST_PORT(pc, primary, primary);
+ return primary == node;
+ }
+ case partition_status::PS_SECONDARY: {
+ DCHECK(pc.__isset.hp_secondaries, "");
+ std::vector<host_port> secondaries;
+ GET_HOST_PORTS(pc, secondaries, secondaries);
+ return utils::contains(secondaries, node);
+ }
case partition_status::PS_POTENTIAL_SECONDARY:
return learners.find(node) != learners.end();
default:
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index 0003d1e9e..5d64de07f 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -1700,7 +1700,9 @@ void
replica_stub::on_node_query_reply_scatter(replica_stub_ptr this_,
req.__isset.meta_split_status ?
req.meta_split_status
:
split_status::NOT_SPLIT);
} else {
- if (req.config.hp_primary == _primary_host_port) {
+ host_port primary;
+ GET_HOST_PORT(req.config, primary, primary);
+ if (primary == _primary_host_port) {
LOG_INFO("{}@{}: replica not exists on replica server, which is
primary, remove it "
"from meta server",
req.config.pid,
@@ -1751,7 +1753,9 @@ void replica_stub::remove_replica_on_meta_server(const
app_info &info,
SET_IP_AND_HOST_PORT(*request, node, primary_address(),
_primary_host_port);
request->type = config_type::CT_DOWNGRADE_TO_INACTIVE;
- if (_primary_host_port == pc.hp_primary) {
+ host_port primary;
+ GET_HOST_PORT(pc, primary, primary);
+ if (_primary_host_port == primary) {
RESET_IP_AND_HOST_PORT(request->config, primary);
} else if (REMOVE_IP_AND_HOST_PORT(
primary_address(), _primary_host_port, request->config,
secondaries)) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]