This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 09e3e44a4 refactor(FQDN): Add some convenient macros and update 
idl/bulk_load.thrift related code (#1949)
09e3e44a4 is described below

commit 09e3e44a4a97754416cded3e67bd3846144498a5
Author: Yingchun Lai <[email protected]>
AuthorDate: Wed Apr 17 10:19:27 2024 +0800

    refactor(FQDN): Add some convenient macros and update idl/bulk_load.thrift 
related code (#1949)
    
    - Add a new macros SET_IP_AND_HOST_PORT, SET_VALUE_FROM_IP_AND_HOST_PORT,
      SET_VALUE_FROM_HOST_PORT and FMT_HOST_PORT_AND_IP
    - Add fill_partition_bulk_load_states() function to simplify code in
      src/meta/test/meta_bulk_load_service_test.cpp
---
 idl/bulk_load.thrift                               |  2 +-
 src/meta/meta_bulk_load_service.cpp                | 70 ++++++++++-----------
 src/meta/test/meta_bulk_load_service_test.cpp      | 63 +++++++++----------
 src/replica/bulk_load/replica_bulk_loader.cpp      | 73 ++++++++++------------
 .../bulk_load/test/replica_bulk_loader_test.cpp    |  3 +-
 src/runtime/rpc/rpc_host_port.h                    | 34 ++++++++++
 src/shell/commands/bulk_load.cpp                   |  3 +
 7 files changed, 135 insertions(+), 113 deletions(-)

diff --git a/idl/bulk_load.thrift b/idl/bulk_load.thrift
index cb2e77c4b..92c9cb404 100644
--- a/idl/bulk_load.thrift
+++ b/idl/bulk_load.thrift
@@ -91,7 +91,7 @@ struct bulk_load_request
 {
     1:dsn.gpid                pid;
     2:string                  app_name;
-    3:dsn.rpc_address         primary_addr;
+    3:dsn.rpc_address         primary;
     4:string                  remote_provider_name;
     5:string                  cluster_name;
     6:i64                     ballot;
diff --git a/src/meta/meta_bulk_load_service.cpp 
b/src/meta/meta_bulk_load_service.cpp
index 26f51c5a0..8b68993b5 100644
--- a/src/meta/meta_bulk_load_service.cpp
+++ b/src/meta/meta_bulk_load_service.cpp
@@ -24,7 +24,9 @@
 #include <algorithm>
 #include <chrono>
 #include <cstdint>
+#include <type_traits>
 
+#include "absl/strings/string_view.h"
 #include "block_service/block_service.h"
 #include "block_service/block_service_manager.h"
 #include "common/replica_envs.h"
@@ -51,7 +53,6 @@
 #include "utils/fail_point.h"
 #include "utils/fmt_logging.h"
 #include "utils/string_conv.h"
-#include "absl/strings/string_view.h"
 
 DSN_DEFINE_uint32(meta_server,
                   bulk_load_max_rollback_times,
@@ -430,8 +431,7 @@ void bulk_load_service::partition_bulk_load(const 
std::string &app_name, const g
         const app_bulk_load_info &ainfo = 
_app_bulk_load_info[pid.get_app_id()];
         req->pid = pid;
         req->app_name = app_name;
-        req->primary_addr = pconfig.primary;
-        req->__set_hp_primary(pconfig.hp_primary);
+        SET_IP_AND_HOST_PORT(*req, primary, pconfig.primary, 
pconfig.hp_primary);
         req->remote_provider_name = ainfo.file_provider_type;
         req->cluster_name = ainfo.cluster_name;
         req->meta_bulk_load_status = 
get_partition_bulk_load_status_unlocked(pid);
@@ -440,10 +440,9 @@ void bulk_load_service::partition_bulk_load(const 
std::string &app_name, const g
         req->remote_root_path = ainfo.remote_root_path;
     }
 
-    LOG_INFO("send bulk load request to node({}({})), app({}), partition({}), 
partition "
+    LOG_INFO("send bulk load request to node({}), app({}), partition({}), 
partition "
              "status = {}, remote provider = {}, cluster_name = {}, 
remote_root_path = {}",
-             pconfig.hp_primary,
-             pconfig.primary,
+             FMT_HOST_PORT_AND_IP(pconfig, primary),
              app_name,
              pid,
              dsn::enum_to_string(req->meta_bulk_load_status),
@@ -458,9 +457,8 @@ void bulk_load_service::partition_bulk_load(const 
std::string &app_name, const g
         auto &bulk_load_resp = rpc.response();
         if (!bulk_load_resp.__isset.hp_group_bulk_load_state) {
             bulk_load_resp.__set_hp_group_bulk_load_state({});
-            for (const auto &kv : bulk_load_resp.group_bulk_load_state) {
-                
bulk_load_resp.hp_group_bulk_load_state[host_port::from_address(kv.first)] =
-                    kv.second;
+            for (const auto & [ addr, pbls ] : 
bulk_load_resp.group_bulk_load_state) {
+                
bulk_load_resp.hp_group_bulk_load_state[host_port::from_address(addr)] = pbls;
             }
         }
 
@@ -475,16 +473,15 @@ void 
bulk_load_service::on_partition_bulk_load_reply(error_code err,
 {
     const std::string &app_name = request.app_name;
     const gpid &pid = request.pid;
-    const auto &primary_addr = request.primary_addr;
+    const auto &primary_addr = request.primary;
     const auto &primary_hp = request.hp_primary;
 
     if (err != ERR_OK) {
-        LOG_ERROR("app({}), partition({}) failed to receive bulk load response 
from node({}({})), "
+        LOG_ERROR("app({}), partition({}) failed to receive bulk load response 
from node({}), "
                   "error = {}",
                   app_name,
                   pid,
-                  primary_hp,
-                  primary_addr,
+                  FMT_HOST_PORT_AND_IP(request, primary),
                   err);
         try_rollback_to_downloading(app_name, pid);
         try_resend_bulk_load_request(app_name, pid);
@@ -493,11 +490,10 @@ void 
bulk_load_service::on_partition_bulk_load_reply(error_code err,
 
     if (response.err == ERR_OBJECT_NOT_FOUND || response.err == 
ERR_INVALID_STATE) {
         LOG_ERROR(
-            "app({}), partition({}) doesn't exist or has invalid state on 
node({}({})), error = {}",
+            "app({}), partition({}) doesn't exist or has invalid state on 
node({}), error = {}",
             app_name,
             pid,
-            primary_hp,
-            primary_addr,
+            FMT_HOST_PORT_AND_IP(request, primary),
             response.err);
         try_rollback_to_downloading(app_name, pid);
         try_resend_bulk_load_request(app_name, pid);
@@ -506,10 +502,9 @@ void 
bulk_load_service::on_partition_bulk_load_reply(error_code err,
 
     if (response.err == ERR_BUSY) {
         LOG_WARNING(
-            "node({}({})) has enough replicas downloading, wait for next round 
to send bulk load "
+            "node({}) has enough replicas downloading, wait for next round to 
send bulk load "
             "request for app({}), partition({})",
-            primary_hp,
-            primary_addr,
+            FMT_HOST_PORT_AND_IP(request, primary),
             app_name,
             pid);
         try_resend_bulk_load_request(app_name, pid);
@@ -517,15 +512,13 @@ void 
bulk_load_service::on_partition_bulk_load_reply(error_code err,
     }
 
     if (response.err != ERR_OK) {
-        LOG_ERROR(
-            "app({}), partition({}) from node({}({})) handle bulk load 
response failed, error = "
-            "{}, primary status = {}",
-            app_name,
-            pid,
-            primary_hp,
-            primary_addr,
-            response.err,
-            dsn::enum_to_string(response.primary_bulk_load_status));
+        LOG_ERROR("app({}), partition({}) from node({}) handle bulk load 
response failed, error = "
+                  "{}, primary status = {}",
+                  app_name,
+                  pid,
+                  FMT_HOST_PORT_AND_IP(request, primary),
+                  response.err,
+                  dsn::enum_to_string(response.primary_bulk_load_status));
         handle_bulk_load_failed(pid.get_app_id(), response.err);
         try_resend_bulk_load_request(app_name, pid);
         return;
@@ -544,7 +537,7 @@ void 
bulk_load_service::on_partition_bulk_load_reply(error_code err,
         LOG_WARNING(
             "receive out-date response from node({}), app({}), partition({}), 
request ballot = "
             "{}, current ballot= {}",
-            primary_addr,
+            FMT_HOST_PORT_AND_IP(request, primary),
             app_name,
             pid,
             request.ballot,
@@ -1614,19 +1607,20 @@ void 
bulk_load_service::on_query_bulk_load_status(query_bulk_load_rpc rpc)
         }
     }
 
+    // Fill bulk_load_states and hp_bulk_load_states fileds.
     response.bulk_load_states.resize(partition_count);
     response.__set_hp_bulk_load_states(
         std::vector<std::map<host_port, 
partition_bulk_load_state>>(partition_count));
-    for (const auto &kv : _partitions_bulk_load_state) {
-        if (kv.first.get_app_id() == app_id) {
-            auto pidx = kv.first.get_partition_index();
-            response.hp_bulk_load_states[pidx] = kv.second;
-
-            std::map<rpc_address, partition_bulk_load_state> addr_pbls;
-            for (const auto &bls : kv.second) {
-                
addr_pbls[dsn::dns_resolver::instance().resolve_address(bls.first)] = 
bls.second;
+    for (const auto & [ pid, pbls_by_hps ] : _partitions_bulk_load_state) {
+        if (pid.get_app_id() == app_id) {
+            auto pidx = pid.get_partition_index();
+            response.hp_bulk_load_states[pidx] = pbls_by_hps;
+
+            std::map<rpc_address, partition_bulk_load_state> pbls_by_addrs;
+            for (const auto & [ hp, pbls ] : pbls_by_hps) {
+                
pbls_by_addrs[dsn::dns_resolver::instance().resolve_address(hp)] = pbls;
             }
-            response.bulk_load_states[pidx] = addr_pbls;
+            response.bulk_load_states[pidx] = pbls_by_addrs;
         }
     }
 
diff --git a/src/meta/test/meta_bulk_load_service_test.cpp 
b/src/meta/test/meta_bulk_load_service_test.cpp
index dedb28d73..2126f74c0 100644
--- a/src/meta/test/meta_bulk_load_service_test.cpp
+++ b/src/meta/test/meta_bulk_load_service_test.cpp
@@ -17,6 +17,7 @@
 
 #include <boost/lexical_cast.hpp>
 #include <string.h>
+#include <type_traits>
 #include <algorithm>
 #include <atomic>
 #include <cstdint>
@@ -792,9 +793,8 @@ public:
         _req.ballot = BALLOT;
         _req.cluster_name = CLUSTER;
         _req.pid = gpid(_app_id, _pidx);
-        _req.primary_addr = PRIMARY;
+        SET_IP_AND_HOST_PORT(_req, primary, PRIMARY, PRIMARY_HP);
         _req.meta_bulk_load_status = status;
-        _req.__set_hp_primary(PRIMARY_HP);
     }
 
     void create_basic_response(error_code err, bulk_load_status::type status)
@@ -805,6 +805,19 @@ public:
         _resp.primary_bulk_load_status = status;
     }
 
+    void fill_partition_bulk_load_states(
+        const std::map<std::pair<::dsn::rpc_address, ::dsn::host_port>, 
partition_bulk_load_state>
+            &state_by_hosts)
+    {
+        if (!_resp.__isset.hp_group_bulk_load_state) {
+            _resp.__set_hp_group_bulk_load_state({});
+        }
+        for (const auto & [ addr_and_hp, state ] : state_by_hosts) {
+            _resp.group_bulk_load_state[addr_and_hp.first] = state;
+            _resp.hp_group_bulk_load_state[addr_and_hp.second] = state;
+        }
+    }
+
     void mock_response_progress(error_code progress_err, bool finish_download)
     {
         create_basic_response(ERR_OK, bulk_load_status::BLS_DOWNLOADING);
@@ -817,15 +830,11 @@ public:
         state2.__set_download_status(progress_err);
         state2.__set_download_progress(secondary2_progress);
 
-        _resp.group_bulk_load_state[PRIMARY] = state;
-        _resp.group_bulk_load_state[SECONDARY1] = state;
-        _resp.group_bulk_load_state[SECONDARY2] = state2;
-        _resp.__set_total_download_progress(total_progress);
+        fill_partition_bulk_load_states({{{PRIMARY, PRIMARY_HP}, state},
+                                         {{SECONDARY1, SECONDARY1_HP}, state},
+                                         {{SECONDARY2, SECONDARY2_HP}, 
state2}});
 
-        _resp.__set_hp_group_bulk_load_state({});
-        _resp.hp_group_bulk_load_state[PRIMARY_HP] = state;
-        _resp.hp_group_bulk_load_state[SECONDARY1_HP] = state;
-        _resp.hp_group_bulk_load_state[SECONDARY2_HP] = state2;
+        _resp.__set_total_download_progress(total_progress);
     }
 
     void mock_response_bulk_load_metadata()
@@ -853,15 +862,11 @@ public:
         state.__set_ingest_status(ingestion_status::IS_SUCCEED);
         state2.__set_ingest_status(secondary_istatus);
 
-        _resp.group_bulk_load_state[PRIMARY] = state;
-        _resp.group_bulk_load_state[SECONDARY1] = state;
-        _resp.group_bulk_load_state[SECONDARY2] = state2;
-        _resp.__set_is_group_ingestion_finished(secondary_istatus == 
ingestion_status::IS_SUCCEED);
+        fill_partition_bulk_load_states({{{PRIMARY, PRIMARY_HP}, state},
+                                         {{SECONDARY1, SECONDARY1_HP}, state},
+                                         {{SECONDARY2, SECONDARY2_HP}, 
state2}});
 
-        _resp.__set_hp_group_bulk_load_state({});
-        _resp.hp_group_bulk_load_state[PRIMARY_HP] = state;
-        _resp.hp_group_bulk_load_state[SECONDARY1_HP] = state;
-        _resp.hp_group_bulk_load_state[SECONDARY2_HP] = state2;
+        _resp.__set_is_group_ingestion_finished(secondary_istatus == 
ingestion_status::IS_SUCCEED);
 
         set_app_ingesting_count(_app_id, ingestion_count);
     }
@@ -872,16 +877,13 @@ public:
 
         partition_bulk_load_state state, state2;
         state.__set_is_cleaned_up(true);
-        _resp.group_bulk_load_state[PRIMARY] = state;
-        _resp.group_bulk_load_state[SECONDARY1] = state;
 
-        _resp.__set_hp_group_bulk_load_state({});
-        _resp.hp_group_bulk_load_state[PRIMARY_HP] = state;
-        _resp.hp_group_bulk_load_state[SECONDARY1_HP] = state;
+        fill_partition_bulk_load_states(
+            {{{PRIMARY, PRIMARY_HP}, state}, {{SECONDARY1, SECONDARY1_HP}, 
state}});
 
         state2.__set_is_cleaned_up(all_cleaned_up);
-        _resp.group_bulk_load_state[SECONDARY2] = state2;
-        _resp.hp_group_bulk_load_state[SECONDARY2_HP] = state2;
+        fill_partition_bulk_load_states({{{SECONDARY2, SECONDARY2_HP}, 
state2}});
+
         _resp.__set_is_group_bulk_load_context_cleaned_up(all_cleaned_up);
     }
 
@@ -893,14 +895,9 @@ public:
         state.__set_is_paused(true);
         state2.__set_is_paused(is_group_paused);
 
-        _resp.group_bulk_load_state[PRIMARY] = state;
-        _resp.group_bulk_load_state[SECONDARY1] = state;
-        _resp.group_bulk_load_state[SECONDARY2] = state2;
-
-        _resp.__set_hp_group_bulk_load_state({});
-        _resp.hp_group_bulk_load_state[PRIMARY_HP] = state;
-        _resp.hp_group_bulk_load_state[SECONDARY1_HP] = state;
-        _resp.hp_group_bulk_load_state[SECONDARY2_HP] = state2;
+        fill_partition_bulk_load_states({{{PRIMARY, PRIMARY_HP}, state},
+                                         {{SECONDARY1, SECONDARY1_HP}, state},
+                                         {{SECONDARY2, SECONDARY2_HP}, 
state2}});
 
         _resp.__set_is_group_bulk_load_paused(is_group_paused);
     }
diff --git a/src/replica/bulk_load/replica_bulk_loader.cpp 
b/src/replica/bulk_load/replica_bulk_loader.cpp
index 9ae6f0598..fb583ccb2 100644
--- a/src/replica/bulk_load/replica_bulk_loader.cpp
+++ b/src/replica/bulk_load/replica_bulk_loader.cpp
@@ -195,8 +195,7 @@ void replica_bulk_loader::broadcast_group_bulk_load(const 
bulk_load_request &met
         auto request = std::make_unique<group_bulk_load_request>();
         request->app_name = _replica->_app_info.app_name;
         const auto &addr = dsn::dns_resolver::instance().resolve_address(hp);
-        request->target = addr;
-        request->__set_hp_target(hp);
+        SET_IP_AND_HOST_PORT(*request, target, addr, hp);
         
_replica->_primary_states.get_replica_config(partition_status::PS_SECONDARY,
                                                      request->config);
         request->cluster_name = meta_req.cluster_name;
@@ -286,31 +285,27 @@ void 
replica_bulk_loader::on_group_bulk_load_reply(error_code err,
     
_replica->_primary_states.group_bulk_load_pending_replies.erase(req.hp_target);
 
     if (err != ERR_OK) {
-        LOG_ERROR_PREFIX("failed to receive group_bulk_load_reply from {}({}), 
error = {}",
-                         req.hp_target,
-                         req.target,
+        LOG_ERROR_PREFIX("failed to receive group_bulk_load_reply from {}, 
error = {}",
+                         FMT_HOST_PORT_AND_IP(req, target),
                          err);
         _replica->_primary_states.reset_node_bulk_load_states(req.hp_target);
         return;
     }
 
     if (resp.err != ERR_OK) {
-        LOG_ERROR_PREFIX("receive group_bulk_load response from {}({}) failed, 
error = {}",
-                         req.hp_target,
-                         req.target,
+        LOG_ERROR_PREFIX("receive group_bulk_load response from {} failed, 
error = {}",
+                         FMT_HOST_PORT_AND_IP(req, target),
                          resp.err);
         _replica->_primary_states.reset_node_bulk_load_states(req.hp_target);
         return;
     }
 
     if (req.config.ballot != get_ballot()) {
-        LOG_ERROR_PREFIX(
-            "recevied wrong group_bulk_load response from {}({}), request 
ballot = {}, "
-            "current ballot = {}",
-            req.hp_target,
-            req.target,
-            req.config.ballot,
-            get_ballot());
+        LOG_ERROR_PREFIX("recevied wrong group_bulk_load response from {}, 
request ballot = {}, "
+                         "current ballot = {}",
+                         FMT_HOST_PORT_AND_IP(req, target),
+                         req.config.ballot,
+                         get_ballot());
         _replica->_primary_states.reset_node_bulk_load_states(req.hp_target);
         return;
     }
@@ -933,9 +928,11 @@ void 
replica_bulk_loader::report_group_download_progress(/*out*/ bulk_load_respo
         primary_state.__set_download_progress(_download_progress.load());
         primary_state.__set_download_status(_download_status.load());
     }
-    
response.group_bulk_load_state[_replica->_primary_states.membership.primary] = 
primary_state;
-    
response.hp_group_bulk_load_state[_replica->_primary_states.membership.hp_primary]
 =
-        primary_state;
+    SET_VALUE_FROM_IP_AND_HOST_PORT(response,
+                                    group_bulk_load_state,
+                                    
_replica->_primary_states.membership.primary,
+                                    
_replica->_primary_states.membership.hp_primary,
+                                    primary_state);
     LOG_INFO_PREFIX("primary = {}({}), download progress = {}%, status = {}",
                     _replica->_primary_states.membership.hp_primary,
                     _replica->_primary_states.membership.primary,
@@ -952,9 +949,7 @@ void 
replica_bulk_loader::report_group_download_progress(/*out*/ bulk_load_respo
             secondary_state.__isset.download_status ? 
secondary_state.download_status : ERR_OK;
         LOG_INFO_PREFIX(
             "secondary = {}, download progress = {}%, status={}", target_hp, 
s_progress, s_status);
-        
response.group_bulk_load_state[dsn::dns_resolver::instance().resolve_address(target_hp)]
 =
-            secondary_state;
-        response.hp_group_bulk_load_state[target_hp] = secondary_state;
+        SET_VALUE_FROM_HOST_PORT(response, group_bulk_load_state, target_hp, 
secondary_state);
         total_progress += s_progress;
     }
 
@@ -976,9 +971,11 @@ void 
replica_bulk_loader::report_group_ingestion_status(/*out*/ bulk_load_respon
 
     partition_bulk_load_state primary_state;
     primary_state.__set_ingest_status(_replica->_app->get_ingestion_status());
-    
response.group_bulk_load_state[_replica->_primary_states.membership.primary] = 
primary_state;
-    
response.hp_group_bulk_load_state[_replica->_primary_states.membership.hp_primary]
 =
-        primary_state;
+    SET_VALUE_FROM_IP_AND_HOST_PORT(response,
+                                    group_bulk_load_state,
+                                    
_replica->_primary_states.membership.primary,
+                                    
_replica->_primary_states.membership.hp_primary,
+                                    primary_state);
     LOG_INFO_PREFIX("primary = {}({}), ingestion status = {}",
                     _replica->_primary_states.membership.hp_primary,
                     _replica->_primary_states.membership.primary,
@@ -996,9 +993,7 @@ void 
replica_bulk_loader::report_group_ingestion_status(/*out*/ bulk_load_respon
                                                    : 
ingestion_status::IS_INVALID;
         LOG_INFO_PREFIX(
             "secondary = {}, ingestion status={}", target_hp, 
enum_to_string(ingest_status));
-        
response.group_bulk_load_state[dsn::dns_resolver::instance().resolve_address(target_hp)]
 =
-            secondary_state;
-        response.hp_group_bulk_load_state[target_hp] = secondary_state;
+        SET_VALUE_FROM_HOST_PORT(response, group_bulk_load_state, target_hp, 
secondary_state);
         is_group_ingestion_finish &= (ingest_status == 
ingestion_status::IS_SUCCEED);
     }
     response.__set_is_group_ingestion_finished(is_group_ingestion_finish);
@@ -1024,9 +1019,11 @@ void 
replica_bulk_loader::report_group_cleaned_up(bulk_load_response &response)
 
     partition_bulk_load_state primary_state;
     primary_state.__set_is_cleaned_up(is_cleaned_up());
-    
response.group_bulk_load_state[_replica->_primary_states.membership.primary] = 
primary_state;
-    
response.hp_group_bulk_load_state[_replica->_primary_states.membership.hp_primary]
 =
-        primary_state;
+    SET_VALUE_FROM_IP_AND_HOST_PORT(response,
+                                    group_bulk_load_state,
+                                    
_replica->_primary_states.membership.primary,
+                                    
_replica->_primary_states.membership.hp_primary,
+                                    primary_state);
     LOG_INFO_PREFIX("primary = {}, bulk load states cleaned_up = {}",
                     _replica->_primary_states.membership.primary,
                     primary_state.is_cleaned_up);
@@ -1041,9 +1038,7 @@ void 
replica_bulk_loader::report_group_cleaned_up(bulk_load_response &response)
             secondary_state.__isset.is_cleaned_up ? 
secondary_state.is_cleaned_up : false;
         LOG_INFO_PREFIX(
             "secondary = {}, bulk load states cleaned_up = {}", target_hp, 
is_cleaned_up);
-        
response.group_bulk_load_state[dsn::dns_resolver::instance().resolve_address(target_hp)]
 =
-            secondary_state;
-        response.hp_group_bulk_load_state[target_hp] = secondary_state;
+        SET_VALUE_FROM_HOST_PORT(response, group_bulk_load_state, target_hp, 
secondary_state);
         group_flag &= is_cleaned_up;
     }
     LOG_INFO_PREFIX("group bulk load states cleaned_up = {}", group_flag);
@@ -1063,9 +1058,11 @@ void 
replica_bulk_loader::report_group_is_paused(bulk_load_response &response)
 
     partition_bulk_load_state primary_state;
     primary_state.__set_is_paused(_status == bulk_load_status::BLS_PAUSED);
-    
response.group_bulk_load_state[_replica->_primary_states.membership.primary] = 
primary_state;
-    
response.hp_group_bulk_load_state[_replica->_primary_states.membership.hp_primary]
 =
-        primary_state;
+    SET_VALUE_FROM_IP_AND_HOST_PORT(response,
+                                    group_bulk_load_state,
+                                    
_replica->_primary_states.membership.primary,
+                                    
_replica->_primary_states.membership.hp_primary,
+                                    primary_state);
     LOG_INFO_PREFIX("primary = {}({}), bulk_load is_paused = {}",
                     _replica->_primary_states.membership.hp_primary,
                     _replica->_primary_states.membership.primary,
@@ -1079,9 +1076,7 @@ void 
replica_bulk_loader::report_group_is_paused(bulk_load_response &response)
             _replica->_primary_states.secondary_bulk_load_states[target_hp];
         bool is_paused = secondary_state.__isset.is_paused ? 
secondary_state.is_paused : false;
         LOG_INFO_PREFIX("secondary = {}, bulk_load is_paused = {}", target_hp, 
is_paused);
-        
response.group_bulk_load_state[dsn::dns_resolver::instance().resolve_address(target_hp)]
 =
-            secondary_state;
-        response.hp_group_bulk_load_state[target_hp] = secondary_state;
+        SET_VALUE_FROM_HOST_PORT(response, group_bulk_load_state, target_hp, 
secondary_state);
         group_is_paused &= is_paused;
     }
     LOG_INFO_PREFIX("group bulk load is_paused = {}", group_is_paused);
diff --git a/src/replica/bulk_load/test/replica_bulk_loader_test.cpp 
b/src/replica/bulk_load/test/replica_bulk_loader_test.cpp
index 1375549bf..95dbad2cb 100644
--- a/src/replica/bulk_load/test/replica_bulk_loader_test.cpp
+++ b/src/replica/bulk_load/test/replica_bulk_loader_test.cpp
@@ -220,8 +220,7 @@ public:
         _group_req.meta_bulk_load_status = status;
         _group_req.config.status = partition_status::PS_SECONDARY;
         _group_req.config.ballot = b;
-        _group_req.target = SECONDARY;
-        _group_req.__set_hp_target(SECONDARY_HP);
+        SET_IP_AND_HOST_PORT(_group_req, target, PRIMARY, PRIMARY_HP);
     }
 
     void mock_replica_config(partition_status::type status)
diff --git a/src/runtime/rpc/rpc_host_port.h b/src/runtime/rpc/rpc_host_port.h
index cc1cd70e0..00ca8dd1c 100644
--- a/src/runtime/rpc/rpc_host_port.h
+++ b/src/runtime/rpc/rpc_host_port.h
@@ -44,6 +44,8 @@ class TProtocol;
 } // namespace thrift
 } // namespace apache
 
+// Get host_port from 'obj', the result is filled in 'target', the source is 
from host_port type
+// field 'hp_<field>' if it is set, otherwise, reverse resolve from the 
rpc_address '<field>'.
 #define GET_HOST_PORT(obj, field, target)                                      
                    \
     do {                                                                       
                    \
         const auto &_obj = (obj);                                              
                    \
@@ -54,6 +56,38 @@ class TProtocol;
         }                                                                      
                    \
     } while (0)
 
+// Set 'addr' and 'hp' to the '<field>' and optional 'hp_<field>' of 'obj'. 
The type of the
+// fields are rpc_address and host_port, respectively.
+#define SET_IP_AND_HOST_PORT(obj, field, addr, hp)                             
                    \
+    do {                                                                       
                    \
+        auto &_obj = (obj);                                                    
                    \
+        _obj.field = addr;                                                     
                    \
+        _obj.__set_hp_##field(hp);                                             
                    \
+    } while (0)
+
+// Set 'value' to the '<field>' map and optional 'hp_<field>' map of 'obj'. 
The key of the
+// maps are rpc_address and host_port type and indexed by 'addr' and 'hp', 
respectively.
+#define SET_VALUE_FROM_IP_AND_HOST_PORT(obj, field, addr, hp, value)           
                    \
+    do {                                                                       
                    \
+        auto &_obj = (obj);                                                    
                    \
+        _obj.field[addr] = value;                                              
                    \
+        if (!_obj.__isset.hp_##field) {                                        
                    \
+            _obj.__set_hp_##field({});                                         
                    \
+        }                                                                      
                    \
+        _obj.hp_##field[hp] = value;                                           
                    \
+    } while (0)
+
+// Set 'value' to the '<field>' map and optional 'hp_<field>' map of 'obj'. 
The key of the
+// maps are rpc_address and host_port type and indexed by 'addr' and reverse 
resolve result of
+// 'addr', respectively.
+#define SET_VALUE_FROM_HOST_PORT(obj, field, hp, value)                        
                    \
+    do {                                                                       
                    \
+        const auto addr = dsn::dns_resolver::instance().resolve_address(hp);   
                    \
+        SET_VALUE_FROM_IP_AND_HOST_PORT(obj, field, addr, hp, value);          
                    \
+    } while (0)
+
+#define FMT_HOST_PORT_AND_IP(obj, field) fmt::format("{}({})", 
(obj).hp_##field, (obj).field)
+
 namespace dsn {
 
 class rpc_group_host_port;
diff --git a/src/shell/commands/bulk_load.cpp b/src/shell/commands/bulk_load.cpp
index 17153dcfb..e5b0265e3 100644
--- a/src/shell/commands/bulk_load.cpp
+++ b/src/shell/commands/bulk_load.cpp
@@ -331,6 +331,7 @@ bool query_bulk_load_status(command_executor *e, 
shell_context *sc, arguments ar
     if (print_download_progress) {
         for (auto i = 0; i < partition_count; ++i) {
             auto progress = 0;
+            // The 'bulk_load_states' must be set whatever the version of the 
server is.
             for (const auto &kv : resp.bulk_load_states[i]) {
                 progress += kv.second.download_progress;
             }
@@ -357,6 +358,7 @@ bool query_bulk_load_status(command_executor *e, 
shell_context *sc, arguments ar
         }
 
         for (auto i = 0; i < partition_count; ++i) {
+            // The 'bulk_load_states' must be set whatever the version of the 
server is.
             auto states = resp.bulk_load_states[i];
             tp_all.add_row(i);
             tp_all.append_data(get_short_status(resp.partitions_status[i]));
@@ -408,6 +410,7 @@ bool query_bulk_load_status(command_executor *e, 
shell_context *sc, arguments ar
                 tp_single.add_column("is_paused");
             }
 
+            // The 'bulk_load_states' must be set whatever the version of the 
server is.
             auto states = resp.bulk_load_states[pidx];
             for (auto iter = states.begin(); iter != states.end(); ++iter) {
                 tp_single.add_row(pidx);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to