This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new da1168b5677 branch-3.0: [enhancement](ms) Add some unit tests for rate
limiter #47396 (#47939)
da1168b5677 is described below
commit da1168b5677076b5a6fc6f8e9eb451084118ae7a
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Feb 17 10:48:05 2025 +0800
branch-3.0: [enhancement](ms) Add some unit tests for rate limiter #47396
(#47939)
Cherry-picked from #47396
Co-authored-by: Siyang Tang <[email protected]>
---
cloud/test/rate_limiter_test.cpp | 146 +++++++++++++++++++++++++++++++--------
1 file changed, 117 insertions(+), 29 deletions(-)
diff --git a/cloud/test/rate_limiter_test.cpp b/cloud/test/rate_limiter_test.cpp
index cab7b01774c..a406fbb4932 100644
--- a/cloud/test/rate_limiter_test.cpp
+++ b/cloud/test/rate_limiter_test.cpp
@@ -20,7 +20,10 @@
#include <gen_cpp/cloud.pb.h>
#include <gtest/gtest.h>
+#include <atomic>
+#include <chrono>
#include <cstddef>
+#include <thread>
#include "common/config.h"
#include "common/util.h"
@@ -98,7 +101,7 @@ void mock_add_cluster(MetaServiceProxy& meta_service,
std::string instance_id) {
}
void mock_get_cluster(MetaServiceProxy& meta_service, const std::string&
cloud_uid,
- MetaServiceCode code) {
+ MetaServiceCode code, std::atomic_size_t& failed_cnt) {
GetClusterRequest req;
req.set_cloud_unique_id(cloud_uid);
req.set_cluster_id(mock_cluster_id);
@@ -108,43 +111,27 @@ void mock_get_cluster(MetaServiceProxy& meta_service,
const std::string& cloud_u
meta_service.get_cluster(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req,
&res, nullptr);
- ASSERT_EQ(res.status().code(), code);
+ if (code == MetaServiceCode::OK) {
+ ASSERT_EQ(res.status().code(), code);
+ } else {
+ ++failed_cnt;
+ }
}
template <typename Rpc>
void mock_parallel_rpc(Rpc rpc, MetaServiceProxy* meta_service, const
std::string& cloud_uid,
MetaServiceCode expected, size_t times) {
+ std::atomic_size_t failed_cnt;
std::vector<std::thread> threads;
for (size_t i = 0; i < times; ++i) {
- threads.emplace_back([&]() { rpc(*meta_service, cloud_uid, expected);
});
+ threads.emplace_back([&]() { rpc(*meta_service, cloud_uid, expected,
failed_cnt); });
}
for (auto& t : threads) {
t.join();
}
-}
-
-TEST(RateLimiterTest, RateLimitGetClusterTest) {
- auto meta_service = get_meta_service();
- mock_add_cluster(*meta_service, mock_instance_0);
-
- mock_parallel_rpc(mock_get_cluster, meta_service.get(),
mock_cloud_unique_id_0,
- MetaServiceCode::OK, 20);
-
- std::this_thread::sleep_for(std::chrono::seconds(1));
- meta_service->rate_limiter()
- ->get_rpc_rate_limiter("get_cluster")
- ->qps_limiter_[mock_instance_0]
- ->max_qps_limit_ = 1;
- mock_parallel_rpc(mock_get_cluster, meta_service.get(),
mock_cloud_unique_id_0,
- MetaServiceCode::MAX_QPS_LIMIT, 1);
-
- std::this_thread::sleep_for(std::chrono::seconds(1));
- meta_service->rate_limiter()
- ->get_rpc_rate_limiter("get_cluster")
- ->qps_limiter_[mock_instance_0]
- ->max_qps_limit_ = 10000;
- mock_parallel_rpc(mock_get_cluster, meta_service.get(),
mock_cloud_unique_id_0,
- MetaServiceCode::OK, 1);
+ if (expected != MetaServiceCode::OK) {
+ ASSERT_GT(failed_cnt, 0);
+ }
}
TEST(RateLimiterTest, AdjustLimitInfluenceTest) {
@@ -225,6 +212,107 @@ TEST(RateLimiterTest, AdjustLimitInfluenceTest) {
}
}
+TEST(RateLimiterTest, TestAdjustLimitInfluence1) {
+ auto meta_service = get_meta_service();
+ mock_add_cluster(*meta_service, mock_instance_0);
+ mock_add_cluster(*meta_service, mock_instance_1);
+ mock_parallel_rpc(mock_get_cluster, meta_service.get(),
mock_cloud_unique_id_0,
+ MetaServiceCode::OK, 1);
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ {
+ ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(0));
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ mock_parallel_rpc(mock_get_cluster, meta_service.get(),
mock_cloud_unique_id_0,
+ MetaServiceCode::MAX_QPS_LIMIT, 1);
+ // have to sleep 5s to ensure sucess, maybe related with bvar latency
+ std::this_thread::sleep_for(std::chrono::seconds(5));
+ }
+ {
+ ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(2,
"get_cluster"));
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ mock_parallel_rpc(mock_get_cluster, meta_service.get(),
mock_cloud_unique_id_0,
+ MetaServiceCode::OK, 1);
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ mock_parallel_rpc(mock_get_cluster, meta_service.get(),
mock_cloud_unique_id_1,
+ MetaServiceCode::MAX_QPS_LIMIT, 1);
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ mock_parallel_rpc(mock_get_cluster, meta_service.get(),
mock_cloud_unique_id_0,
+ MetaServiceCode::MAX_QPS_LIMIT, 3);
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ }
+ {
+ ASSERT_TRUE(
+ meta_service->rate_limiter()->set_rate_limit(4, "get_cluster",
mock_instance_0));
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ mock_parallel_rpc(mock_get_cluster, meta_service.get(),
mock_cloud_unique_id_0,
+ MetaServiceCode::OK, 3);
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ mock_parallel_rpc(mock_get_cluster, meta_service.get(),
mock_cloud_unique_id_1,
+ MetaServiceCode::MAX_QPS_LIMIT, 3);
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ }
+ {
+ ASSERT_TRUE(
+ meta_service->rate_limiter()->set_rate_limit(2, "get_cluster",
mock_instance_0));
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ mock_parallel_rpc(mock_get_cluster, meta_service.get(),
mock_cloud_unique_id_0,
+ MetaServiceCode::MAX_QPS_LIMIT, 3);
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ }
+ {
+ ASSERT_TRUE(
+ meta_service->rate_limiter()->set_rate_limit(6, "get_cluster",
mock_instance_0));
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ mock_parallel_rpc(mock_get_cluster, meta_service.get(),
mock_cloud_unique_id_0,
+ MetaServiceCode::OK, 5);
+ }
+}
+
+TEST(RateLimiterTest, TestAdjustLimitInfluence2) {
+ auto meta_service = get_meta_service();
+ mock_add_cluster(*meta_service, mock_instance_0);
+ mock_add_cluster(*meta_service, mock_instance_1);
+ mock_parallel_rpc(mock_get_cluster, meta_service.get(),
mock_cloud_unique_id_0,
+ MetaServiceCode::OK, 1);
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ {
+ ASSERT_TRUE(
+ meta_service->rate_limiter()->set_rate_limit(6, "get_cluster",
mock_instance_0));
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ mock_parallel_rpc(mock_get_cluster, meta_service.get(),
mock_cloud_unique_id_0,
+ MetaServiceCode::MAX_QPS_LIMIT, 7);
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ mock_parallel_rpc(mock_get_cluster, meta_service.get(),
mock_cloud_unique_id_0,
+ MetaServiceCode::OK, 5);
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ mock_parallel_rpc(mock_get_cluster, meta_service.get(),
mock_cloud_unique_id_1,
+ MetaServiceCode::OK, 7);
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ }
+ {
+ ASSERT_TRUE(meta_service->rate_limiter()->set_instance_rate_limit(4,
mock_instance_0));
+ std::this_thread::sleep_for(std::chrono::seconds(2));
+ mock_parallel_rpc(mock_get_cluster, meta_service.get(),
mock_cloud_unique_id_0,
+ MetaServiceCode::MAX_QPS_LIMIT, 5);
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ mock_parallel_rpc(mock_get_cluster, meta_service.get(),
mock_cloud_unique_id_0,
+ MetaServiceCode::OK, 3);
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ mock_parallel_rpc(mock_get_cluster, meta_service.get(),
mock_cloud_unique_id_1,
+ MetaServiceCode::OK, 7);
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ }
+ {
+ ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(1));
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ mock_parallel_rpc(mock_get_cluster, meta_service.get(),
mock_cloud_unique_id_0,
+ MetaServiceCode::OK, 3);
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ mock_parallel_rpc(mock_get_cluster, meta_service.get(),
mock_cloud_unique_id_1,
+ MetaServiceCode::MAX_QPS_LIMIT, 3);
+ }
+}
+
TEST(RateLimiterTest, AdjustLimitMockRPCTest) {
auto meta_service = get_meta_service();
mock_add_cluster(*meta_service, mock_instance_0);
@@ -233,7 +321,7 @@ TEST(RateLimiterTest, AdjustLimitMockRPCTest) {
MetaServiceCode::OK, 20);
mock_parallel_rpc(mock_get_cluster, meta_service.get(),
mock_cloud_unique_id_1,
MetaServiceCode::OK, 20);
- std::this_thread::sleep_for(std::chrono::seconds(1));
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
{
ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(10000,
"get_cluster"));
@@ -243,7 +331,7 @@ TEST(RateLimiterTest, AdjustLimitMockRPCTest) {
auto limit =
meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit();
ASSERT_EQ(limit, 10000);
- std::this_thread::sleep_for(std::chrono::seconds(1));
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
{
ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(1,
"get_cluster"));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]