This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
     new 66a79dddf  KUDU-3407 Avoid unchecked scheduling of flush operations.
66a79dddf is described below

commit 66a79dddf2faf31dbb49cd8e08c0e322c4bb57ce
Author:     宋家成 <songjiach...@thinkingdata.cn>
AuthorDate: Fri Jul 7 14:36:29 2023 +0800

    KUDU-3407 Avoid unchecked scheduling of flush operations.

    In some clusters, the memory usage of tservers might stay between 60%
    and 80% for a long time. During such periods the maintenance manager
    will not run any operation other than WAL GC and MRS/DMS flushes,
    which makes tserver performance progressively worse and can
    eventually lead to an OOM crash.

    This patch adds a flag that gives other operations a chance to run
    while the server is under memory pressure. The mechanism works when
    the memory usage is between memory_pressure_percentage and
    memory_limit_soft_percentage: the higher the memory usage, the higher
    the probability of flushing MRS/DMS.

    E.g., with
      memory_pressure_percentage = 60%
      memory_limit_soft_percentage = 80%
    the probability of not flushing MRS/DMS starts at the value of
    run_non_memory_ops_prob and, as the memory usage increases, gradually
    decreases to 0, which it reaches when the memory usage is 80%.

    Change-Id: Idc2fd3a850cf99d54ef2980211b712468440ed80
    Reviewed-on: http://gerrit.cloudera.org:8080/20166
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <ale...@apache.org>
---
 src/kudu/util/maintenance_manager-test.cc | 158 ++++++++++++++++++++++++++++--
 src/kudu/util/maintenance_manager.cc      |  43 +++++++-
 src/kudu/util/maintenance_manager.h       |   9 ++
 3 files changed, 200 insertions(+), 10 deletions(-)

diff --git a/src/kudu/util/maintenance_manager-test.cc b/src/kudu/util/maintenance_manager-test.cc
index de78f9608..177fca68f 100644
--- a/src/kudu/util/maintenance_manager-test.cc
+++ b/src/kudu/util/maintenance_manager-test.cc
@@ -29,6 +29,7 @@
 #include <ostream>
 #include <string>
 #include <thread>
+#include <type_traits>
 #include <utility>
 #include <vector>
 
@@ -44,9 +45,12 @@
 #include "kudu/util/maintenance_manager_metrics.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/random.h"
+#include "kudu/util/random_util.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
+#include "kudu/util/thread.h"
 
 using std::list;
 using std::shared_ptr;
@@ -72,6 +76,10 @@ DECLARE_bool(enable_maintenance_manager);
 DECLARE_int64(log_target_replay_size_mb);
 DECLARE_double(maintenance_op_multiplier);
 DECLARE_int32(max_priority_range);
+DECLARE_double(run_non_memory_ops_prob);
+DECLARE_double(data_gc_prioritization_prob);
+DECLARE_int32(memory_pressure_percentage);
+DECLARE_int32(memory_limit_soft_percentage);
 
 namespace kudu {
 
 // Set this a bit bigger so that the manager could keep track of all possible completed ops.
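To make the probability mechanism described in the commit message concrete, here is a minimal standalone sketch (the names and values are illustrative only, not the actual Kudu code) of how the probability of skipping a flush decays between the two thresholds:

    #include <iostream>

    int main() {
      // Example thresholds from the commit message; in Kudu these come from
      // --memory_pressure_percentage and --memory_limit_soft_percentage.
      const double pressure_threshold = 0.6;
      const double soft_limit = 0.8;
      // Example value for --run_non_memory_ops_prob.
      const double run_non_memory_ops_prob = 0.2;

      for (double usage : {0.60, 0.65, 0.70, 0.75, 0.80}) {
        // Probability of *not* flushing MRS/DMS: equals
        // run_non_memory_ops_prob at the pressure threshold and decays
        // linearly to 0 at the soft limit.
        const double p_skip_flush = run_non_memory_ops_prob *
            (soft_limit - usage) / (soft_limit - pressure_threshold);
        std::cout << "usage=" << usage
                  << " P(run a non-memory op)=" << p_skip_flush << std::endl;
      }
      return 0;
    }

With run_non_memory_ops_prob = 0.2, this prints a skip probability of 0.2 at 60% usage, 0.1 at 70%, and 0 at 80%, matching the description above.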
@@ -89,6 +97,7 @@ class TestMaintenanceOp : public MaintenanceOp {
       start_stats_latch_(start_stats_latch),
       continue_stats_latch_(continue_stats_latch),
       ram_anchored_(500),
+      data_retained_bytes_(0),
       logs_retained_bytes_(0),
       perf_improvement_(0),
       metric_entity_(METRIC_ENTITY_test.Instantiate(&metric_registry_, "test")),
@@ -100,12 +109,18 @@ class TestMaintenanceOp : public MaintenanceOp {
       update_stats_time_(MonoDelta::FromSeconds(0)),
       priority_(priority),
       workload_score_(0),
-      update_stats_count_(0) {
+      update_stats_count_(0),
+      update_time_(MonoTime::Now()),
+      queue_time_(MonoDelta::FromSeconds(0)),
+      run_count_(0),
+      updated_(false),
+      register_self_(false) {
   }
 
   ~TestMaintenanceOp() override = default;
 
   bool Prepare() override {
+    queue_time_ += (MonoTime::Now() - update_time_);
     std::lock_guard<simple_spinlock> guard(lock_);
     if (remaining_runs_ == 0) {
       return false;
@@ -126,13 +141,20 @@ class TestMaintenanceOp : public MaintenanceOp {
       CHECK_GE(prepared_runs_, 1);
       prepared_runs_--;
     }
-
     SleepFor(sleep_time_);
-
     {
       std::lock_guard<simple_spinlock> guard(lock_);
+      run_count_++;
+      updated_ = false;
       completed_at_ = MonoTime::Now();
     }
+    if (register_self_) {
+      scoped_refptr<kudu::Thread> thread;
+      // Re-register this op (via set_remaining_runs()) from a separate thread.
+      kudu::Thread::Create("maintenance-test", "self-register", [this]() {
+        this->set_remaining_runs(1);
+      }, &thread);
+    }
   }
 
   void UpdateStats(MaintenanceOpStats* stats) override {
@@ -153,9 +175,13 @@
     stats->set_runnable(remaining_runs_ > 0);
     stats->set_ram_anchored(ram_anchored_);
     stats->set_logs_retained_bytes(logs_retained_bytes_);
+    stats->set_data_retained_bytes(data_retained_bytes_);
     stats->set_perf_improvement(perf_improvement_);
     stats->set_workload_score(workload_score_);
-
+    if (remaining_runs_ > 0 && !updated_) {
+      update_time_ = MonoTime::Now();
+      updated_ = true;
+    }
     ++update_stats_count_;
   }
 
@@ -184,6 +210,11 @@ class TestMaintenanceOp : public MaintenanceOp {
     logs_retained_bytes_ = logs_retained_bytes;
   }
 
+  void set_data_retained_bytes(uint64_t data_retained_bytes) {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    data_retained_bytes_ = data_retained_bytes;
+  }
+
   void set_perf_improvement(uint64_t perf_improvement) {
     std::lock_guard<simple_spinlock> guard(lock_);
     perf_improvement_ = perf_improvement;
@@ -194,6 +225,11 @@ class TestMaintenanceOp : public MaintenanceOp {
     workload_score_ = workload_score;
   }
 
+  void set_register_self(bool register_self) {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    register_self_ = register_self;
+  }
+
   scoped_refptr<Histogram> DurationHistogram() const override {
     return maintenance_op_duration_;
   }
@@ -221,6 +257,14 @@ class TestMaintenanceOp : public MaintenanceOp {
     return completed_at_;
   }
 
+  int64_t run_count() const {
+    return run_count_;
+  }
+
+  MonoDelta queue_time() const {
+    return queue_time_;
+  }
+
  private:
   mutable simple_spinlock lock_;
 
@@ -233,6 +277,7 @@ class TestMaintenanceOp : public MaintenanceOp {
   CountDownLatch* continue_stats_latch_;
 
   uint64_t ram_anchored_;
+  uint64_t data_retained_bytes_;
   uint64_t logs_retained_bytes_;
   uint64_t perf_improvement_;
   MetricRegistry metric_registry_;
@@ -262,6 +307,18 @@ class TestMaintenanceOp : public MaintenanceOp {
 
   // Timestamp of the monotonic clock when the operation was completed.
   MonoTime completed_at_;
+
+  // Timestamp of the most recent stats update from FindBestOp().
+  MonoTime update_time_;
+  // Total time this op has spent scheduled, i.e. waiting to run after
+  // its stats were last updated.
+  MonoDelta queue_time_;
+  // How many times the operation has been run.
+  int64_t run_count_;
+  // Whether the op's stats have been updated but the op has not run yet.
+  bool updated_;
+  // Whether the op re-registers itself after it runs.
+  bool register_self_;
 };
 
 class MaintenanceManagerTest : public KuduTest {
@@ -285,9 +342,20 @@ class MaintenanceManagerTest : public KuduTest {
     options.polling_interval_ms = 1;
     options.history_size = kHistorySize;
     manager_.reset(new MaintenanceManager(options, kFakeUuid, metric_entity_));
+    // Same logic as MaintenanceManager::ProceedWithFlush(), but with
+    // simulated memory usage.
     manager_->set_memory_pressure_func_for_tests(
         [&](double* /* consumption */) {
-          return indicate_memory_pressure_.load();
+          double pressure_ratio = static_cast<double>(FLAGS_memory_pressure_percentage) / 100;
+          if (memory_pressure_pct_.load() >= pressure_ratio) {
+            double pressure_threshold = pressure_ratio;
+            double soft_limit = static_cast<double>(FLAGS_memory_limit_soft_percentage) / 100;
+            return pressure_threshold >= soft_limit ||
+                memory_pressure_pct_.load() >= soft_limit ||
+                Random(GetRandomSeed32()).NextDoubleFraction() >=
+                    FLAGS_run_non_memory_ops_prob * (soft_limit - memory_pressure_pct_.load()) /
+                        (soft_limit - pressure_threshold);
+          }
+          return false;
         });
     ASSERT_OK(manager_->Start());
   }
@@ -317,7 +385,7 @@ class MaintenanceManagerTest : public KuduTest {
   scoped_refptr<MetricEntity> metric_entity_;
   shared_ptr<MaintenanceManager> manager_;
-  std::atomic<bool> indicate_memory_pressure_ { false };
+  std::atomic<double> memory_pressure_pct_ { 0.0 };
 };
 
 // Just create the MaintenanceManager and then shut it down, to make sure
@@ -412,7 +480,7 @@ TEST_F(MaintenanceManagerTest, TestMemoryPressurePrioritizesMemory) {
   ASSERT_EQ(0, op.DurationHistogram()->TotalCount());
 
   // Fake that the server is under memory pressure.
-  indicate_memory_pressure_ = true;
+  memory_pressure_pct_ = 0.7;
 
   ASSERT_EVENTUALLY([&]() {
     ASSERT_EQ(op.DurationHistogram()->TotalCount(), 1);
@@ -433,7 +501,7 @@ TEST_F(MaintenanceManagerTest, TestMemoryPressurePerformsNoMemoryOp) {
 
   // Now fake that the server is under memory pressure and make our op runnable
   // by giving it a perf score.
-  indicate_memory_pressure_ = true;
+  memory_pressure_pct_ = 0.7;
   op.set_perf_improvement(1);
 
   // Even though we're under memory pressure, and even though our op doesn't
@@ -513,7 +581,7 @@ TEST_F(MaintenanceManagerTest, TestPrioritizeLogRetentionUnderMemoryPressure) {
   op3.set_logs_retained_bytes(99);
   op3.set_ram_anchored(101);
 
-  indicate_memory_pressure_ = true;
+  memory_pressure_pct_ = 0.7;
   manager_->RegisterOp(&op1);
   manager_->RegisterOp(&op2);
   manager_->RegisterOp(&op3);
@@ -899,4 +967,76 @@ TEST_F(MaintenanceManagerTest, TestUnregisterWhileScheduling) {
   op1.Unregister();
 }
 
+// Show which operation the MaintenanceManager picks under various workloads
+// and policies. Since this test exercises the probability flags, it makes
+// only loose assertions on the resulting run counts.
+TEST_F(MaintenanceManagerTest, ComprehensiveTest) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  // Select the scheduling policies here.
+  memory_pressure_pct_ = 0.6;
+  FLAGS_run_non_memory_ops_prob = 0.2;
+  FLAGS_data_gc_prioritization_prob = 0.5;
+
+  StopManager();
+
+  TestMaintenanceOp op1("perf_op", MaintenanceOp::HIGH_IO_USAGE);
+  op1.set_perf_improvement(10);
+  op1.set_remaining_runs(1);
+  op1.set_sleep_time(MonoDelta::FromMilliseconds(5));
+  op1.set_register_self(true);
+
+  TestMaintenanceOp op2("memory_op", MaintenanceOp::HIGH_IO_USAGE);
+  op2.set_ram_anchored(2000);
+  op2.set_remaining_runs(1);
+  op2.set_sleep_time(MonoDelta::FromMilliseconds(5));
+  op2.set_register_self(true);
+
+  TestMaintenanceOp op3("data_gc_op", MaintenanceOp::HIGH_IO_USAGE);
+  op3.set_data_retained_bytes(1024 * 1024);
+  op3.set_remaining_runs(1);
+  op3.set_sleep_time(MonoDelta::FromMilliseconds(5));
+  op3.set_register_self(true);
+
+  // Set the number of maintenance manager threads to 1 to get a clear view
+  // of the scheduling.
+  NO_FATALS(StartManager(1));
+  FLAGS_enable_maintenance_manager = false;
+  manager_->RegisterOp(&op1);
+  manager_->RegisterOp(&op2);
+  manager_->RegisterOp(&op3);
+  FLAGS_enable_maintenance_manager = true;
+  // Wait for the memory_op to run over 1000 times, then check the run
+  // counts of the other operations.
+  AssertEventually([&]() {
+    ASSERT_LE(1000, op2.DurationHistogram()->TotalCount());
+  }, MonoDelta::FromSeconds(60));
+  op1.set_register_self(false);
+  op2.set_register_self(false);
+  op3.set_register_self(false);
+  // Wait until all the operations are done.
+  SleepFor(MonoDelta::FromMilliseconds(100));
+
+  manager_->UnregisterOp(&op1);
+  manager_->UnregisterOp(&op2);
+  manager_->UnregisterOp(&op3);
+
+  // The run counts of the operations are expected to satisfy:
+  //   other_ops_running_times = memory_op_running_times * probability_not_to_flush
+  // Since the memory usage is 60%, probability_not_to_flush is exactly
+  // FLAGS_run_non_memory_ops_prob. Also, performing and re-registering the
+  // ops takes time, so other_ops_running_times might be greater than expected.
+  const int64_t memory_op_running_times = op2.run_count();
+  const int64_t other_ops_running_times = op1.run_count() + op3.run_count();
+  ASSERT_LT(memory_op_running_times * 0.15, other_ops_running_times);
+  ASSERT_GT(memory_op_running_times * 0.35, other_ops_running_times);
+
+  LOG(INFO) << Substitute("op1: $0 perform count: $1 average schedule time: $2",
+                          op1.name(), op1.run_count(),
+                          op1.queue_time().ToMilliseconds() / op1.run_count());
+  LOG(INFO) << Substitute("op2: $0 perform count: $1 average schedule time: $2",
+                          op2.name(), op2.run_count(),
+                          op2.queue_time().ToMilliseconds() / op2.run_count());
+  LOG(INFO) << Substitute("op3: $0 perform count: $1 average schedule time: $2",
+                          op3.name(), op3.run_count(),
+                          op3.queue_time().ToMilliseconds() / op3.run_count());
+}
 } // namespace kudu
diff --git a/src/kudu/util/maintenance_manager.cc b/src/kudu/util/maintenance_manager.cc
index 56064e3e1..898fb0e3f 100644
--- a/src/kudu/util/maintenance_manager.cc
+++ b/src/kudu/util/maintenance_manager.cc
@@ -101,6 +101,18 @@ DEFINE_double(data_gc_prioritization_prob, 0.5,
               "such as delta compaction.");
 TAG_FLAG(data_gc_prioritization_prob, experimental);
 
+DEFINE_double(run_non_memory_ops_prob, 0,
+              "The probability that the tablet server will not flush MRS or DMS "
+              "while under memory pressure. This is useful when the server stays "
+              "under memory pressure for a long time and there are non-memory "
+              "operations waiting to be run. A higher value means a higher "
+              "probability of running other ops instead of flush ops. "
+              "Consider turning this flag on if the tablet server has been "
+              "under memory pressure for a long time and there is a "
+              "significant degradation in performance.");
+TAG_FLAG(run_non_memory_ops_prob, experimental);
+TAG_FLAG(run_non_memory_ops_prob, runtime);
+
 DEFINE_double(maintenance_op_multiplier, 1.1,
               "Multiplier applied on different priority levels, table maintenance OPs on level N "
               "has multiplier of FLAGS_maintenance_op_multiplier^N, the last score will be "
@@ -121,6 +133,20 @@ DEFINE_int32(maintenance_manager_inject_latency_ms, 0,
 TAG_FLAG(maintenance_manager_inject_latency_ms, runtime);
 TAG_FLAG(maintenance_manager_inject_latency_ms, unsafe);
 
+DECLARE_int32(memory_pressure_percentage);
+DECLARE_int32(memory_limit_soft_percentage);
+
+static bool ValidateProbability(const char* flagname, double value) {
+  if (value >= 0.0 && value <= 1.0) {
+    return true;
+  }
+  LOG(ERROR) << Substitute("$0 must be a probability from 0 to 1,"
+                           " value $1 is invalid", flagname, value);
+  return false;
+}
+DEFINE_validator(run_non_memory_ops_prob, &ValidateProbability);
+DEFINE_validator(data_gc_prioritization_prob, &ValidateProbability);
+
 namespace kudu {
 
 MaintenanceOpStats::MaintenanceOpStats() {
@@ -193,7 +219,9 @@ MaintenanceManager::MaintenanceManager(
                           : FLAGS_maintenance_manager_history_size),
       completed_ops_count_(0),
       rand_(GetRandomSeed32()),
-      memory_pressure_func_(&process_memory::UnderMemoryPressure),
+      memory_pressure_func_([&](double* consumption) {
+        return this->ProceedWithFlush(consumption);
+      }),
       metrics_(CHECK_NOTNULL(metric_entity)) {
   CHECK_OK(ThreadPoolBuilder("MaintenanceMgr")
            .set_min_threads(num_threads_)
@@ -504,6 +532,9 @@ pair<MaintenanceOp*, string> MaintenanceManager::FindBestOp() {
   // are anchoring WALs. Choosing the op that frees the most WALs ensures that
   // all ops that anchor memory (and also anchor WALs) will eventually be
   // performed.
+  //
+  // Do not always flush MRS/DMS under memory pressure: some perf improvement
+  // ops might be more important than freeing memory.
   double capacity_pct;
   if (memory_pressure_func_(&capacity_pct) && most_logs_retained_bytes_ram_anchored_op) {
     DCHECK_GT(most_logs_retained_bytes_ram_anchored, 0);
@@ -693,4 +724,14 @@ void MaintenanceManager::DecreaseOpCountAndNotifyWaiters(MaintenanceOp* op) {
   op->cond_->Signal();
 }
 
+bool MaintenanceManager::ProceedWithFlush(double* used_memory_percentage) {
+  if (process_memory::UnderMemoryPressure(used_memory_percentage)) {
+    double pressure_threshold = static_cast<double>(FLAGS_memory_pressure_percentage) / 100;
+    double soft_limit = static_cast<double>(FLAGS_memory_limit_soft_percentage) / 100;
+    return pressure_threshold >= soft_limit || *used_memory_percentage >= soft_limit ||
+        rand_.NextDoubleFraction() >= FLAGS_run_non_memory_ops_prob *
+            (soft_limit - *used_memory_percentage) / (soft_limit - pressure_threshold);
+  }
+  return false;
+}
 } // namespace kudu
diff --git a/src/kudu/util/maintenance_manager.h b/src/kudu/util/maintenance_manager.h
index 2e1f48b9d..6b95d1a18 100644
--- a/src/kudu/util/maintenance_manager.h
+++ b/src/kudu/util/maintenance_manager.h
@@ -376,6 +376,15 @@ class MaintenanceManager : public std::enable_shared_from_this<MaintenanceManage
   // 'lock_' is held.
   void MergePendingOpRegistrationsUnlocked();
 
+  /// Determine whether to run flush ops, depending on the memory pressure
+  /// and the run_non_memory_ops_prob flag.
+  ///
+  /// @param [out] used_memory_percentage
+  ///   The current memory usage percentage.
+  ///
+  /// @return Whether the manager should pick a flush operation to run.
+  bool ProceedWithFlush(double* used_memory_percentage);
+
   const std::string server_uuid_;
   const int32_t num_threads_;
   const MonoDelta polling_interval_;
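For readers who want to experiment with the scheduling policy outside of Kudu, the following self-contained sketch mirrors the ProceedWithFlush() decision shown in the diff above and estimates the resulting flush rate by sampling. The function name, the std::mt19937 RNG, and the hard-coded values are illustrative assumptions, not the actual Kudu implementation:

    #include <iostream>
    #include <random>

    // Returns true when the maintenance manager should restrict itself to a
    // memory-freeing (flush) op; mirrors the logic of ProceedWithFlush().
    bool ShouldFlush(double usage, double pressure_threshold, double soft_limit,
                     double run_non_memory_ops_prob, std::mt19937* rng) {
      if (usage < pressure_threshold) {
        return false;  // Not under memory pressure: no special treatment.
      }
      if (pressure_threshold >= soft_limit || usage >= soft_limit) {
        return true;   // At or beyond the soft limit: always flush.
      }
      std::uniform_real_distribution<double> dist(0.0, 1.0);
      // Skip the flush with probability
      // run_non_memory_ops_prob * (soft - usage) / (soft - pressure).
      return dist(*rng) >= run_non_memory_ops_prob *
          (soft_limit - usage) / (soft_limit - pressure_threshold);
    }

    int main() {
      std::mt19937 rng(42);
      const int kTrials = 100000;
      int flushes = 0;
      for (int i = 0; i < kTrials; ++i) {
        flushes += ShouldFlush(/*usage=*/0.7, /*pressure_threshold=*/0.6,
                               /*soft_limit=*/0.8,
                               /*run_non_memory_ops_prob=*/0.2, &rng);
      }
      std::cout << "flush fraction: "
                << static_cast<double>(flushes) / kTrials << std::endl;
      return 0;
    }

With these sample settings the expected flush fraction is 1 - 0.2 * (0.8 - 0.7) / (0.8 - 0.6) = 0.9, i.e. under sustained 70% memory usage roughly one scheduling decision in ten is freed up for a non-memory op.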