This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2d9ec456a7a [improvement](cloud) Add enable_recycler config to skip
recycler dynamically (#63286)
2d9ec456a7a is described below
commit 2d9ec456a7ad2e3a4877fef47ed51f04c4bb0f1a
Author: meiyi <[email protected]>
AuthorDate: Tue May 19 16:19:01 2026 +0800
[improvement](cloud) Add enable_recycler config to skip recycler
dynamically (#63286)
---
cloud/src/common/config.h | 2 +
cloud/src/recycler/recycler.cpp | 45 ++++++++++++++---------
cloud/test/recycler_test.cpp | 81 +++++++++++++++++++++++++++++++++++++++++
3 files changed, 110 insertions(+), 18 deletions(-)
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index f54f65f9202..cdc8b5cd190 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -443,4 +443,6 @@ CONF_Bool(enable_instance_update_watcher, "true");
CONF_mBool(advance_txn_lazy_commit_during_reads, "true");
CONF_mBool(wait_txn_lazy_commit_during_reads, "true");
+// Whether to enable recycler. If false, skip both scanning instances and recycling pending ones.
+CONF_mBool(enable_recycler, "true");
} // namespace doris::cloud::config
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 401d938a1ae..84e92fa04c1 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -249,26 +249,30 @@ void Recycler::instance_scanner_callback() {
std::this_thread::sleep_for(
std::chrono::seconds(config::recycler_sleep_before_scheduling_seconds));
while (!stopped()) {
- std::vector<InstanceInfoPB> instances;
- get_all_instances(txn_kv_.get(), instances);
- // TODO(plat1ko): delete job recycle kv of non-existent instances
- LOG(INFO) << "Recycler get instances: " << [&instances] {
- std::stringstream ss;
- for (auto& i : instances) ss << ' ' << i.instance_id();
- return ss.str();
- }();
- if (!instances.empty()) {
- // enqueue instances
- std::lock_guard lock(mtx_);
- for (auto& instance : instances) {
- if (instance_filter_.filter_out(instance.instance_id()))
continue;
- auto [_, success] =
pending_instance_set_.insert(instance.instance_id());
- // skip instance already in pending queue
- if (success) {
- pending_instance_queue_.push_back(std::move(instance));
+ if (config::enable_recycler) {
+ std::vector<InstanceInfoPB> instances;
+ get_all_instances(txn_kv_.get(), instances);
+ // TODO(plat1ko): delete job recycle kv of non-existent instances
+ LOG(INFO) << "Recycler get instances: " << [&instances] {
+ std::stringstream ss;
+ for (auto& i : instances) ss << ' ' << i.instance_id();
+ return ss.str();
+ }();
+ if (!instances.empty()) {
+ // enqueue instances
+ std::lock_guard lock(mtx_);
+ for (auto& instance : instances) {
+ if (instance_filter_.filter_out(instance.instance_id()))
continue;
+ auto [_, success] =
pending_instance_set_.insert(instance.instance_id());
+ // skip instance already in pending queue
+ if (success) {
+ pending_instance_queue_.push_back(std::move(instance));
+ }
}
+ pending_instance_cond_.notify_all();
}
- pending_instance_cond_.notify_all();
+ } else {
+ LOG(WARNING) << "Skip recycler since enable_recycler is false";
}
{
std::unique_lock lock(mtx_);
@@ -298,6 +302,11 @@ void Recycler::recycle_callback() {
// skip instance in recycling
if (recycling_instance_map_.count(instance_id)) continue;
}
+ if (!config::enable_recycler) {
+ LOG(WARNING) << "Skip recycle instance_id=" << instance_id
+ << " since enable_recycler is false";
+ continue;
+ }
auto instance_recycler = std::make_shared<InstanceRecycler>(
txn_kv_, instance, _thread_pool_group, txn_lazy_committer_);
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index ffe2401862b..022a83eb88f 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -8693,4 +8693,110 @@ TEST(RecyclerTest, recycle_tablet_with_delete_file_failure) {
EXPECT_EQ(it->size(), 0) << "All recycle rowset keys should be deleted";
}
}
+
+TEST(RecyclerTest, enable_recycler_default_true) {
+    EXPECT_TRUE(config::enable_recycler);
+}
+
+TEST(RecyclerTest, enable_recycler_skip_instance_scanner) {
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(txn_kv->init(), 0);
+
+    // Persist an instance so the scanner has something it could enqueue; without it an
+    // empty pending queue would prove nothing about enable_recycler.
+    {
+        InstanceInfoPB instance;
+        instance.set_instance_id("test_instance");
+        InstanceKeyInfo key_info {instance.instance_id()};
+        std::string key;
+        instance_key(key_info, &key);
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        txn->put(key, instance.SerializeAsString());
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    bool old_val = config::enable_recycler;
+    config::enable_recycler = false;
+    DORIS_CLOUD_DEFER {
+        config::enable_recycler = old_val;
+    };
+
+    int64_t old_recycle_interval = config::recycle_interval_seconds;
+    config::recycle_interval_seconds = 0;
+    DORIS_CLOUD_DEFER {
+        config::recycle_interval_seconds = old_recycle_interval;
+    };
+
+    int64_t old_sleep = config::recycler_sleep_before_scheduling_seconds;
+    config::recycler_sleep_before_scheduling_seconds = 0;
+    DORIS_CLOUD_DEFER {
+        config::recycler_sleep_before_scheduling_seconds = old_sleep;
+    };
+
+    Recycler recycler(txn_kv);
+    std::thread t([&]() { recycler.instance_scanner_callback(); });
+
+    // Let the callback complete at least one iteration:
+    // sleep(0) -> check enable_recycler (false, skip) -> wait_for(0, timeout)
+    std::this_thread::sleep_for(std::chrono::milliseconds(100));
+
+    // Set stopped_ while holding mtx_ so the notification cannot be missed by a scanner
+    // that sits between its predicate check and its wait.
+    {
+        std::lock_guard lock(recycler.mtx_);
+        recycler.stopped_ = true;
+    }
+    recycler.notifier_.notify_all();
+    t.join();
+
+    // "test_instance" exists in txn_kv, so an empty queue means the scan was skipped.
+    EXPECT_TRUE(recycler.pending_instance_queue_.empty());
+}
+
+TEST(RecyclerTest, enable_recycler_skip_recycle_callback) {
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(txn_kv->init(), 0);
+
+    bool old_val = config::enable_recycler;
+    config::enable_recycler = false;
+    DORIS_CLOUD_DEFER {
+        config::enable_recycler = old_val;
+    };
+
+    Recycler recycler(txn_kv);
+
+    InstanceInfoPB instance;
+    instance.set_instance_id("test_instance");
+    recycler.pending_instance_queue_.push_back(instance);
+    recycler.pending_instance_set_.insert("test_instance");
+
+    std::thread t([&]() { recycler.recycle_callback(); });
+
+    // Wait until the callback has popped the instance from the queue.
+    // We cannot wait on pending_instance_cond_ here: the callback does not notify after
+    // popping, so the main thread and the callback could both end up waiting on the same
+    // CV with different predicates and nobody would wake them up.
+    while (true) {
+        {
+            std::lock_guard lock(recycler.mtx_);
+            if (recycler.pending_instance_queue_.empty()) break;
+        }
+        std::this_thread::yield();
+    }
+
+    // Set stopped_ under mtx_: the callback evaluates its wait predicate under mtx_, so an
+    // unlocked store + notify could land between that check and the wait and be lost,
+    // leaving t.join() blocked forever.
+    {
+        std::lock_guard lock(recycler.mtx_);
+        recycler.stopped_ = true;
+    }
+    recycler.pending_instance_cond_.notify_all();
+    t.join();
+
+    EXPECT_TRUE(recycler.pending_instance_queue_.empty());
+    EXPECT_TRUE(recycler.pending_instance_set_.empty());
+    EXPECT_TRUE(recycler.recycling_instance_map_.empty());
+}
} // namespace doris::cloud
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]