This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 09e3e44a4 refactor(FQDN): Add some convenient macros and update
idl/bulk_load.thrift related code (#1949)
09e3e44a4 is described below
commit 09e3e44a4a97754416cded3e67bd3846144498a5
Author: Yingchun Lai <[email protected]>
AuthorDate: Wed Apr 17 10:19:27 2024 +0800
refactor(FQDN): Add some convenient macros and update idl/bulk_load.thrift
related code (#1949)
- Add new macros SET_IP_AND_HOST_PORT, SET_VALUE_FROM_IP_AND_HOST_PORT,
SET_VALUE_FROM_HOST_PORT and FMT_HOST_PORT_AND_IP
- Add fill_partition_bulk_load_states() function to simplify code in
src/meta/test/meta_bulk_load_service_test.cpp
---
idl/bulk_load.thrift | 2 +-
src/meta/meta_bulk_load_service.cpp | 70 ++++++++++-----------
src/meta/test/meta_bulk_load_service_test.cpp | 63 +++++++++----------
src/replica/bulk_load/replica_bulk_loader.cpp | 73 ++++++++++------------
.../bulk_load/test/replica_bulk_loader_test.cpp | 3 +-
src/runtime/rpc/rpc_host_port.h | 34 ++++++++++
src/shell/commands/bulk_load.cpp | 3 +
7 files changed, 135 insertions(+), 113 deletions(-)
diff --git a/idl/bulk_load.thrift b/idl/bulk_load.thrift
index cb2e77c4b..92c9cb404 100644
--- a/idl/bulk_load.thrift
+++ b/idl/bulk_load.thrift
@@ -91,7 +91,7 @@ struct bulk_load_request
{
1:dsn.gpid pid;
2:string app_name;
- 3:dsn.rpc_address primary_addr;
+ 3:dsn.rpc_address primary;
4:string remote_provider_name;
5:string cluster_name;
6:i64 ballot;
diff --git a/src/meta/meta_bulk_load_service.cpp
b/src/meta/meta_bulk_load_service.cpp
index 26f51c5a0..8b68993b5 100644
--- a/src/meta/meta_bulk_load_service.cpp
+++ b/src/meta/meta_bulk_load_service.cpp
@@ -24,7 +24,9 @@
#include <algorithm>
#include <chrono>
#include <cstdint>
+#include <type_traits>
+#include "absl/strings/string_view.h"
#include "block_service/block_service.h"
#include "block_service/block_service_manager.h"
#include "common/replica_envs.h"
@@ -51,7 +53,6 @@
#include "utils/fail_point.h"
#include "utils/fmt_logging.h"
#include "utils/string_conv.h"
-#include "absl/strings/string_view.h"
DSN_DEFINE_uint32(meta_server,
bulk_load_max_rollback_times,
@@ -430,8 +431,7 @@ void bulk_load_service::partition_bulk_load(const
std::string &app_name, const g
const app_bulk_load_info &ainfo =
_app_bulk_load_info[pid.get_app_id()];
req->pid = pid;
req->app_name = app_name;
- req->primary_addr = pconfig.primary;
- req->__set_hp_primary(pconfig.hp_primary);
+ SET_IP_AND_HOST_PORT(*req, primary, pconfig.primary,
pconfig.hp_primary);
req->remote_provider_name = ainfo.file_provider_type;
req->cluster_name = ainfo.cluster_name;
req->meta_bulk_load_status =
get_partition_bulk_load_status_unlocked(pid);
@@ -440,10 +440,9 @@ void bulk_load_service::partition_bulk_load(const
std::string &app_name, const g
req->remote_root_path = ainfo.remote_root_path;
}
- LOG_INFO("send bulk load request to node({}({})), app({}), partition({}),
partition "
+ LOG_INFO("send bulk load request to node({}), app({}), partition({}),
partition "
"status = {}, remote provider = {}, cluster_name = {},
remote_root_path = {}",
- pconfig.hp_primary,
- pconfig.primary,
+ FMT_HOST_PORT_AND_IP(pconfig, primary),
app_name,
pid,
dsn::enum_to_string(req->meta_bulk_load_status),
@@ -458,9 +457,8 @@ void bulk_load_service::partition_bulk_load(const
std::string &app_name, const g
auto &bulk_load_resp = rpc.response();
if (!bulk_load_resp.__isset.hp_group_bulk_load_state) {
bulk_load_resp.__set_hp_group_bulk_load_state({});
- for (const auto &kv : bulk_load_resp.group_bulk_load_state) {
-
bulk_load_resp.hp_group_bulk_load_state[host_port::from_address(kv.first)] =
- kv.second;
+ for (const auto & [ addr, pbls ] :
bulk_load_resp.group_bulk_load_state) {
+
bulk_load_resp.hp_group_bulk_load_state[host_port::from_address(addr)] = pbls;
}
}
@@ -475,16 +473,15 @@ void
bulk_load_service::on_partition_bulk_load_reply(error_code err,
{
const std::string &app_name = request.app_name;
const gpid &pid = request.pid;
- const auto &primary_addr = request.primary_addr;
+ const auto &primary_addr = request.primary;
const auto &primary_hp = request.hp_primary;
if (err != ERR_OK) {
- LOG_ERROR("app({}), partition({}) failed to receive bulk load response
from node({}({})), "
+ LOG_ERROR("app({}), partition({}) failed to receive bulk load response
from node({}), "
"error = {}",
app_name,
pid,
- primary_hp,
- primary_addr,
+ FMT_HOST_PORT_AND_IP(request, primary),
err);
try_rollback_to_downloading(app_name, pid);
try_resend_bulk_load_request(app_name, pid);
@@ -493,11 +490,10 @@ void
bulk_load_service::on_partition_bulk_load_reply(error_code err,
if (response.err == ERR_OBJECT_NOT_FOUND || response.err ==
ERR_INVALID_STATE) {
LOG_ERROR(
- "app({}), partition({}) doesn't exist or has invalid state on
node({}({})), error = {}",
+ "app({}), partition({}) doesn't exist or has invalid state on
node({}), error = {}",
app_name,
pid,
- primary_hp,
- primary_addr,
+ FMT_HOST_PORT_AND_IP(request, primary),
response.err);
try_rollback_to_downloading(app_name, pid);
try_resend_bulk_load_request(app_name, pid);
@@ -506,10 +502,9 @@ void
bulk_load_service::on_partition_bulk_load_reply(error_code err,
if (response.err == ERR_BUSY) {
LOG_WARNING(
- "node({}({})) has enough replicas downloading, wait for next round
to send bulk load "
+ "node({}) has enough replicas downloading, wait for next round to
send bulk load "
"request for app({}), partition({})",
- primary_hp,
- primary_addr,
+ FMT_HOST_PORT_AND_IP(request, primary),
app_name,
pid);
try_resend_bulk_load_request(app_name, pid);
@@ -517,15 +512,13 @@ void
bulk_load_service::on_partition_bulk_load_reply(error_code err,
}
if (response.err != ERR_OK) {
- LOG_ERROR(
- "app({}), partition({}) from node({}({})) handle bulk load
response failed, error = "
- "{}, primary status = {}",
- app_name,
- pid,
- primary_hp,
- primary_addr,
- response.err,
- dsn::enum_to_string(response.primary_bulk_load_status));
+ LOG_ERROR("app({}), partition({}) from node({}) handle bulk load
response failed, error = "
+ "{}, primary status = {}",
+ app_name,
+ pid,
+ FMT_HOST_PORT_AND_IP(request, primary),
+ response.err,
+ dsn::enum_to_string(response.primary_bulk_load_status));
handle_bulk_load_failed(pid.get_app_id(), response.err);
try_resend_bulk_load_request(app_name, pid);
return;
@@ -544,7 +537,7 @@ void
bulk_load_service::on_partition_bulk_load_reply(error_code err,
LOG_WARNING(
"receive out-date response from node({}), app({}), partition({}),
request ballot = "
"{}, current ballot= {}",
- primary_addr,
+ FMT_HOST_PORT_AND_IP(request, primary),
app_name,
pid,
request.ballot,
@@ -1614,19 +1607,20 @@ void
bulk_load_service::on_query_bulk_load_status(query_bulk_load_rpc rpc)
}
}
+ // Fill bulk_load_states and hp_bulk_load_states fields.
response.bulk_load_states.resize(partition_count);
response.__set_hp_bulk_load_states(
std::vector<std::map<host_port,
partition_bulk_load_state>>(partition_count));
- for (const auto &kv : _partitions_bulk_load_state) {
- if (kv.first.get_app_id() == app_id) {
- auto pidx = kv.first.get_partition_index();
- response.hp_bulk_load_states[pidx] = kv.second;
-
- std::map<rpc_address, partition_bulk_load_state> addr_pbls;
- for (const auto &bls : kv.second) {
-
addr_pbls[dsn::dns_resolver::instance().resolve_address(bls.first)] =
bls.second;
+ for (const auto & [ pid, pbls_by_hps ] : _partitions_bulk_load_state) {
+ if (pid.get_app_id() == app_id) {
+ auto pidx = pid.get_partition_index();
+ response.hp_bulk_load_states[pidx] = pbls_by_hps;
+
+ std::map<rpc_address, partition_bulk_load_state> pbls_by_addrs;
+ for (const auto & [ hp, pbls ] : pbls_by_hps) {
+
pbls_by_addrs[dsn::dns_resolver::instance().resolve_address(hp)] = pbls;
}
- response.bulk_load_states[pidx] = addr_pbls;
+ response.bulk_load_states[pidx] = pbls_by_addrs;
}
}
diff --git a/src/meta/test/meta_bulk_load_service_test.cpp
b/src/meta/test/meta_bulk_load_service_test.cpp
index dedb28d73..2126f74c0 100644
--- a/src/meta/test/meta_bulk_load_service_test.cpp
+++ b/src/meta/test/meta_bulk_load_service_test.cpp
@@ -17,6 +17,7 @@
#include <boost/lexical_cast.hpp>
#include <string.h>
+#include <type_traits>
#include <algorithm>
#include <atomic>
#include <cstdint>
@@ -792,9 +793,8 @@ public:
_req.ballot = BALLOT;
_req.cluster_name = CLUSTER;
_req.pid = gpid(_app_id, _pidx);
- _req.primary_addr = PRIMARY;
+ SET_IP_AND_HOST_PORT(_req, primary, PRIMARY, PRIMARY_HP);
_req.meta_bulk_load_status = status;
- _req.__set_hp_primary(PRIMARY_HP);
}
void create_basic_response(error_code err, bulk_load_status::type status)
@@ -805,6 +805,19 @@ public:
_resp.primary_bulk_load_status = status;
}
+ void fill_partition_bulk_load_states(
+ const std::map<std::pair<::dsn::rpc_address, ::dsn::host_port>,
partition_bulk_load_state>
+ &state_by_hosts)
+ {
+ if (!_resp.__isset.hp_group_bulk_load_state) {
+ _resp.__set_hp_group_bulk_load_state({});
+ }
+ for (const auto & [ addr_and_hp, state ] : state_by_hosts) {
+ _resp.group_bulk_load_state[addr_and_hp.first] = state;
+ _resp.hp_group_bulk_load_state[addr_and_hp.second] = state;
+ }
+ }
+
void mock_response_progress(error_code progress_err, bool finish_download)
{
create_basic_response(ERR_OK, bulk_load_status::BLS_DOWNLOADING);
@@ -817,15 +830,11 @@ public:
state2.__set_download_status(progress_err);
state2.__set_download_progress(secondary2_progress);
- _resp.group_bulk_load_state[PRIMARY] = state;
- _resp.group_bulk_load_state[SECONDARY1] = state;
- _resp.group_bulk_load_state[SECONDARY2] = state2;
- _resp.__set_total_download_progress(total_progress);
+ fill_partition_bulk_load_states({{{PRIMARY, PRIMARY_HP}, state},
+ {{SECONDARY1, SECONDARY1_HP}, state},
+ {{SECONDARY2, SECONDARY2_HP},
state2}});
- _resp.__set_hp_group_bulk_load_state({});
- _resp.hp_group_bulk_load_state[PRIMARY_HP] = state;
- _resp.hp_group_bulk_load_state[SECONDARY1_HP] = state;
- _resp.hp_group_bulk_load_state[SECONDARY2_HP] = state2;
+ _resp.__set_total_download_progress(total_progress);
}
void mock_response_bulk_load_metadata()
@@ -853,15 +862,11 @@ public:
state.__set_ingest_status(ingestion_status::IS_SUCCEED);
state2.__set_ingest_status(secondary_istatus);
- _resp.group_bulk_load_state[PRIMARY] = state;
- _resp.group_bulk_load_state[SECONDARY1] = state;
- _resp.group_bulk_load_state[SECONDARY2] = state2;
- _resp.__set_is_group_ingestion_finished(secondary_istatus ==
ingestion_status::IS_SUCCEED);
+ fill_partition_bulk_load_states({{{PRIMARY, PRIMARY_HP}, state},
+ {{SECONDARY1, SECONDARY1_HP}, state},
+ {{SECONDARY2, SECONDARY2_HP},
state2}});
- _resp.__set_hp_group_bulk_load_state({});
- _resp.hp_group_bulk_load_state[PRIMARY_HP] = state;
- _resp.hp_group_bulk_load_state[SECONDARY1_HP] = state;
- _resp.hp_group_bulk_load_state[SECONDARY2_HP] = state2;
+ _resp.__set_is_group_ingestion_finished(secondary_istatus ==
ingestion_status::IS_SUCCEED);
set_app_ingesting_count(_app_id, ingestion_count);
}
@@ -872,16 +877,13 @@ public:
partition_bulk_load_state state, state2;
state.__set_is_cleaned_up(true);
- _resp.group_bulk_load_state[PRIMARY] = state;
- _resp.group_bulk_load_state[SECONDARY1] = state;
- _resp.__set_hp_group_bulk_load_state({});
- _resp.hp_group_bulk_load_state[PRIMARY_HP] = state;
- _resp.hp_group_bulk_load_state[SECONDARY1_HP] = state;
+ fill_partition_bulk_load_states(
+ {{{PRIMARY, PRIMARY_HP}, state}, {{SECONDARY1, SECONDARY1_HP},
state}});
state2.__set_is_cleaned_up(all_cleaned_up);
- _resp.group_bulk_load_state[SECONDARY2] = state2;
- _resp.hp_group_bulk_load_state[SECONDARY2_HP] = state2;
+ fill_partition_bulk_load_states({{{SECONDARY2, SECONDARY2_HP},
state2}});
+
_resp.__set_is_group_bulk_load_context_cleaned_up(all_cleaned_up);
}
@@ -893,14 +895,9 @@ public:
state.__set_is_paused(true);
state2.__set_is_paused(is_group_paused);
- _resp.group_bulk_load_state[PRIMARY] = state;
- _resp.group_bulk_load_state[SECONDARY1] = state;
- _resp.group_bulk_load_state[SECONDARY2] = state2;
-
- _resp.__set_hp_group_bulk_load_state({});
- _resp.hp_group_bulk_load_state[PRIMARY_HP] = state;
- _resp.hp_group_bulk_load_state[SECONDARY1_HP] = state;
- _resp.hp_group_bulk_load_state[SECONDARY2_HP] = state2;
+ fill_partition_bulk_load_states({{{PRIMARY, PRIMARY_HP}, state},
+ {{SECONDARY1, SECONDARY1_HP}, state},
+ {{SECONDARY2, SECONDARY2_HP},
state2}});
_resp.__set_is_group_bulk_load_paused(is_group_paused);
}
diff --git a/src/replica/bulk_load/replica_bulk_loader.cpp
b/src/replica/bulk_load/replica_bulk_loader.cpp
index 9ae6f0598..fb583ccb2 100644
--- a/src/replica/bulk_load/replica_bulk_loader.cpp
+++ b/src/replica/bulk_load/replica_bulk_loader.cpp
@@ -195,8 +195,7 @@ void replica_bulk_loader::broadcast_group_bulk_load(const
bulk_load_request &met
auto request = std::make_unique<group_bulk_load_request>();
request->app_name = _replica->_app_info.app_name;
const auto &addr = dsn::dns_resolver::instance().resolve_address(hp);
- request->target = addr;
- request->__set_hp_target(hp);
+ SET_IP_AND_HOST_PORT(*request, target, addr, hp);
_replica->_primary_states.get_replica_config(partition_status::PS_SECONDARY,
request->config);
request->cluster_name = meta_req.cluster_name;
@@ -286,31 +285,27 @@ void
replica_bulk_loader::on_group_bulk_load_reply(error_code err,
_replica->_primary_states.group_bulk_load_pending_replies.erase(req.hp_target);
if (err != ERR_OK) {
- LOG_ERROR_PREFIX("failed to receive group_bulk_load_reply from {}({}),
error = {}",
- req.hp_target,
- req.target,
+ LOG_ERROR_PREFIX("failed to receive group_bulk_load_reply from {},
error = {}",
+ FMT_HOST_PORT_AND_IP(req, target),
err);
_replica->_primary_states.reset_node_bulk_load_states(req.hp_target);
return;
}
if (resp.err != ERR_OK) {
- LOG_ERROR_PREFIX("receive group_bulk_load response from {}({}) failed,
error = {}",
- req.hp_target,
- req.target,
+ LOG_ERROR_PREFIX("receive group_bulk_load response from {} failed,
error = {}",
+ FMT_HOST_PORT_AND_IP(req, target),
resp.err);
_replica->_primary_states.reset_node_bulk_load_states(req.hp_target);
return;
}
if (req.config.ballot != get_ballot()) {
- LOG_ERROR_PREFIX(
- "recevied wrong group_bulk_load response from {}({}), request
ballot = {}, "
- "current ballot = {}",
- req.hp_target,
- req.target,
- req.config.ballot,
- get_ballot());
+ LOG_ERROR_PREFIX("received wrong group_bulk_load response from {},
request ballot = {}, "
+ "current ballot = {}",
+ FMT_HOST_PORT_AND_IP(req, target),
+ req.config.ballot,
+ get_ballot());
_replica->_primary_states.reset_node_bulk_load_states(req.hp_target);
return;
}
@@ -933,9 +928,11 @@ void
replica_bulk_loader::report_group_download_progress(/*out*/ bulk_load_respo
primary_state.__set_download_progress(_download_progress.load());
primary_state.__set_download_status(_download_status.load());
}
-
response.group_bulk_load_state[_replica->_primary_states.membership.primary] =
primary_state;
-
response.hp_group_bulk_load_state[_replica->_primary_states.membership.hp_primary]
=
- primary_state;
+ SET_VALUE_FROM_IP_AND_HOST_PORT(response,
+ group_bulk_load_state,
+
_replica->_primary_states.membership.primary,
+
_replica->_primary_states.membership.hp_primary,
+ primary_state);
LOG_INFO_PREFIX("primary = {}({}), download progress = {}%, status = {}",
_replica->_primary_states.membership.hp_primary,
_replica->_primary_states.membership.primary,
@@ -952,9 +949,7 @@ void
replica_bulk_loader::report_group_download_progress(/*out*/ bulk_load_respo
secondary_state.__isset.download_status ?
secondary_state.download_status : ERR_OK;
LOG_INFO_PREFIX(
"secondary = {}, download progress = {}%, status={}", target_hp,
s_progress, s_status);
-
response.group_bulk_load_state[dsn::dns_resolver::instance().resolve_address(target_hp)]
=
- secondary_state;
- response.hp_group_bulk_load_state[target_hp] = secondary_state;
+ SET_VALUE_FROM_HOST_PORT(response, group_bulk_load_state, target_hp,
secondary_state);
total_progress += s_progress;
}
@@ -976,9 +971,11 @@ void
replica_bulk_loader::report_group_ingestion_status(/*out*/ bulk_load_respon
partition_bulk_load_state primary_state;
primary_state.__set_ingest_status(_replica->_app->get_ingestion_status());
-
response.group_bulk_load_state[_replica->_primary_states.membership.primary] =
primary_state;
-
response.hp_group_bulk_load_state[_replica->_primary_states.membership.hp_primary]
=
- primary_state;
+ SET_VALUE_FROM_IP_AND_HOST_PORT(response,
+ group_bulk_load_state,
+
_replica->_primary_states.membership.primary,
+
_replica->_primary_states.membership.hp_primary,
+ primary_state);
LOG_INFO_PREFIX("primary = {}({}), ingestion status = {}",
_replica->_primary_states.membership.hp_primary,
_replica->_primary_states.membership.primary,
@@ -996,9 +993,7 @@ void
replica_bulk_loader::report_group_ingestion_status(/*out*/ bulk_load_respon
:
ingestion_status::IS_INVALID;
LOG_INFO_PREFIX(
"secondary = {}, ingestion status={}", target_hp,
enum_to_string(ingest_status));
-
response.group_bulk_load_state[dsn::dns_resolver::instance().resolve_address(target_hp)]
=
- secondary_state;
- response.hp_group_bulk_load_state[target_hp] = secondary_state;
+ SET_VALUE_FROM_HOST_PORT(response, group_bulk_load_state, target_hp,
secondary_state);
is_group_ingestion_finish &= (ingest_status ==
ingestion_status::IS_SUCCEED);
}
response.__set_is_group_ingestion_finished(is_group_ingestion_finish);
@@ -1024,9 +1019,11 @@ void
replica_bulk_loader::report_group_cleaned_up(bulk_load_response &response)
partition_bulk_load_state primary_state;
primary_state.__set_is_cleaned_up(is_cleaned_up());
-
response.group_bulk_load_state[_replica->_primary_states.membership.primary] =
primary_state;
-
response.hp_group_bulk_load_state[_replica->_primary_states.membership.hp_primary]
=
- primary_state;
+ SET_VALUE_FROM_IP_AND_HOST_PORT(response,
+ group_bulk_load_state,
+
_replica->_primary_states.membership.primary,
+
_replica->_primary_states.membership.hp_primary,
+ primary_state);
LOG_INFO_PREFIX("primary = {}, bulk load states cleaned_up = {}",
_replica->_primary_states.membership.primary,
primary_state.is_cleaned_up);
@@ -1041,9 +1038,7 @@ void
replica_bulk_loader::report_group_cleaned_up(bulk_load_response &response)
secondary_state.__isset.is_cleaned_up ?
secondary_state.is_cleaned_up : false;
LOG_INFO_PREFIX(
"secondary = {}, bulk load states cleaned_up = {}", target_hp,
is_cleaned_up);
-
response.group_bulk_load_state[dsn::dns_resolver::instance().resolve_address(target_hp)]
=
- secondary_state;
- response.hp_group_bulk_load_state[target_hp] = secondary_state;
+ SET_VALUE_FROM_HOST_PORT(response, group_bulk_load_state, target_hp,
secondary_state);
group_flag &= is_cleaned_up;
}
LOG_INFO_PREFIX("group bulk load states cleaned_up = {}", group_flag);
@@ -1063,9 +1058,11 @@ void
replica_bulk_loader::report_group_is_paused(bulk_load_response &response)
partition_bulk_load_state primary_state;
primary_state.__set_is_paused(_status == bulk_load_status::BLS_PAUSED);
-
response.group_bulk_load_state[_replica->_primary_states.membership.primary] =
primary_state;
-
response.hp_group_bulk_load_state[_replica->_primary_states.membership.hp_primary]
=
- primary_state;
+ SET_VALUE_FROM_IP_AND_HOST_PORT(response,
+ group_bulk_load_state,
+
_replica->_primary_states.membership.primary,
+
_replica->_primary_states.membership.hp_primary,
+ primary_state);
LOG_INFO_PREFIX("primary = {}({}), bulk_load is_paused = {}",
_replica->_primary_states.membership.hp_primary,
_replica->_primary_states.membership.primary,
@@ -1079,9 +1076,7 @@ void
replica_bulk_loader::report_group_is_paused(bulk_load_response &response)
_replica->_primary_states.secondary_bulk_load_states[target_hp];
bool is_paused = secondary_state.__isset.is_paused ?
secondary_state.is_paused : false;
LOG_INFO_PREFIX("secondary = {}, bulk_load is_paused = {}", target_hp,
is_paused);
-
response.group_bulk_load_state[dsn::dns_resolver::instance().resolve_address(target_hp)]
=
- secondary_state;
- response.hp_group_bulk_load_state[target_hp] = secondary_state;
+ SET_VALUE_FROM_HOST_PORT(response, group_bulk_load_state, target_hp,
secondary_state);
group_is_paused &= is_paused;
}
LOG_INFO_PREFIX("group bulk load is_paused = {}", group_is_paused);
diff --git a/src/replica/bulk_load/test/replica_bulk_loader_test.cpp
b/src/replica/bulk_load/test/replica_bulk_loader_test.cpp
index 1375549bf..95dbad2cb 100644
--- a/src/replica/bulk_load/test/replica_bulk_loader_test.cpp
+++ b/src/replica/bulk_load/test/replica_bulk_loader_test.cpp
@@ -220,8 +220,7 @@ public:
_group_req.meta_bulk_load_status = status;
_group_req.config.status = partition_status::PS_SECONDARY;
_group_req.config.ballot = b;
- _group_req.target = SECONDARY;
- _group_req.__set_hp_target(SECONDARY_HP);
+ SET_IP_AND_HOST_PORT(_group_req, target, PRIMARY, PRIMARY_HP);
}
void mock_replica_config(partition_status::type status)
diff --git a/src/runtime/rpc/rpc_host_port.h b/src/runtime/rpc/rpc_host_port.h
index cc1cd70e0..00ca8dd1c 100644
--- a/src/runtime/rpc/rpc_host_port.h
+++ b/src/runtime/rpc/rpc_host_port.h
@@ -44,6 +44,8 @@ class TProtocol;
} // namespace thrift
} // namespace apache
+// Get host_port from 'obj', the result is filled in 'target', the source is
from host_port type
+// field 'hp_<field>' if it is set, otherwise, reverse resolve from the
rpc_address '<field>'.
#define GET_HOST_PORT(obj, field, target)
\
do {
\
const auto &_obj = (obj);
\
@@ -54,6 +56,38 @@ class TProtocol;
}
\
} while (0)
+// Set 'addr' and 'hp' to the '<field>' and optional 'hp_<field>' of 'obj'.
The type of the
+// fields are rpc_address and host_port, respectively.
+#define SET_IP_AND_HOST_PORT(obj, field, addr, hp)
\
+ do {
\
+ auto &_obj = (obj);
\
+ _obj.field = addr;
\
+ _obj.__set_hp_##field(hp);
\
+ } while (0)
+
+// Set 'value' to the '<field>' map and optional 'hp_<field>' map of 'obj'.
The key of the
+// maps are rpc_address and host_port type and indexed by 'addr' and 'hp',
respectively.
+#define SET_VALUE_FROM_IP_AND_HOST_PORT(obj, field, addr, hp, value)
\
+ do {
\
+ auto &_obj = (obj);
\
+ _obj.field[addr] = value;
\
+ if (!_obj.__isset.hp_##field) {
\
+ _obj.__set_hp_##field({});
\
+ }
\
+ _obj.hp_##field[hp] = value;
\
+ } while (0)
+
+// Set 'value' to the '<field>' map and optional 'hp_<field>' map of 'obj'.
The key of the
+// maps are rpc_address and host_port type and indexed by 'addr' and reverse
resolve result of
+// 'addr', respectively.
+#define SET_VALUE_FROM_HOST_PORT(obj, field, hp, value)
\
+ do {
\
+ const auto addr = dsn::dns_resolver::instance().resolve_address(hp);
\
+ SET_VALUE_FROM_IP_AND_HOST_PORT(obj, field, addr, hp, value);
\
+ } while (0)
+
+#define FMT_HOST_PORT_AND_IP(obj, field) fmt::format("{}({})",
(obj).hp_##field, (obj).field)
+
namespace dsn {
class rpc_group_host_port;
diff --git a/src/shell/commands/bulk_load.cpp b/src/shell/commands/bulk_load.cpp
index 17153dcfb..e5b0265e3 100644
--- a/src/shell/commands/bulk_load.cpp
+++ b/src/shell/commands/bulk_load.cpp
@@ -331,6 +331,7 @@ bool query_bulk_load_status(command_executor *e,
shell_context *sc, arguments ar
if (print_download_progress) {
for (auto i = 0; i < partition_count; ++i) {
auto progress = 0;
+ // The 'bulk_load_states' must be set whatever the version of the
server is.
for (const auto &kv : resp.bulk_load_states[i]) {
progress += kv.second.download_progress;
}
@@ -357,6 +358,7 @@ bool query_bulk_load_status(command_executor *e,
shell_context *sc, arguments ar
}
for (auto i = 0; i < partition_count; ++i) {
+ // The 'bulk_load_states' must be set whatever the version of the
server is.
auto states = resp.bulk_load_states[i];
tp_all.add_row(i);
tp_all.append_data(get_short_status(resp.partitions_status[i]));
@@ -408,6 +410,7 @@ bool query_bulk_load_status(command_executor *e,
shell_context *sc, arguments ar
tp_single.add_column("is_paused");
}
+ // The 'bulk_load_states' must be set whatever the version of the
server is.
auto states = resp.bulk_load_states[pidx];
for (auto iter = states.begin(); iter != states.end(); ++iter) {
tp_single.add_row(pidx);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]