This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-brpc.git
The following commit(s) were added to refs/heads/master by this push:
new c945795c fix rpc_press can't send request equably (#1763)
c945795c is described below
commit c945795ce5becc75ccb6b612069b145a60d298f0
Author: bumingchun <[email protected]>
AuthorDate: Mon Jun 20 13:49:26 2022 +0800
fix rpc_press can't send request equably (#1763)
* fix rpc_press can't send request equably
* set the rate limit to 1000000 per thread and calculate the interval in nanoseconds
* change sleep function to usleep
* Update rpc_press_impl.cpp
add the max tolerant delay between end_time and expected_time
Co-authored-by: bumingchun <[email protected]>
---
tools/rpc_press/rpc_press.cpp | 8 ++++++++
tools/rpc_press/rpc_press_impl.cpp | 34 ++++++++++++----------------------
2 files changed, 20 insertions(+), 22 deletions(-)
diff --git a/tools/rpc_press/rpc_press.cpp b/tools/rpc_press/rpc_press.cpp
index c176f962..997a9fb0 100644
--- a/tools/rpc_press/rpc_press.cpp
+++ b/tools/rpc_press/rpc_press.cpp
@@ -71,6 +71,14 @@ bool set_press_options(pbrpcframework::PressOptions* options){
}
}
+ const int rate_limit_per_thread = 1000000;
+ double req_rate_per_thread = options->test_req_rate / options->test_thread_num;
+ if (req_rate_per_thread > rate_limit_per_thread) {
+ LOG(ERROR) << "req_rate: " << (int64_t) req_rate_per_thread << " is too large in one thread. The rate limit is "
+ << rate_limit_per_thread << " in one thread";
+ return false;
+ }
+
options->input = FLAGS_input;
options->output = FLAGS_output;
options->connection_type = FLAGS_connection_type;
diff --git a/tools/rpc_press/rpc_press_impl.cpp b/tools/rpc_press/rpc_press_impl.cpp
index f825f44c..8a873eff 100644
--- a/tools/rpc_press/rpc_press_impl.cpp
+++ b/tools/rpc_press/rpc_press_impl.cpp
@@ -219,14 +219,10 @@ void RpcPress::sync_client() {
}
const int thread_index = g_thread_count.fetch_add(1, butil::memory_order_relaxed);
int msg_index = thread_index;
- std::deque<int64_t> timeq;
- size_t MAX_QUEUE_SIZE = (size_t)req_rate;
- if (MAX_QUEUE_SIZE < 100) {
- MAX_QUEUE_SIZE = 100;
- } else if (MAX_QUEUE_SIZE > 2000) {
- MAX_QUEUE_SIZE = 2000;
- }
- timeq.push_back(butil::gettimeofday_us());
+ int64_t last_expected_time = butil::monotonic_time_ns();
+ const int64_t interval = (int64_t) (1000000000L / req_rate);
+ // the max tolerant delay between end_time and expected_time. 10ms or 10 intervals
+ int64_t max_tolerant_delay = std::max(10000000L, 10 * interval);
while (!_stop) {
brpc::Controller* cntl = new brpc::Controller;
msg_index = (msg_index + _options.test_thread_num) % _msgs.size();
@@ -247,21 +243,15 @@ void RpcPress::sync_client() {
if (_options.test_req_rate <= 0) {
brpc::Join(cid1);
} else {
- int64_t end_time = butil::gettimeofday_us();
- int64_t expected_elp = 0;
- int64_t actual_elp = 0;
- timeq.push_back(end_time);
- if (timeq.size() > MAX_QUEUE_SIZE) {
- actual_elp = end_time - timeq.front();
- timeq.pop_front();
- expected_elp = (int64_t)(1000000 * timeq.size() / req_rate);
- } else {
- actual_elp = end_time - timeq.front();
- expected_elp = (int64_t)(1000000 * (timeq.size() - 1) / req_rate);
- }
- if (actual_elp < expected_elp) {
- usleep(expected_elp - actual_elp);
+ int64_t end_time = butil::monotonic_time_ns();
+ int64_t expected_time = last_expected_time + interval;
+ if (end_time < expected_time) {
+ usleep((expected_time - end_time)/1000);
}
+ if (end_time - expected_time > max_tolerant_delay) {
+ expected_time = end_time;
+ }
+ last_expected_time = expected_time;
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]