This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 66a79dddf KUDU-3407 Avoid unchecked scheduling of flush operations.
66a79dddf is described below

commit 66a79dddf2faf31dbb49cd8e08c0e322c4bb57ce
Author: 宋家成 <songjiach...@thinkingdata.cn>
AuthorDate: Fri Jul 7 14:36:29 2023 +0800

    KUDU-3407 Avoid unchecked scheduling of flush operations.
    
    In some clusters, the memory usage of tablet servers might stay in the
    60%-80% range for a long time. During this time the maintenance manager
    will not run any operation other than WAL GC and MRS/DRS flushes, which
    makes the performance of the tablet servers progressively worse and can
    eventually lead to an OOM crash.
    
    This patch adds a flag that gives other operations a chance to run
    while the server is under memory pressure.
    
    This mechanism works when the memory usage is between
    memory_pressure_percentage and memory_limit_soft_percentage.
    The higher the memory usage, the higher the probability of flushing
    MRS/DMS.
    
    e.g.
    memory_pressure_percentage = 60%
    memory_limit_soft_percentage = 80%
    At 60% memory usage, the probability of not flushing MRS/DMS is the
    value of run_non_memory_ops_prob. As the memory usage increases, that
    probability gradually decreases, reaching 0 when the memory usage is 80%.
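
    A minimal standalone sketch of that decay (illustrative only, not part
    of this patch; the threshold and flag values are the example numbers
    above):

      // The probability of skipping a flush decays linearly from
      // run_non_memory_ops_prob at the pressure threshold down to 0 at the
      // soft limit; at or above the soft limit a flush is always chosen.
      #include <cstdio>
      #include <random>

      int main() {
        const double pressure_threshold = 0.60;      // memory_pressure_percentage
        const double soft_limit = 0.80;              // memory_limit_soft_percentage
        const double run_non_memory_ops_prob = 0.2;  // example value for the new flag

        std::mt19937 rng{std::random_device{}()};
        std::uniform_real_distribution<double> uniform(0.0, 1.0);

        for (int pct = 60; pct <= 80; pct += 5) {
          const double usage = pct / 100.0;
          const double skip_prob = run_non_memory_ops_prob *
              (soft_limit - usage) / (soft_limit - pressure_threshold);
          const bool flush_now = usage >= soft_limit || uniform(rng) >= skip_prob;
          std::printf("usage=%.2f skip_prob=%.3f flush_now=%d\n",
                      usage, skip_prob, flush_now);
        }
        return 0;
      }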
    
    Change-Id: Idc2fd3a850cf99d54ef2980211b712468440ed80
    Reviewed-on: http://gerrit.cloudera.org:8080/20166
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <ale...@apache.org>
---
 src/kudu/util/maintenance_manager-test.cc | 158 ++++++++++++++++++++++++++++--
 src/kudu/util/maintenance_manager.cc      |  43 +++++++-
 src/kudu/util/maintenance_manager.h       |   9 ++
 3 files changed, 200 insertions(+), 10 deletions(-)

diff --git a/src/kudu/util/maintenance_manager-test.cc b/src/kudu/util/maintenance_manager-test.cc
index de78f9608..177fca68f 100644
--- a/src/kudu/util/maintenance_manager-test.cc
+++ b/src/kudu/util/maintenance_manager-test.cc
@@ -29,6 +29,7 @@
 #include <ostream>
 #include <string>
 #include <thread>
+#include <type_traits>
 #include <utility>
 #include <vector>
 
@@ -44,9 +45,12 @@
 #include "kudu/util/maintenance_manager_metrics.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/random.h"
+#include "kudu/util/random_util.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
+#include "kudu/util/thread.h"
 
 using std::list;
 using std::shared_ptr;
@@ -72,6 +76,10 @@ DECLARE_bool(enable_maintenance_manager);
 DECLARE_int64(log_target_replay_size_mb);
 DECLARE_double(maintenance_op_multiplier);
 DECLARE_int32(max_priority_range);
+DECLARE_double(run_non_memory_ops_prob);
+DECLARE_double(data_gc_prioritization_prob);
+DECLARE_int32(memory_pressure_percentage);
+DECLARE_int32(memory_limit_soft_percentage);
 namespace kudu {
 
 // Set this a bit bigger so that the manager could keep track of all possible completed ops.
@@ -89,6 +97,7 @@ class TestMaintenanceOp : public MaintenanceOp {
       start_stats_latch_(start_stats_latch),
       continue_stats_latch_(continue_stats_latch),
       ram_anchored_(500),
+      data_retained_bytes_(0),
       logs_retained_bytes_(0),
       perf_improvement_(0),
       metric_entity_(METRIC_ENTITY_test.Instantiate(&metric_registry_, "test")),
@@ -100,12 +109,18 @@ class TestMaintenanceOp : public MaintenanceOp {
       update_stats_time_(MonoDelta::FromSeconds(0)),
       priority_(priority),
       workload_score_(0),
-      update_stats_count_(0) {
+      update_stats_count_(0),
+      update_time_(MonoTime::Now()),
+      queue_time_(MonoDelta::FromSeconds(0)),
+      run_count_(0),
+      updated_(false),
+      register_self_(false) {
   }
 
   ~TestMaintenanceOp() override = default;
 
   bool Prepare() override {
+    queue_time_ += (MonoTime::Now() - update_time_);
     std::lock_guard<simple_spinlock> guard(lock_);
     if (remaining_runs_ == 0) {
       return false;
@@ -126,13 +141,20 @@ class TestMaintenanceOp : public MaintenanceOp {
       CHECK_GE(prepared_runs_, 1);
       prepared_runs_--;
     }
-
     SleepFor(sleep_time_);
-
     {
       std::lock_guard<simple_spinlock> guard(lock_);
+      run_count_++;
+      updated_ = false;
       completed_at_ = MonoTime::Now();
     }
+    if (register_self_) {
+      scoped_refptr<kudu::Thread> thread;
+      // Re-register itself after 50ms.
+      kudu::Thread::Create("maintenance-test", "self-register", [this]() {
+        this->set_remaining_runs(1);
+      }, &thread);
+    }
   }
 
   void UpdateStats(MaintenanceOpStats* stats) override {
@@ -153,9 +175,13 @@ class TestMaintenanceOp : public MaintenanceOp {
     stats->set_runnable(remaining_runs_ > 0);
     stats->set_ram_anchored(ram_anchored_);
     stats->set_logs_retained_bytes(logs_retained_bytes_);
+    stats->set_data_retained_bytes(data_retained_bytes_);
     stats->set_perf_improvement(perf_improvement_);
     stats->set_workload_score(workload_score_);
-
+    if (remaining_runs_ > 0 && !updated_) {
+      update_time_ = MonoTime::Now();
+      updated_ = true;
+    }
     ++update_stats_count_;
   }
 
@@ -184,6 +210,11 @@ class TestMaintenanceOp : public MaintenanceOp {
     logs_retained_bytes_ = logs_retained_bytes;
   }
 
+  void set_data_retained_bytes(uint64_t data_retained_bytes) {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    data_retained_bytes_ = data_retained_bytes;
+  }
+
   void set_perf_improvement(uint64_t perf_improvement) {
     std::lock_guard<simple_spinlock> guard(lock_);
     perf_improvement_ = perf_improvement;
@@ -194,6 +225,11 @@ class TestMaintenanceOp : public MaintenanceOp {
     workload_score_ = workload_score;
   }
 
+  void set_register_self(bool register_self) {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    register_self_ = register_self;
+  }
+
   scoped_refptr<Histogram> DurationHistogram() const override {
     return maintenance_op_duration_;
   }
@@ -221,6 +257,14 @@ class TestMaintenanceOp : public MaintenanceOp {
     return completed_at_;
   }
 
+  int64_t run_count() const {
+    return run_count_;
+  }
+
+  MonoDelta queue_time() const {
+    return queue_time_;
+  }
+
  private:
   mutable simple_spinlock lock_;
 
@@ -233,6 +277,7 @@ class TestMaintenanceOp : public MaintenanceOp {
   CountDownLatch* continue_stats_latch_;
 
   uint64_t ram_anchored_;
+  uint64_t data_retained_bytes_;
   uint64_t logs_retained_bytes_;
   uint64_t perf_improvement_;
   MetricRegistry metric_registry_;
@@ -262,6 +307,18 @@ class TestMaintenanceOp : public MaintenanceOp {
 
   // Timestamp of the monotonous clock when the operation was completed.
   MonoTime completed_at_;
+
+  // Timestamp of updating status in FindBestOp.
+  MonoTime update_time_;
+  // Sum of scheduled time, which indicates how long this op has been
+  // scheduled after it could be run.
+  MonoDelta queue_time_;
+  // How many times the operation has been run.
+  int64_t run_count_;
+  // If the operation had been updated but has not been performed.
+  bool updated_;
+  // Whether re-register itself after performing.
+  bool register_self_;
 };
 
 class MaintenanceManagerTest : public KuduTest {
@@ -285,9 +342,20 @@ class MaintenanceManagerTest : public KuduTest {
     options.polling_interval_ms = 1;
     options.history_size = kHistorySize;
     manager_.reset(new MaintenanceManager(options, kFakeUuid, metric_entity_));
+    // Same logic with MaintenanceManager#ProceedWithFlush, but the memory usage is
+    // simulated.
     manager_->set_memory_pressure_func_for_tests(
         [&](double* /* consumption */) {
-          return indicate_memory_pressure_.load();
+          double pressure_ratio = static_cast<double>(FLAGS_memory_pressure_percentage) / 100;
+          if (memory_pressure_pct_.load() >= pressure_ratio) {
+            double pressure_threshold = pressure_ratio;
+            double soft_limit = static_cast<double>(FLAGS_memory_limit_soft_percentage) / 100;
+            return pressure_threshold >= soft_limit || memory_pressure_pct_.load() >=
+                soft_limit || Random(GetRandomSeed32()).NextDoubleFraction() >=
+                FLAGS_run_non_memory_ops_prob * (soft_limit - memory_pressure_pct_.load())
+                / (soft_limit - pressure_threshold);
+          }
+          return false;
         });
     ASSERT_OK(manager_->Start());
   }
@@ -317,7 +385,7 @@ class MaintenanceManagerTest : public KuduTest {
   scoped_refptr<MetricEntity> metric_entity_;
 
   shared_ptr<MaintenanceManager> manager_;
-  std::atomic<bool> indicate_memory_pressure_ { false };
+  std::atomic<double> memory_pressure_pct_ { 0.0 };
 };
 
 // Just create the MaintenanceManager and then shut it down, to make sure
@@ -412,7 +480,7 @@ TEST_F(MaintenanceManagerTest, TestMemoryPressurePrioritizesMemory) {
   ASSERT_EQ(0, op.DurationHistogram()->TotalCount());
 
   // Fake that the server is under memory pressure.
-  indicate_memory_pressure_ = true;
+  memory_pressure_pct_ = 0.7;
 
   ASSERT_EVENTUALLY([&]() {
       ASSERT_EQ(op.DurationHistogram()->TotalCount(), 1);
@@ -433,7 +501,7 @@ TEST_F(MaintenanceManagerTest, TestMemoryPressurePerformsNoMemoryOp) {
 
   // Now fake that the server is under memory pressure and make our op runnable
   // by giving it a perf score.
-  indicate_memory_pressure_ = true;
+  memory_pressure_pct_ = 0.7;
   op.set_perf_improvement(1);
 
   // Even though we're under memory pressure, and even though our op doesn't
@@ -513,7 +581,7 @@ TEST_F(MaintenanceManagerTest, TestPrioritizeLogRetentionUnderMemoryPressure) {
   op3.set_logs_retained_bytes(99);
   op3.set_ram_anchored(101);
 
-  indicate_memory_pressure_ = true;
+  memory_pressure_pct_ = 0.7;
   manager_->RegisterOp(&op1);
   manager_->RegisterOp(&op2);
   manager_->RegisterOp(&op3);
@@ -899,4 +967,76 @@ TEST_F(MaintenanceManagerTest, TestUnregisterWhileScheduling) {
   op1.Unregister();
 }
 
+// Show which operation the @MaintenanceManager will pick in various workloads and
+// policies. This test would not assert anything since it tests the probability flags.
+TEST_F(MaintenanceManagerTest, ComprehensiveTest) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  // Select policies here.
+  memory_pressure_pct_ = 0.6;
+  FLAGS_run_non_memory_ops_prob = 0.2;
+  FLAGS_data_gc_prioritization_prob = 0.5;
+
+  StopManager();
+
+  TestMaintenanceOp op1("perf_op", MaintenanceOp::HIGH_IO_USAGE);
+  op1.set_perf_improvement(10);
+  op1.set_remaining_runs(1);
+  op1.set_sleep_time(MonoDelta::FromMilliseconds(5));
+  op1.set_register_self(true);
+
+  TestMaintenanceOp op2("memory_op", MaintenanceOp::HIGH_IO_USAGE);
+  op2.set_ram_anchored(2000);
+  op2.set_remaining_runs(1);
+  op2.set_sleep_time(MonoDelta::FromMilliseconds(5));
+  op2.set_register_self(true);
+
+  TestMaintenanceOp op3("data_gc_op", MaintenanceOp::HIGH_IO_USAGE);
+  op3.set_data_retained_bytes(1024 * 1024);
+  op3.set_remaining_runs(1);
+  op3.set_sleep_time(MonoDelta::FromMilliseconds(5));
+  op3.set_register_self(true);
+
+  // Set maintenance manager num to 1 to get a clean vision of scheduling.
+  NO_FATALS(StartManager(1));
+  FLAGS_enable_maintenance_manager = false;
+  manager_->RegisterOp(&op1);
+  manager_->RegisterOp(&op2);
+  manager_->RegisterOp(&op3);
+  FLAGS_enable_maintenance_manager = true;
+  // Wait for the memory_op to run over 1000 times and then check the running times
+  // of other operations.
+  AssertEventually([&]() {
+    ASSERT_LE(1000, op2.DurationHistogram()->TotalCount());
+  }, MonoDelta::FromSeconds(60));
+  op1.set_register_self(false);
+  op2.set_register_self(false);
+  op3.set_register_self(false);
+  // Wait until all the operations are done.
+  SleepFor(MonoDelta::FromMilliseconds(100));
+
+  manager_->UnregisterOp(&op1);
+  manager_->UnregisterOp(&op2);
+  manager_->UnregisterOp(&op3);
+
+  // The expected running times of operations should be like:
+  //    other_ops_running_times = memory_op_running_times * probability_not_to_flush
+  // Since the memory usage is 60%, probability_not_to_flush is exactly
+  // FLAGS_run_non_memory_ops_prob. And also, the applying and registering might
+  // take time, so the other_ops_running_times might be greater than expected.
+  const int64_t memory_op_running_times = op2.run_count();
+  const int64_t other_ops_running_times = op1.run_count() + op3.run_count();
+  ASSERT_LT(memory_op_running_times * 0.15, other_ops_running_times);
+  ASSERT_GT(memory_op_running_times * 0.35, other_ops_running_times);
+
+  LOG(INFO) << Substitute("op1: $0 perform count: $1 average schedule time: 
$2",
+                          op1.name(), op1.run_count(), 
op1.queue_time().ToMilliseconds()
+                          / op1.run_count());
+  LOG(INFO) << Substitute("op2: $0 perform count: $1 average schedule time: 
$2",
+                          op2.name(), op2.run_count(), 
op2.queue_time().ToMilliseconds()
+                          / op2.run_count());
+  LOG(INFO) << Substitute("op3: $0 perform count: $1 average schedule time: 
$2",
+                          op3.name(), op3.run_count(), 
op3.queue_time().ToMilliseconds()
+                          / op3.run_count());
+}
 } // namespace kudu
diff --git a/src/kudu/util/maintenance_manager.cc b/src/kudu/util/maintenance_manager.cc
index 56064e3e1..898fb0e3f 100644
--- a/src/kudu/util/maintenance_manager.cc
+++ b/src/kudu/util/maintenance_manager.cc
@@ -101,6 +101,18 @@ DEFINE_double(data_gc_prioritization_prob, 0.5,
              "such as delta compaction.");
 TAG_FLAG(data_gc_prioritization_prob, experimental);
 
+DEFINE_double(run_non_memory_ops_prob, 0,
+              "The probability that the tablet server will not flush DRS or MRS "
+              "while under memory pressure. This is useful when the server is under "
+              "memory pressure for a long time and there are non-memory operations "
+              "waiting to be run. The higher value means higher probability to "
+              "do other ops instead of flushing ops. This might be needed to turn "
+              "on if system admin found that the tablet server is under memory "
+              "pressure for a long time and there is a significant degradation in "
+              "performance.");
+TAG_FLAG(run_non_memory_ops_prob, experimental);
+TAG_FLAG(run_non_memory_ops_prob, runtime);
+
 DEFINE_double(maintenance_op_multiplier, 1.1,
               "Multiplier applied on different priority levels, table maintenance OPs on level N "
               "has multiplier of FLAGS_maintenance_op_multiplier^N, the last score will be "
@@ -121,6 +133,20 @@ DEFINE_int32(maintenance_manager_inject_latency_ms, 0,
 TAG_FLAG(maintenance_manager_inject_latency_ms, runtime);
 TAG_FLAG(maintenance_manager_inject_latency_ms, unsafe);
 
+DECLARE_int32(memory_pressure_percentage);
+DECLARE_int32(memory_limit_soft_percentage);
+
+static bool ValidateProbability(const char* flagname, double value) {
+  if (value >= 0.0 && value <= 1.0) {
+    return true;
+  }
+  LOG(ERROR) << Substitute("$0 must be a probability from 0 to 1,"
+                           " value $1 is invalid", flagname, value);
+  return false;
+}
+DEFINE_validator(run_non_memory_ops_prob, &ValidateProbability);
+DEFINE_validator(data_gc_prioritization_prob, &ValidateProbability);
+
 namespace kudu {
 
 MaintenanceOpStats::MaintenanceOpStats() {
@@ -193,7 +219,9 @@ MaintenanceManager::MaintenanceManager(
                          : FLAGS_maintenance_manager_history_size),
       completed_ops_count_(0),
       rand_(GetRandomSeed32()),
-      memory_pressure_func_(&process_memory::UnderMemoryPressure),
+      memory_pressure_func_([&](double* consumption) {
+        return this->ProceedWithFlush(consumption);
+      }),
       metrics_(CHECK_NOTNULL(metric_entity)) {
   CHECK_OK(ThreadPoolBuilder("MaintenanceMgr")
                .set_min_threads(num_threads_)
@@ -504,6 +532,9 @@ pair<MaintenanceOp*, string> MaintenanceManager::FindBestOp() {
   // are anchoring WALs. Choosing the op that frees the most WALs ensures that
   // all ops that anchor memory (and also anchor WALs) will eventually be
   // performed.
+  //
+  // Do not always flush MRS/DMS even under memory pressure, some perf improvement
+  // ops might be more important than freeing memory even if under memory pressure.
   double capacity_pct;
   if (memory_pressure_func_(&capacity_pct) && most_logs_retained_bytes_ram_anchored_op) {
     DCHECK_GT(most_logs_retained_bytes_ram_anchored, 0);
@@ -693,4 +724,14 @@ void MaintenanceManager::DecreaseOpCountAndNotifyWaiters(MaintenanceOp* op) {
   op->cond_->Signal();
 }
 
+bool MaintenanceManager::ProceedWithFlush(double* used_memory_percentage) {
+  if (process_memory::UnderMemoryPressure(used_memory_percentage)) {
+    double pressure_threshold = static_cast<double>(FLAGS_memory_pressure_percentage) / 100;
+    double soft_limit = static_cast<double>(FLAGS_memory_limit_soft_percentage) / 100;
+    return pressure_threshold >= soft_limit || *used_memory_percentage >= soft_limit ||
+        rand_.NextDoubleFraction() >= FLAGS_run_non_memory_ops_prob *
+        (soft_limit - *used_memory_percentage) / (soft_limit - pressure_threshold);
+  }
+  return false;
+}
 } // namespace kudu
diff --git a/src/kudu/util/maintenance_manager.h b/src/kudu/util/maintenance_manager.h
index 2e1f48b9d..6b95d1a18 100644
--- a/src/kudu/util/maintenance_manager.h
+++ b/src/kudu/util/maintenance_manager.h
@@ -376,6 +376,15 @@ class MaintenanceManager : public std::enable_shared_from_this<MaintenanceManage
   // 'lock_' is held.
   void MergePendingOpRegistrationsUnlocked();
 
+  /// Determine whether to run flush ops, depends on memory pressure and
+  /// the flag run_non_memory_ops_prob.
+  ///
+  /// @param [out] used_memory_percentage
+  ///    The memory usage for now.
+  ///
+  /// @return Should the maintenance manager find a flush operation to run.
+  bool ProceedWithFlush(double* used_memory_percentage);
+
   const std::string server_uuid_;
   const int32_t num_threads_;
   const MonoDelta polling_interval_;