This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 274eadfd7 [util] a small clean up on the Throttler class
274eadfd7 is described below

commit 274eadfd79cb2ffcaa0c016376469d70036e11ca
Author: Alexey Serbin <ale...@apache.org>
AuthorDate: Fri Jul 19 15:38:57 2024 -0700

    [util] a small clean up on the Throttler class
    
    The motivation behind this patch was to make the API of the Throttler
    class more robust, an idea that came up while reviewing a changelist
    that used the Throttler's functionality.  I have doubts that the
    current implementation of the Throttler behaves the way one would
    expect (its test coverage is spotty), but that's another story.
    
    Change-Id: I43d60323c3d84da896c1a5429dfb7d461a24f9b2
    Reviewed-on: http://gerrit.cloudera.org:8080/21603
    Reviewed-by: Yingchun Lai <laiyingc...@apache.org>
    Tested-by: Alexey Serbin <ale...@apache.org>
---
 src/kudu/tablet/tablet.cc                   |  5 ++-
 src/kudu/tools/table_scanner.cc             |  6 ++--
 src/kudu/tools/tool_action_local_replica.cc |  3 +-
 src/kudu/tserver/tablet_copy_client-test.cc |  1 -
 src/kudu/tserver/tablet_copy_client.cc      |  4 +--
 src/kudu/util/throttler-test.cc             | 22 ++++++++----
 src/kudu/util/throttler.cc                  | 48 ++++++++++++++++---------
 src/kudu/util/throttler.h                   | 55 ++++++++++++++++++-----------
 8 files changed, 89 insertions(+), 55 deletions(-)

diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index d6d811d6a..909d5df0e 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -386,8 +386,7 @@ Tablet::Tablet(scoped_refptr<TabletMetadata> metadata,
       FLAGS_tablet_compaction_budget_mb, metrics_.get()));
 
   if (FLAGS_tablet_throttler_rpc_per_sec > 0 || 
FLAGS_tablet_throttler_bytes_per_sec > 0) {
-    throttler_.reset(new Throttler(MonoTime::Now(),
-                                   FLAGS_tablet_throttler_rpc_per_sec,
+    throttler_.reset(new Throttler(FLAGS_tablet_throttler_rpc_per_sec,
                                    FLAGS_tablet_throttler_bytes_per_sec,
                                    FLAGS_tablet_throttler_burst_factor));
   }
@@ -1795,7 +1794,7 @@ bool Tablet::ShouldThrottleAllow(int64_t bytes) {
   if (!throttler_) {
     return true;
   }
-  return throttler_->Take(MonoTime::Now(), 1, bytes);
+  return throttler_->Take(1, bytes);
 }
 
 Status Tablet::PickRowSetsToCompact(RowSetsInCompactionOrFlush *picked,
diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc
index eebb94a41..fac0ce949 100644
--- a/src/kudu/tools/table_scanner.cc
+++ b/src/kudu/tools/table_scanner.cc
@@ -582,7 +582,7 @@ TableScanner::TableScanner(
       out_(nullptr) {
   CHECK_OK(SetReplicaSelection(FLAGS_replica_selection));
   if (FLAGS_table_copy_throttler_bytes_per_sec > 0) {
-    throttler_ = std::make_shared<Throttler>(MonoTime::Now(), 0,
+    throttler_ = std::make_shared<Throttler>(Throttler::kNoLimit,
                                              
FLAGS_table_copy_throttler_bytes_per_sec,
                                              
FLAGS_table_copy_throttler_burst_factor);
   }
@@ -611,9 +611,9 @@ Status TableScanner::ScanData(const vector<KuduScanToken*>& 
tokens,
       // Limit table copy speed.
       if (throttler_) {
         SCOPED_LOG_SLOW_EXECUTION(WARNING, 1000, "Table copy throttler");
-        while (!throttler_->Take(MonoTime::Now(), 0,
+        while (!throttler_->Take(0,
                                  batch.direct_data().size() + 
batch.indirect_data().size())) {
-          SleepFor(MonoDelta::FromMilliseconds(10));
+          SleepFor(MonoDelta::FromMicroseconds(Throttler::kRefillPeriodMicros 
/ 2));
         }
       }
       RETURN_NOT_OK(cb(batch));
diff --git a/src/kudu/tools/tool_action_local_replica.cc 
b/src/kudu/tools/tool_action_local_replica.cc
index 2449db455..5c15d6bbd 100644
--- a/src/kudu/tools/tool_action_local_replica.cc
+++ b/src/kudu/tools/tool_action_local_replica.cc
@@ -334,8 +334,7 @@ class TabletCopier {
 
     shared_ptr<Throttler> throttler;
     if (FLAGS_tablet_copy_throttler_bytes_per_sec > 0) {
-      throttler = std::make_shared<Throttler>(MonoTime::Now(),
-                                              0,
+      throttler = std::make_shared<Throttler>(0,
                                               
FLAGS_tablet_copy_throttler_bytes_per_sec,
                                               
FLAGS_tablet_copy_throttler_burst_factor);
     }
diff --git a/src/kudu/tserver/tablet_copy_client-test.cc 
b/src/kudu/tserver/tablet_copy_client-test.cc
index a53f2aa31..fadaca536 100644
--- a/src/kudu/tserver/tablet_copy_client-test.cc
+++ b/src/kudu/tserver/tablet_copy_client-test.cc
@@ -319,7 +319,6 @@ class TabletCopyThrottlerTest : public TabletCopyClientTest 
{
   TabletCopyThrottlerTest() {
     mode_ = TabletCopyMode::REMOTE;
     throttler_ = std::make_shared<Throttler>(
-        MonoTime::Now(),
         0,
         FLAGS_tablet_copy_transfer_chunk_size_bytes,
         2 * FLAGS_tablet_copy_transfer_chunk_size_bytes);
diff --git a/src/kudu/tserver/tablet_copy_client.cc 
b/src/kudu/tserver/tablet_copy_client.cc
index d392c72c2..85772f813 100644
--- a/src/kudu/tserver/tablet_copy_client.cc
+++ b/src/kudu/tserver/tablet_copy_client.cc
@@ -1036,8 +1036,8 @@ Status RemoteTabletCopyClient::DownloadFile(const 
DataIdPB& data_id,
     }
     if (throttler_) {
       LOG_TIMING(INFO, "Tablet copy throttler") {
-        while (!throttler_->Take(MonoTime::Now(), 0, chunk_size)) {
-          SleepFor(MonoDelta::FromMilliseconds(10));
+        while (!throttler_->Take(0, chunk_size)) {
+          SleepFor(MonoDelta::FromMicroseconds(Throttler::kRefillPeriodMicros 
/ 2));
         }
       }
     }
diff --git a/src/kudu/util/throttler-test.cc b/src/kudu/util/throttler-test.cc
index ff97eb5da..3c391949b 100644
--- a/src/kudu/util/throttler-test.cc
+++ b/src/kudu/util/throttler-test.cc
@@ -17,6 +17,8 @@
 
 #include "kudu/util/throttler.h"
 
+#include <string>
+
 #include <gtest/gtest.h>
 
 #include "kudu/util/monotime.h"
@@ -27,7 +29,13 @@ namespace kudu {
 class ThrottlerTest : public KuduTest {
 };
 
-TEST_F(ThrottlerTest, TestOpThrottle) {
+TEST_F(ThrottlerTest, Basic) {
+  Throttler t(1, 1, 1.0);
+  ASSERT_TRUE(t.Take(0, 1));
+  ASSERT_TRUE(t.Take(1, 0));
+}
+
+TEST_F(ThrottlerTest, OpThrottle) {
   // Check operation rate throttling
   MonoTime now = MonoTime::Now();
   Throttler t0(now, 1000, 1000*1000, 1);
@@ -39,12 +47,12 @@ TEST_F(ThrottlerTest, TestOpThrottle) {
       ASSERT_TRUE(t0.Take(now, 1, 1));
     }
     ASSERT_FALSE(t0.Take(now, 1, 1));
-    now += MonoDelta::FromMilliseconds(100);
+    now += MonoDelta::FromMicroseconds(Throttler::kRefillPeriodMicros);
   }
 }
 
-TEST_F(ThrottlerTest, TestIOThrottle) {
-  // Check operation rate throttling
+TEST_F(ThrottlerTest, IOThrottle) {
+  // Check IO rate throttling
   MonoTime now = MonoTime::Now();
   Throttler t0(now, 50000, 1000*1000, 1);
   // Fill up bucket
@@ -55,12 +63,12 @@ TEST_F(ThrottlerTest, TestIOThrottle) {
       ASSERT_TRUE(t0.Take(now, 1, 1000));
     }
     ASSERT_FALSE(t0.Take(now, 1, 1000));
-    now += MonoDelta::FromMilliseconds(100);
+    now += MonoDelta::FromMicroseconds(Throttler::kRefillPeriodMicros);
   }
 }
 
-TEST_F(ThrottlerTest, TestBurst) {
-  // Check IO rate throttling
+TEST_F(ThrottlerTest, Burst) {
+  // Check throttling for bursty consuming
   MonoTime now = MonoTime::Now();
   Throttler t0(now, 2000, 1000*1000, 5);
   // Fill up bucket
diff --git a/src/kudu/util/throttler.cc b/src/kudu/util/throttler.cc
index 69e0f99dc..f4b35428c 100644
--- a/src/kudu/util/throttler.cc
+++ b/src/kudu/util/throttler.cc
@@ -18,33 +18,49 @@
 #include "kudu/util/throttler.h"
 
 #include <algorithm>
-#include <mutex>
+
+#include <glog/logging.h>
 
 namespace kudu {
 
-Throttler::Throttler(MonoTime now, uint64_t op_rate, uint64_t byte_rate, 
double burst_factor) :
-    next_refill_(now) {
-  op_refill_ = op_rate / (MonoTime::kMicrosecondsPerSecond / 
kRefillPeriodMicros);
-  op_token_ = 0;
-  op_token_max_ = static_cast<uint64_t>(op_refill_ * burst_factor);
-  byte_refill_ = byte_rate / (MonoTime::kMicrosecondsPerSecond / 
kRefillPeriodMicros);
-  byte_token_ = 0;
-  byte_token_max_ = static_cast<uint64_t>(byte_refill_ * burst_factor);
+Throttler::Throttler(uint64_t op_rate_per_sec,
+                     uint64_t byte_rate_per_sec,
+                     double burst_factor)
+    : Throttler(MonoTime::Now(), op_rate_per_sec, byte_rate_per_sec, 
burst_factor) {
+}
+
+Throttler::Throttler(MonoTime now,
+                     uint64_t op_rate_per_sec,
+                     uint64_t byte_rate_per_sec,
+                     double burst_factor)
+    : byte_refill_(byte_rate_per_sec / (MonoTime::kMicrosecondsPerSecond / 
kRefillPeriodMicros)),
+      byte_token_max_(static_cast<uint64_t>(byte_refill_ * burst_factor)),
+      op_refill_(op_rate_per_sec / (MonoTime::kMicrosecondsPerSecond / 
kRefillPeriodMicros)),
+      op_token_max_(static_cast<uint64_t>(op_refill_ * burst_factor)),
+      byte_token_(0),
+      op_token_(0),
+      next_refill_(now) {
 }
 
-bool Throttler::Take(MonoTime now, uint64_t op, uint64_t byte) {
-  if (op_refill_ == 0 && byte_refill_ == 0) {
+bool Throttler::Take(uint64_t ops, uint64_t bytes) {
+  return Take(MonoTime::Now(), ops, bytes);
+}
+
+bool Throttler::Take(MonoTime now, uint64_t ops, uint64_t bytes) {
+  DCHECK(ops > 0 || bytes > 0);
+  if (op_refill_ == kNoLimit && byte_refill_ == kNoLimit) {
     return true;
   }
-  std::lock_guard<simple_spinlock> lock(lock_);
+
+  std::lock_guard lock(lock_);
   Refill(now);
-  if ((op_refill_ == 0 || op <= op_token_) &&
-      (byte_refill_ == 0 || byte <= byte_token_)) {
+  if ((op_refill_ == 0 || ops <= op_token_) &&
+      (byte_refill_ == 0 || bytes <= byte_token_)) {
     if (op_refill_ > 0) {
-      op_token_ -= op;
+      op_token_ -= ops;
     }
     if (byte_refill_ > 0) {
-      byte_token_ -= byte;
+      byte_token_ -= bytes;
     }
     return true;
   }
diff --git a/src/kudu/util/throttler.h b/src/kudu/util/throttler.h
index 559409120..191408077 100644
--- a/src/kudu/util/throttler.h
+++ b/src/kudu/util/throttler.h
@@ -14,49 +14,62 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#ifndef KUDU_UTIL_THROTTLER_H
-#define KUDU_UTIL_THROTTLER_H
+#pragma once
 
 #include <cstdint>
 
+#include <gtest/gtest_prod.h>
+
 #include "kudu/util/locks.h"
 #include "kudu/util/monotime.h"
 
 namespace kudu {
 
 // A throttler to throttle both operation/s and IO byte/s.
-class Throttler {
+class Throttler final {
  public:
   // Refill period is 100ms.
-  enum {
-    kRefillPeriodMicros = 100000
-  };
+  static constexpr const int64_t kRefillPeriodMicros = 100000;
+  static constexpr const uint64_t kNoLimit = 0;
 
   // Construct a throttler with max operation per second, max IO bytes per 
second
-  // and burst factor (burst_rate = rate * burst), burst rate means maximum
-  // throughput within one refill period.
-  // Set op_per_sec to 0 to disable operation throttling.
-  // Set byte_per_sec to 0 to disable IO bytes throttling.
-  Throttler(MonoTime now, uint64_t op_per_sec, uint64_t byte_per_sec, double 
burst_factor);
+  // and burst factor (burst_rate = rate * burst_factor), burst_rate means
+  // the maximum throughput within one refill period.
+  // Set op_rate_per_sec to kNoLimit to disable operation rate throttling.
+  // Set byte_rate_per_sec to kNoLimit to disable IO rate throttling.
+  Throttler(uint64_t op_rate_per_sec,
+            uint64_t byte_rate_per_sec,
+            double burst_factor);
 
-  // Throttle an "operation group" by taking 'op' operation tokens and 'byte' 
byte tokens.
+  // Throttle an "operation group" by taking 'ops' operation tokens and 'bytes'
+  // byte tokens.
   // Return true if there are enough tokens, and operation is allowed.
   // Return false if there are not enough tokens, and operation is throttled.
-  bool Take(MonoTime now, uint64_t op, uint64_t byte);
+  bool Take(uint64_t ops, uint64_t bytes);
 
  private:
+  FRIEND_TEST(ThrottlerTest, OpThrottle);
+  FRIEND_TEST(ThrottlerTest, IOThrottle);
+  FRIEND_TEST(ThrottlerTest, Burst);
+
+  Throttler(MonoTime now,
+            uint64_t op_rate_per_sec,
+            uint64_t byte_rate_per_sec,
+            double burst_factor);
+
+  bool Take(MonoTime now, uint64_t ops, uint64_t bytes);
+
   void Refill(MonoTime now);
 
-  MonoTime next_refill_;
-  uint64_t op_refill_;
-  uint64_t op_token_;
-  uint64_t op_token_max_;
-  uint64_t byte_refill_;
+  const uint64_t byte_refill_;
+  const uint64_t byte_token_max_;
+  const uint64_t op_refill_;
+  const uint64_t op_token_max_;
+
   uint64_t byte_token_;
-  uint64_t byte_token_max_;
+  uint64_t op_token_;
+  MonoTime next_refill_;
   simple_spinlock lock_;
 };
 
 } // namespace kudu
-
-#endif

Reply via email to