This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new da1168b5677 branch-3.0: [enhancement](ms) Add some unit tests for rate 
limiter #47396 (#47939)
da1168b5677 is described below

commit da1168b5677076b5a6fc6f8e9eb451084118ae7a
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Feb 17 10:48:05 2025 +0800

    branch-3.0: [enhancement](ms) Add some unit tests for rate limiter #47396 
(#47939)
    
    Cherry-picked from #47396
    
    Co-authored-by: Siyang Tang <[email protected]>
---
 cloud/test/rate_limiter_test.cpp | 146 +++++++++++++++++++++++++++++++--------
 1 file changed, 117 insertions(+), 29 deletions(-)

diff --git a/cloud/test/rate_limiter_test.cpp b/cloud/test/rate_limiter_test.cpp
index cab7b01774c..a406fbb4932 100644
--- a/cloud/test/rate_limiter_test.cpp
+++ b/cloud/test/rate_limiter_test.cpp
@@ -20,7 +20,10 @@
 #include <gen_cpp/cloud.pb.h>
 #include <gtest/gtest.h>
 
+#include <atomic>
+#include <chrono>
 #include <cstddef>
+#include <thread>
 
 #include "common/config.h"
 #include "common/util.h"
@@ -98,7 +101,7 @@ void mock_add_cluster(MetaServiceProxy& meta_service, 
std::string instance_id) {
 }
 
 void mock_get_cluster(MetaServiceProxy& meta_service, const std::string& 
cloud_uid,
-                      MetaServiceCode code) {
+                      MetaServiceCode code, std::atomic_size_t& failed_cnt) {
     GetClusterRequest req;
     req.set_cloud_unique_id(cloud_uid);
     req.set_cluster_id(mock_cluster_id);
@@ -108,43 +111,27 @@ void mock_get_cluster(MetaServiceProxy& meta_service, 
const std::string& cloud_u
     
meta_service.get_cluster(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
 &req,
                              &res, nullptr);
 
-    ASSERT_EQ(res.status().code(), code);
+    if (code == MetaServiceCode::OK) {
+        ASSERT_EQ(res.status().code(), code);
+    } else {
+        ++failed_cnt;
+    }
 }
 
 template <typename Rpc>
 void mock_parallel_rpc(Rpc rpc, MetaServiceProxy* meta_service, const 
std::string& cloud_uid,
                        MetaServiceCode expected, size_t times) {
+    std::atomic_size_t failed_cnt;
     std::vector<std::thread> threads;
     for (size_t i = 0; i < times; ++i) {
-        threads.emplace_back([&]() { rpc(*meta_service, cloud_uid, expected); 
});
+        threads.emplace_back([&]() { rpc(*meta_service, cloud_uid, expected, 
failed_cnt); });
     }
     for (auto& t : threads) {
         t.join();
     }
-}
-
-TEST(RateLimiterTest, RateLimitGetClusterTest) {
-    auto meta_service = get_meta_service();
-    mock_add_cluster(*meta_service, mock_instance_0);
-
-    mock_parallel_rpc(mock_get_cluster, meta_service.get(), 
mock_cloud_unique_id_0,
-                      MetaServiceCode::OK, 20);
-
-    std::this_thread::sleep_for(std::chrono::seconds(1));
-    meta_service->rate_limiter()
-            ->get_rpc_rate_limiter("get_cluster")
-            ->qps_limiter_[mock_instance_0]
-            ->max_qps_limit_ = 1;
-    mock_parallel_rpc(mock_get_cluster, meta_service.get(), 
mock_cloud_unique_id_0,
-                      MetaServiceCode::MAX_QPS_LIMIT, 1);
-
-    std::this_thread::sleep_for(std::chrono::seconds(1));
-    meta_service->rate_limiter()
-            ->get_rpc_rate_limiter("get_cluster")
-            ->qps_limiter_[mock_instance_0]
-            ->max_qps_limit_ = 10000;
-    mock_parallel_rpc(mock_get_cluster, meta_service.get(), 
mock_cloud_unique_id_0,
-                      MetaServiceCode::OK, 1);
+    if (expected != MetaServiceCode::OK) {
+        ASSERT_GT(failed_cnt, 0);
+    }
 }
 
 TEST(RateLimiterTest, AdjustLimitInfluenceTest) {
@@ -225,6 +212,107 @@ TEST(RateLimiterTest, AdjustLimitInfluenceTest) {
     }
 }
 
+TEST(RateLimiterTest, TestAdjustLimitInfluence1) {
+    auto meta_service = get_meta_service();
+    mock_add_cluster(*meta_service, mock_instance_0);
+    mock_add_cluster(*meta_service, mock_instance_1);
+    mock_parallel_rpc(mock_get_cluster, meta_service.get(), 
mock_cloud_unique_id_0,
+                      MetaServiceCode::OK, 1);
+    std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    {
+        ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(0));
+        std::this_thread::sleep_for(std::chrono::seconds(1));
+        mock_parallel_rpc(mock_get_cluster, meta_service.get(), 
mock_cloud_unique_id_0,
+                          MetaServiceCode::MAX_QPS_LIMIT, 1);
+        // have to sleep 5s to ensure sucess, maybe related with bvar latency
+        std::this_thread::sleep_for(std::chrono::seconds(5));
+    }
+    {
+        ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(2, 
"get_cluster"));
+        std::this_thread::sleep_for(std::chrono::seconds(1));
+        mock_parallel_rpc(mock_get_cluster, meta_service.get(), 
mock_cloud_unique_id_0,
+                          MetaServiceCode::OK, 1);
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+        mock_parallel_rpc(mock_get_cluster, meta_service.get(), 
mock_cloud_unique_id_1,
+                          MetaServiceCode::MAX_QPS_LIMIT, 1);
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+        mock_parallel_rpc(mock_get_cluster, meta_service.get(), 
mock_cloud_unique_id_0,
+                          MetaServiceCode::MAX_QPS_LIMIT, 3);
+        std::this_thread::sleep_for(std::chrono::seconds(1));
+    }
+    {
+        ASSERT_TRUE(
+                meta_service->rate_limiter()->set_rate_limit(4, "get_cluster", 
mock_instance_0));
+        std::this_thread::sleep_for(std::chrono::seconds(1));
+        mock_parallel_rpc(mock_get_cluster, meta_service.get(), 
mock_cloud_unique_id_0,
+                          MetaServiceCode::OK, 3);
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+        mock_parallel_rpc(mock_get_cluster, meta_service.get(), 
mock_cloud_unique_id_1,
+                          MetaServiceCode::MAX_QPS_LIMIT, 3);
+        std::this_thread::sleep_for(std::chrono::seconds(1));
+    }
+    {
+        ASSERT_TRUE(
+                meta_service->rate_limiter()->set_rate_limit(2, "get_cluster", 
mock_instance_0));
+        std::this_thread::sleep_for(std::chrono::seconds(1));
+        mock_parallel_rpc(mock_get_cluster, meta_service.get(), 
mock_cloud_unique_id_0,
+                          MetaServiceCode::MAX_QPS_LIMIT, 3);
+        std::this_thread::sleep_for(std::chrono::seconds(1));
+    }
+    {
+        ASSERT_TRUE(
+                meta_service->rate_limiter()->set_rate_limit(6, "get_cluster", 
mock_instance_0));
+        std::this_thread::sleep_for(std::chrono::seconds(1));
+        mock_parallel_rpc(mock_get_cluster, meta_service.get(), 
mock_cloud_unique_id_0,
+                          MetaServiceCode::OK, 5);
+    }
+}
+
+TEST(RateLimiterTest, TestAdjustLimitInfluence2) {
+    auto meta_service = get_meta_service();
+    mock_add_cluster(*meta_service, mock_instance_0);
+    mock_add_cluster(*meta_service, mock_instance_1);
+    mock_parallel_rpc(mock_get_cluster, meta_service.get(), 
mock_cloud_unique_id_0,
+                      MetaServiceCode::OK, 1);
+    std::this_thread::sleep_for(std::chrono::seconds(1));
+    {
+        ASSERT_TRUE(
+                meta_service->rate_limiter()->set_rate_limit(6, "get_cluster", 
mock_instance_0));
+        std::this_thread::sleep_for(std::chrono::seconds(1));
+        mock_parallel_rpc(mock_get_cluster, meta_service.get(), 
mock_cloud_unique_id_0,
+                          MetaServiceCode::MAX_QPS_LIMIT, 7);
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+        mock_parallel_rpc(mock_get_cluster, meta_service.get(), 
mock_cloud_unique_id_0,
+                          MetaServiceCode::OK, 5);
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+        mock_parallel_rpc(mock_get_cluster, meta_service.get(), 
mock_cloud_unique_id_1,
+                          MetaServiceCode::OK, 7);
+        std::this_thread::sleep_for(std::chrono::seconds(1));
+    }
+    {
+        ASSERT_TRUE(meta_service->rate_limiter()->set_instance_rate_limit(4, 
mock_instance_0));
+        std::this_thread::sleep_for(std::chrono::seconds(2));
+        mock_parallel_rpc(mock_get_cluster, meta_service.get(), 
mock_cloud_unique_id_0,
+                          MetaServiceCode::MAX_QPS_LIMIT, 5);
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+        mock_parallel_rpc(mock_get_cluster, meta_service.get(), 
mock_cloud_unique_id_0,
+                          MetaServiceCode::OK, 3);
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+        mock_parallel_rpc(mock_get_cluster, meta_service.get(), 
mock_cloud_unique_id_1,
+                          MetaServiceCode::OK, 7);
+        std::this_thread::sleep_for(std::chrono::seconds(1));
+    }
+    {
+        ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(1));
+        std::this_thread::sleep_for(std::chrono::seconds(1));
+        mock_parallel_rpc(mock_get_cluster, meta_service.get(), 
mock_cloud_unique_id_0,
+                          MetaServiceCode::OK, 3);
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+        mock_parallel_rpc(mock_get_cluster, meta_service.get(), 
mock_cloud_unique_id_1,
+                          MetaServiceCode::MAX_QPS_LIMIT, 3);
+    }
+}
+
 TEST(RateLimiterTest, AdjustLimitMockRPCTest) {
     auto meta_service = get_meta_service();
     mock_add_cluster(*meta_service, mock_instance_0);
@@ -233,7 +321,7 @@ TEST(RateLimiterTest, AdjustLimitMockRPCTest) {
                       MetaServiceCode::OK, 20);
     mock_parallel_rpc(mock_get_cluster, meta_service.get(), 
mock_cloud_unique_id_1,
                       MetaServiceCode::OK, 20);
-    std::this_thread::sleep_for(std::chrono::seconds(1));
+    std::this_thread::sleep_for(std::chrono::milliseconds(1));
 
     {
         ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(10000, 
"get_cluster"));
@@ -243,7 +331,7 @@ TEST(RateLimiterTest, AdjustLimitMockRPCTest) {
         auto limit =
                 
meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit();
         ASSERT_EQ(limit, 10000);
-        std::this_thread::sleep_for(std::chrono::seconds(1));
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
     }
     {
         ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(1, 
"get_cluster"));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to