This is an automated email from the ASF dual-hosted git repository. wwbmmm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-brpc.git
The following commit(s) were added to refs/heads/master by this push:
     new c945795c fix rpc_press can't send request equably (#1763)
c945795c is described below

commit c945795ce5becc75ccb6b612069b145a60d298f0
Author: bumingchun <bumingc...@126.com>
AuthorDate: Mon Jun 20 13:49:26 2022 +0800

    fix rpc_press can't send request equably (#1763)

    * fix rpc_press can't send request equably

    * set rate limit to 1000000 in one thread && calculate interval with nanosecond

    * change sleep function to usleep

    * Update rpc_press_impl.cpp

    add the max tolerant delay between end_time and expected_time

    Co-authored-by: bumingchun <bumingchun@192.168.1.8>
---
 tools/rpc_press/rpc_press.cpp      |  8 ++++++++
 tools/rpc_press/rpc_press_impl.cpp | 34 ++++++++++++----------------------
 2 files changed, 20 insertions(+), 22 deletions(-)

diff --git a/tools/rpc_press/rpc_press.cpp b/tools/rpc_press/rpc_press.cpp
index c176f962..997a9fb0 100644
--- a/tools/rpc_press/rpc_press.cpp
+++ b/tools/rpc_press/rpc_press.cpp
@@ -71,6 +71,14 @@ bool set_press_options(pbrpcframework::PressOptions* options){
         }
     }
 
+    const int rate_limit_per_thread = 1000000;
+    double req_rate_per_thread = options->test_req_rate / options->test_thread_num;
+    if (req_rate_per_thread > rate_limit_per_thread) {
+        LOG(ERROR) << "req_rate: " << (int64_t) req_rate_per_thread << " is too large in one thread. The rate limit is "
+                   << rate_limit_per_thread << " in one thread";
+        return false;
+    }
+
     options->input = FLAGS_input;
     options->output = FLAGS_output;
     options->connection_type = FLAGS_connection_type;
diff --git a/tools/rpc_press/rpc_press_impl.cpp b/tools/rpc_press/rpc_press_impl.cpp
index f825f44c..8a873eff 100644
--- a/tools/rpc_press/rpc_press_impl.cpp
+++ b/tools/rpc_press/rpc_press_impl.cpp
@@ -219,14 +219,10 @@ void RpcPress::sync_client() {
     }
     const int thread_index = g_thread_count.fetch_add(1, butil::memory_order_relaxed);
     int msg_index = thread_index;
-    std::deque<int64_t> timeq;
-    size_t MAX_QUEUE_SIZE = (size_t)req_rate;
-    if (MAX_QUEUE_SIZE < 100) {
-        MAX_QUEUE_SIZE = 100;
-    } else if (MAX_QUEUE_SIZE > 2000) {
-        MAX_QUEUE_SIZE = 2000;
-    }
-    timeq.push_back(butil::gettimeofday_us());
+    int64_t last_expected_time = butil::monotonic_time_ns();
+    const int64_t interval = (int64_t) (1000000000L / req_rate);
+    // the max tolerant delay between end_time and expected_time. 10ms or 10 intervals
+    int64_t max_tolerant_delay = std::max(10000000L, 10 * interval);
     while (!_stop) {
         brpc::Controller* cntl = new brpc::Controller;
         msg_index = (msg_index + _options.test_thread_num) % _msgs.size();
@@ -247,21 +243,15 @@ void RpcPress::sync_client() {
         if (_options.test_req_rate <= 0) {
             brpc::Join(cid1);
         } else {
-            int64_t end_time = butil::gettimeofday_us();
-            int64_t expected_elp = 0;
-            int64_t actual_elp = 0;
-            timeq.push_back(end_time);
-            if (timeq.size() > MAX_QUEUE_SIZE) {
-                actual_elp = end_time - timeq.front();
-                timeq.pop_front();
-                expected_elp = (int64_t)(1000000 * timeq.size() / req_rate);
-            } else {
-                actual_elp = end_time - timeq.front();
-                expected_elp = (int64_t)(1000000 * (timeq.size() - 1) / req_rate);
-            }
-            if (actual_elp < expected_elp) {
-                usleep(expected_elp - actual_elp);
+            int64_t end_time = butil::monotonic_time_ns();
+            int64_t expected_time = last_expected_time + interval;
+            if (end_time < expected_time) {
+                usleep((expected_time - end_time)/1000);
             }
+            if (end_time - expected_time > max_tolerant_delay) {
+                expected_time = end_time;
+            }
+            last_expected_time = expected_time;
         }
     }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org