This is an automated email from the ASF dual-hosted git repository.

freemandealer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 54a0c9f46e2 [improvement](filecache) limit file cache LRU replay queues (#64381)
54a0c9f46e2 is described below

commit 54a0c9f46e2305bafa3f1737ed0866e92322f9b7
Author: zhengyu <[email protected]>
AuthorDate: Thu Jun 11 09:18:32 2026 +0800

    [improvement](filecache) limit file cache LRU replay queues (#64381)
    
    Problem Summary: File cache LRU log replay needs tighter default replay
    latency, bounded in-memory queues, and observability for pending block
    LRU updates and LRU log replay. This change lowers the replay interval
    default to 1 ms, adds hard caps for the pending block update queue and
    per-type LRU log queues, preserves existing LRU log backlog when tail
    recording is disabled, and exposes queue length plus monotonic
    produce/consume/idle counters. QPS bvars are not added because
    Prometheus can derive rates from the counters. Tests that depended on
    background replay timing now use a deterministic single-replay helper
    with a high background interval.
---
 be/src/common/config.cpp                           |   4 +-
 be/src/common/config.h                             |   2 +
 be/src/io/cache/block_file_cache.cpp               | 111 +++++++++++++++++----
 be/src/io/cache/block_file_cache.h                 |  11 +-
 be/src/io/cache/lru_queue_recorder.cpp             |  68 ++++++++++++-
 be/src/io/cache/lru_queue_recorder.h               |  11 +-
 .../io/cache/block_file_cache_test_lru_dump.cpp    |  73 +++++++++++++-
 .../io/cache/block_file_cache_test_meta_store.cpp  |  28 +++++-
 be/test/io/cache/cache_lru_dumper_test.cpp         |  51 +++++++++-
 9 files changed, 324 insertions(+), 35 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 0ad6ce6d28f..86b7d12e0aa 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1235,6 +1235,7 @@ DEFINE_mInt64(file_cache_remove_block_qps_limit, "1000");
 DEFINE_mInt64(file_cache_background_gc_interval_ms, "100");
 DEFINE_mInt64(file_cache_background_block_lru_update_interval_ms, "5000");
 DEFINE_mInt64(file_cache_background_block_lru_update_qps_limit, "1000");
+DEFINE_mInt64(file_cache_background_block_lru_update_queue_max_size, "500000");
 DEFINE_mBool(enable_file_cache_async_touch_on_get_or_set, "false");
 DEFINE_mBool(enable_reader_dryrun_when_download_file_cache, "true");
 DEFINE_mInt64(file_cache_background_monitor_interval_ms, "5000");
@@ -1245,7 +1246,8 @@ DEFINE_mInt64(file_cache_background_lru_dump_interval_ms, 
"60000");
 // dump queue only if the queue update specific times through several dump 
intervals
 DEFINE_mInt64(file_cache_background_lru_dump_update_cnt_threshold, "1000");
 DEFINE_mInt64(file_cache_background_lru_dump_tail_record_num, "5000000");
-DEFINE_mInt64(file_cache_background_lru_log_replay_interval_ms, "1000");
+DEFINE_mInt64(file_cache_background_lru_log_queue_max_size, "500000");
+DEFINE_mInt64(file_cache_background_lru_log_replay_interval_ms, "1");
 DEFINE_mBool(enable_evaluate_shadow_queue_diff, "false");
 
 DEFINE_mBool(file_cache_enable_only_warm_up_idx, "false");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index c13e5494c6b..0b415ed5d2c 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1280,6 +1280,7 @@ DECLARE_mInt64(file_cache_remove_block_qps_limit);
 DECLARE_mInt64(file_cache_background_gc_interval_ms);
 DECLARE_mInt64(file_cache_background_block_lru_update_interval_ms);
 DECLARE_mInt64(file_cache_background_block_lru_update_qps_limit);
+DECLARE_mInt64(file_cache_background_block_lru_update_queue_max_size);
 DECLARE_mBool(enable_file_cache_async_touch_on_get_or_set);
 DECLARE_mBool(enable_reader_dryrun_when_download_file_cache);
 DECLARE_mInt64(file_cache_background_monitor_interval_ms);
@@ -1293,6 +1294,7 @@ 
DECLARE_mInt64(file_cache_background_lru_dump_interval_ms);
 // dump queue only if the queue update specific times through several dump 
intervals
 DECLARE_mInt64(file_cache_background_lru_dump_update_cnt_threshold);
 DECLARE_mInt64(file_cache_background_lru_dump_tail_record_num);
+DECLARE_mInt64(file_cache_background_lru_log_queue_max_size);
 DECLARE_mInt64(file_cache_background_lru_log_replay_interval_ms);
 DECLARE_mBool(enable_evaluate_shadow_queue_diff);
 
diff --git a/be/src/io/cache/block_file_cache.cpp 
b/be/src/io/cache/block_file_cache.cpp
index dfbad8a63ed..5c4a275ac3d 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -42,6 +42,7 @@
 #include <ranges>
 
 #include "common/cast_set.h"
+#include "common/check.h"
 #include "common/config.h"
 #include "common/logging.h"
 #include "core/uint128.h"
@@ -59,24 +60,53 @@
 #include "util/time.h"
 namespace doris::io {
 
+namespace {
+
+constexpr std::array<FileCacheType, 4> LRU_LOG_REPLAY_TYPES = {
+        FileCacheType::TTL, FileCacheType::INDEX, FileCacheType::NORMAL, 
FileCacheType::DISPOSABLE};
+
+size_t file_cache_type_index(FileCacheType type) {
+    return static_cast<size_t>(type);
+}
+
+} // namespace
+
 // Insert a block pointer into one shard while swallowing allocation failures.
-bool NeedUpdateLRUBlocks::insert(FileBlockSPtr block) {
-    if (!block) {
+bool NeedUpdateLRUBlocks::insert(FileBlockSPtr block, size_t max_queue_size) {
+    if (!block || max_queue_size == 0) {
         return false;
     }
+    bool reserved = false;
     try {
         auto* raw_ptr = block.get();
         auto idx = shard_index(raw_ptr);
         auto& shard = _shards[idx];
         std::lock_guard lock(shard.mutex);
-        auto [_, inserted] = shard.entries.emplace(raw_ptr, std::move(block));
-        if (inserted) {
-            _size.fetch_add(1, std::memory_order_relaxed);
+        if (shard.entries.contains(raw_ptr)) {
+            return false;
         }
-        return inserted;
+        size_t cur_size = _size.load(std::memory_order_relaxed);
+        while (cur_size < max_queue_size) {
+            if (_size.compare_exchange_weak(cur_size, cur_size + 1, 
std::memory_order_relaxed)) {
+                reserved = true;
+                break;
+            }
+        }
+        if (!reserved) {
+            return false;
+        }
+        auto [_, inserted] = shard.entries.emplace(raw_ptr, std::move(block));
+        DORIS_CHECK(inserted);
+        return true;
     } catch (const std::exception& e) {
+        if (reserved) {
+            decrease_size(1);
+        }
         LOG(WARNING) << "Failed to enqueue block for LRU update: " << e.what();
     } catch (...) {
+        if (reserved) {
+            decrease_size(1);
+        }
         LOG(WARNING) << "Failed to enqueue block for LRU update: unknown 
error";
     }
     return false;
@@ -103,7 +133,7 @@ size_t NeedUpdateLRUBlocks::drain(size_t limit, 
std::vector<FileBlockSPtr>* outp
                 ++shard_drained;
             }
             if (shard_drained > 0) {
-                _size.fetch_sub(shard_drained, std::memory_order_relaxed);
+                decrease_size(shard_drained);
                 drained += shard_drained;
             }
         }
@@ -123,7 +153,7 @@ void NeedUpdateLRUBlocks::clear() {
             if (!shard.entries.empty()) {
                 auto removed = shard.entries.size();
                 shard.entries.clear();
-                _size.fetch_sub(removed, std::memory_order_relaxed);
+                decrease_size(removed);
             }
         }
     } catch (const std::exception& e) {
@@ -133,6 +163,16 @@ void NeedUpdateLRUBlocks::clear() {
     }
 }
 
+void NeedUpdateLRUBlocks::decrease_size(size_t delta) {
+    size_t cur_size = _size.load(std::memory_order_relaxed);
+    while (true) {
+        DORIS_CHECK_GE(cur_size, delta);
+        if (_size.compare_exchange_weak(cur_size, cur_size - delta, 
std::memory_order_relaxed)) {
+            return;
+        }
+    }
+}
+
 size_t NeedUpdateLRUBlocks::shard_index(FileBlock* ptr) const {
     DCHECK(ptr != nullptr);
     return std::hash<FileBlock*> {}(ptr)&kShardMask;
@@ -348,12 +388,30 @@ BlockFileCache::BlockFileCache(const std::string& 
cache_base_path,
             _cache_base_path.c_str(), "file_cache_recycle_keys_length");
     _need_update_lru_blocks_length_recorder = 
std::make_shared<bvar::LatencyRecorder>(
             _cache_base_path.c_str(), 
"file_cache_need_update_lru_blocks_length");
+    _need_update_lru_blocks_produce_metrics = 
std::make_shared<bvar::Adder<size_t>>(
+            _cache_base_path.c_str(), 
"file_cache_need_update_lru_blocks_produce");
+    _need_update_lru_blocks_consume_metrics = 
std::make_shared<bvar::Adder<size_t>>(
+            _cache_base_path.c_str(), 
"file_cache_need_update_lru_blocks_consume");
     _update_lru_blocks_latency_us = std::make_shared<bvar::LatencyRecorder>(
             _cache_base_path.c_str(), 
"file_cache_update_lru_blocks_latency_us");
     _ttl_gc_latency_us = 
std::make_shared<bvar::LatencyRecorder>(_cache_base_path.c_str(),
                                                                  
"file_cache_ttl_gc_latency_us");
     _shadow_queue_levenshtein_distance = 
std::make_shared<bvar::LatencyRecorder>(
             _cache_base_path.c_str(), 
"file_cache_shadow_queue_levenshtein_distance");
+    for (FileCacheType type : {FileCacheType::DISPOSABLE, 
FileCacheType::NORMAL,
+                               FileCacheType::INDEX, FileCacheType::TTL}) {
+        size_t idx = file_cache_type_index(type);
+        std::string metric_prefix =
+                "file_cache_lru_recorder_" + cache_type_to_string(type) + 
"_record_queue";
+        _lru_recorder_queue_length_recorder[idx] = 
std::make_shared<bvar::LatencyRecorder>(
+                _cache_base_path.c_str(), metric_prefix + "_length");
+        _lru_recorder_queue_produce_metrics[idx] = 
std::make_shared<bvar::Adder<size_t>>(
+                _cache_base_path.c_str(), metric_prefix + "_produce");
+        _lru_recorder_queue_consume_metrics[idx] = 
std::make_shared<bvar::Adder<size_t>>(
+                _cache_base_path.c_str(), metric_prefix + "_consume");
+    }
+    _lru_recorder_log_replay_idle_metrics = 
std::make_shared<bvar::Adder<size_t>>(
+            _cache_base_path.c_str(), 
"file_cache_lru_recorder_log_replay_idle");
 
     _disposable_queue = LRUQueue(cache_settings.disposable_queue_size,
                                  cache_settings.disposable_queue_elements, 60 
* 60);
@@ -648,7 +706,10 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& 
hash, const CacheConte
 }
 
 void BlockFileCache::add_need_update_lru_block(FileBlockSPtr block) {
-    if (_need_update_lru_blocks.insert(std::move(block))) {
+    int64_t queue_limit = 
config::file_cache_background_block_lru_update_queue_max_size;
+    size_t max_queue_size = queue_limit <= 0 ? 0 : 
static_cast<size_t>(queue_limit);
+    if (_need_update_lru_blocks.insert(std::move(block), max_queue_size)) {
+        *_need_update_lru_blocks_produce_metrics << 1;
         *_need_update_lru_blocks_length_recorder << 
_need_update_lru_blocks.size();
     }
 }
@@ -2161,6 +2222,7 @@ void BlockFileCache::run_background_block_lru_update() {
             *_need_update_lru_blocks_length_recorder << 
_need_update_lru_blocks.size();
             continue;
         }
+        *_need_update_lru_blocks_consume_metrics << drained;
 
         int64_t duration_ns = 0;
         {
@@ -2376,19 +2438,28 @@ void BlockFileCache::run_background_lru_log_replay() {
             }
         }
 
-        _lru_recorder->replay_queue_event(FileCacheType::TTL);
-        _lru_recorder->replay_queue_event(FileCacheType::INDEX);
-        _lru_recorder->replay_queue_event(FileCacheType::NORMAL);
-        _lru_recorder->replay_queue_event(FileCacheType::DISPOSABLE);
+        replay_lru_logs_once();
+    }
+}
+
+size_t BlockFileCache::replay_lru_logs_once() {
+    size_t replayed = 0;
+    for (FileCacheType type : LRU_LOG_REPLAY_TYPES) {
+        replayed += _lru_recorder->replay_queue_event(type);
+    }
 
-        if (config::enable_evaluate_shadow_queue_diff) {
-            SCOPED_CACHE_LOCK(_mutex, this);
-            _lru_recorder->evaluate_queue_diff(_ttl_queue, "ttl", cache_lock);
-            _lru_recorder->evaluate_queue_diff(_index_queue, "index", 
cache_lock);
-            _lru_recorder->evaluate_queue_diff(_normal_queue, "normal", 
cache_lock);
-            _lru_recorder->evaluate_queue_diff(_disposable_queue, 
"disposable", cache_lock);
-        }
+    if (replayed == 0) {
+        *_lru_recorder_log_replay_idle_metrics << 1;
+    }
+
+    if (config::enable_evaluate_shadow_queue_diff) {
+        SCOPED_CACHE_LOCK(_mutex, this);
+        _lru_recorder->evaluate_queue_diff(_ttl_queue, "ttl", cache_lock);
+        _lru_recorder->evaluate_queue_diff(_index_queue, "index", cache_lock);
+        _lru_recorder->evaluate_queue_diff(_normal_queue, "normal", 
cache_lock);
+        _lru_recorder->evaluate_queue_diff(_disposable_queue, "disposable", 
cache_lock);
     }
+    return replayed;
 }
 
 void BlockFileCache::dump_lru_queues(bool force) {
diff --git a/be/src/io/cache/block_file_cache.h 
b/be/src/io/cache/block_file_cache.h
index 5bdbd09f914..5c7faea9ac4 100644
--- a/be/src/io/cache/block_file_cache.h
+++ b/be/src/io/cache/block_file_cache.h
@@ -25,6 +25,7 @@
 #include <atomic>
 #include <boost/lockfree/spsc_queue.hpp>
 #include <functional>
+#include <limits>
 #include <memory>
 #include <mutex>
 #include <optional>
@@ -91,7 +92,7 @@ public:
 
     // Insert a block into the pending set. Returns true only when the block
     // was not already queued. Null inputs are ignored.
-    bool insert(FileBlockSPtr block);
+    bool insert(FileBlockSPtr block, size_t max_queue_size = 
std::numeric_limits<size_t>::max());
 
     // Drain up to `limit` unique blocks into `output`. The method returns how
     // many blocks were actually drained and shrinks the internal size
@@ -114,6 +115,7 @@ private:
     };
 
     size_t shard_index(FileBlock* ptr) const;
+    void decrease_size(size_t delta);
 
     std::array<Shard, kShardCount> _shards;
     std::atomic<size_t> _size {0};
@@ -469,6 +471,7 @@ private:
     void run_background_monitor();
     void run_background_gc();
     void run_background_lru_log_replay();
+    size_t replay_lru_logs_once();
     void run_background_lru_dump();
     void restore_lru_queues_from_disk(std::lock_guard<std::mutex>& cache_lock);
     void run_background_evict_in_advance();
@@ -616,9 +619,15 @@ private:
     std::shared_ptr<bvar::LatencyRecorder> _recycle_keys_length_recorder;
     std::shared_ptr<bvar::LatencyRecorder> _update_lru_blocks_latency_us;
     std::shared_ptr<bvar::LatencyRecorder> 
_need_update_lru_blocks_length_recorder;
+    std::shared_ptr<bvar::Adder<size_t>> 
_need_update_lru_blocks_produce_metrics;
+    std::shared_ptr<bvar::Adder<size_t>> 
_need_update_lru_blocks_consume_metrics;
     std::shared_ptr<bvar::LatencyRecorder> _ttl_gc_latency_us;
 
     std::shared_ptr<bvar::LatencyRecorder> _shadow_queue_levenshtein_distance;
+    std::array<std::shared_ptr<bvar::LatencyRecorder>, 4> 
_lru_recorder_queue_length_recorder;
+    std::array<std::shared_ptr<bvar::Adder<size_t>>, 4> 
_lru_recorder_queue_produce_metrics;
+    std::array<std::shared_ptr<bvar::Adder<size_t>>, 4> 
_lru_recorder_queue_consume_metrics;
+    std::shared_ptr<bvar::Adder<size_t>> _lru_recorder_log_replay_idle_metrics;
     // keep _storage last so it will deconstruct first
     // otherwise, load_cache_info_into_memory might crash
     // coz it will use other members of BlockFileCache
diff --git a/be/src/io/cache/lru_queue_recorder.cpp 
b/be/src/io/cache/lru_queue_recorder.cpp
index 9907e58cb2a..e2a3a8fa506 100644
--- a/be/src/io/cache/lru_queue_recorder.cpp
+++ b/be/src/io/cache/lru_queue_recorder.cpp
@@ -17,27 +17,53 @@
 
 #include "io/cache/lru_queue_recorder.h"
 
+#include "common/check.h"
+#include "common/config.h"
 #include "io/cache/block_file_cache.h"
 #include "io/cache/file_cache_common.h"
 
 namespace doris::io {
 
+namespace {
+
+size_t file_cache_type_index(FileCacheType type) {
+    return static_cast<size_t>(type);
+}
+
+} // namespace
+
 void LRUQueueRecorder::record_queue_event(FileCacheType type, CacheLRULogType 
log_type,
                                           const UInt128Wrapper hash, const 
size_t offset,
                                           const size_t size) {
-    CacheLRULogQueue& log_queue = get_lru_log_queue(type);
-    log_queue.enqueue(std::make_unique<CacheLRULog>(log_type, hash, offset, 
size));
+    if (config::file_cache_background_lru_dump_tail_record_num <= 0) {
+        return;
+    }
     ++(_lru_queue_update_cnt_from_last_dump[type]);
+    auto log = std::make_unique<CacheLRULog>(log_type, hash, offset, size);
+    if (!reserve_lru_log_queue_slot(type)) {
+        return;
+    }
+    CacheLRULogQueue& log_queue = get_lru_log_queue(type);
+    if (!log_queue.enqueue(std::move(log))) {
+        release_lru_log_queue_slot(type);
+        return;
+    }
+    size_t idx = file_cache_type_index(type);
+    *(_mgr->_lru_recorder_queue_produce_metrics[idx]) << 1;
+    *(_mgr->_lru_recorder_queue_length_recorder[idx]) << 
lru_log_queue_size(type);
 }
 
-void LRUQueueRecorder::replay_queue_event(FileCacheType type) {
+size_t LRUQueueRecorder::replay_queue_event(FileCacheType type) {
     // we don't need the real cache lock for the shadow queue, but we do need 
a lock to prevent read/write contension
     CacheLRULogQueue& log_queue = get_lru_log_queue(type);
     LRUQueue& shadow_queue = get_shadow_queue(type);
 
     std::lock_guard<std::mutex> lru_log_lock(_mutex_lru_log);
     std::unique_ptr<CacheLRULog> log;
+    size_t replayed = 0;
     while (log_queue.try_dequeue(log)) {
+        release_lru_log_queue_slot(type);
+        ++replayed;
         try {
             switch (log->type) {
             case CacheLRULogType::ADD: {
@@ -79,6 +105,12 @@ void LRUQueueRecorder::replay_queue_event(FileCacheType 
type) {
             LOG(WARNING) << "Failed to replay queue event: " << e.what();
         }
     }
+    size_t idx = file_cache_type_index(type);
+    if (replayed > 0) {
+        *(_mgr->_lru_recorder_queue_consume_metrics[idx]) << replayed;
+    }
+    *(_mgr->_lru_recorder_queue_length_recorder[idx]) << 
lru_log_queue_size(type);
+    return replayed;
 }
 
 // we evaluate the diff between two queue by calculate how many operation is
@@ -137,4 +169,34 @@ void 
LRUQueueRecorder::reset_lru_queue_update_cnt_from_last_dump(FileCacheType t
     _lru_queue_update_cnt_from_last_dump[type] = 0;
 }
 
+size_t LRUQueueRecorder::lru_log_queue_size(FileCacheType type) const {
+    return 
_lru_log_queue_size[file_cache_type_index(type)].load(std::memory_order_relaxed);
+}
+
+bool LRUQueueRecorder::reserve_lru_log_queue_slot(FileCacheType type) {
+    int64_t queue_limit = config::file_cache_background_lru_log_queue_max_size;
+    if (queue_limit <= 0) {
+        return false;
+    }
+    auto& queue_size = _lru_log_queue_size[file_cache_type_index(type)];
+    size_t cur_size = queue_size.load(std::memory_order_relaxed);
+    while (cur_size < static_cast<size_t>(queue_limit)) {
+        if (queue_size.compare_exchange_weak(cur_size, cur_size + 1, 
std::memory_order_relaxed)) {
+            return true;
+        }
+    }
+    return false;
+}
+
+void LRUQueueRecorder::release_lru_log_queue_slot(FileCacheType type) {
+    auto& queue_size = _lru_log_queue_size[file_cache_type_index(type)];
+    size_t cur_size = queue_size.load(std::memory_order_relaxed);
+    while (true) {
+        DORIS_CHECK_GT(cur_size, 0);
+        if (queue_size.compare_exchange_weak(cur_size, cur_size - 1, 
std::memory_order_relaxed)) {
+            return;
+        }
+    }
+}
+
 } // end of namespace doris::io
diff --git a/be/src/io/cache/lru_queue_recorder.h 
b/be/src/io/cache/lru_queue_recorder.h
index 5bd68b70d55..5ffa777e437 100644
--- a/be/src/io/cache/lru_queue_recorder.h
+++ b/be/src/io/cache/lru_queue_recorder.h
@@ -19,7 +19,11 @@
 
 #include <concurrentqueue.h>
 
+#include <array>
+#include <atomic>
 #include <boost/lockfree/spsc_queue.hpp>
+#include <mutex>
+#include <unordered_map>
 
 #include "io/cache/file_cache_common.h"
 
@@ -57,11 +61,12 @@ public:
     }
     void record_queue_event(FileCacheType type, CacheLRULogType log_type, 
const UInt128Wrapper hash,
                             const size_t offset, const size_t size);
-    void replay_queue_event(FileCacheType type);
+    size_t replay_queue_event(FileCacheType type);
     void evaluate_queue_diff(LRUQueue& base, std::string name,
                              std::lock_guard<std::mutex>& cache_lock);
     size_t get_lru_queue_update_cnt_from_last_dump(FileCacheType type);
     void reset_lru_queue_update_cnt_from_last_dump(FileCacheType type);
+    size_t lru_log_queue_size(FileCacheType type) const;
 
     CacheLRULogQueue& get_lru_log_queue(FileCacheType type);
     LRUQueue& get_shadow_queue(FileCacheType type);
@@ -81,8 +86,12 @@ private:
     CacheLRULogQueue _disposable_lru_log_queue;
 
     std::unordered_map<FileCacheType, size_t> 
_lru_queue_update_cnt_from_last_dump;
+    std::array<std::atomic<size_t>, 4> _lru_log_queue_size {};
 
     BlockFileCache* _mgr;
+
+    bool reserve_lru_log_queue_slot(FileCacheType type);
+    void release_lru_log_queue_slot(FileCacheType type);
 };
 
 } // namespace doris::io
diff --git a/be/test/io/cache/block_file_cache_test_lru_dump.cpp 
b/be/test/io/cache/block_file_cache_test_lru_dump.cpp
index 6d6681f4fcd..3a581d77c76 100644
--- a/be/test/io/cache/block_file_cache_test_lru_dump.cpp
+++ b/be/test/io/cache/block_file_cache_test_lru_dump.cpp
@@ -27,6 +27,11 @@ TEST_F(BlockFileCacheTest, 
test_lru_log_record_replay_dump_restore) {
     config::file_cache_enter_disk_resource_limit_mode_percent = 99;
     config::file_cache_background_lru_dump_interval_ms = 3000;
     config::file_cache_background_lru_dump_update_cnt_threshold = 0;
+    const auto old_replay_interval_ms = 
config::file_cache_background_lru_log_replay_interval_ms;
+    Defer defer {[old_replay_interval_ms] {
+        config::file_cache_background_lru_log_replay_interval_ms = 
old_replay_interval_ms;
+    }};
+    config::file_cache_background_lru_log_replay_interval_ms = 60 * 60 * 1000;
     if (fs::exists(cache_base_path)) {
         fs::remove_all(cache_base_path);
     }
@@ -162,8 +167,7 @@ TEST_F(BlockFileCacheTest, 
test_lru_log_record_replay_dump_restore) {
     ASSERT_EQ(cache._lru_recorder->_disposable_lru_log_queue.size_approx(), 5);
 
     // then check the log replay
-    std::this_thread::sleep_for(std::chrono::milliseconds(
-            2 * config::file_cache_background_lru_log_replay_interval_ms));
+    ASSERT_EQ(cache.replay_lru_logs_once(), 20);
     
ASSERT_EQ(cache._lru_recorder->_shadow_ttl_queue.get_elements_num_unsafe(), 5);
     
ASSERT_EQ(cache._lru_recorder->_shadow_index_queue.get_elements_num_unsafe(), 
5);
     
ASSERT_EQ(cache._lru_recorder->_shadow_normal_queue.get_elements_num_unsafe(), 
5);
@@ -180,8 +184,7 @@ TEST_F(BlockFileCacheTest, 
test_lru_log_record_replay_dump_restore) {
     ASSERT_EQ(cache._lru_recorder->_normal_lru_log_queue.size_approx(), 0);
     ASSERT_EQ(cache._lru_recorder->_disposable_lru_log_queue.size_approx(), 0);
 
-    std::this_thread::sleep_for(std::chrono::milliseconds(
-            2 * config::file_cache_background_lru_log_replay_interval_ms));
+    ASSERT_EQ(cache.replay_lru_logs_once(), 6);
     
ASSERT_EQ(cache._lru_recorder->_shadow_ttl_queue.get_elements_num_unsafe(), 0);
     
ASSERT_EQ(cache._lru_recorder->_shadow_index_queue.get_elements_num_unsafe(), 
5);
     
ASSERT_EQ(cache._lru_recorder->_shadow_normal_queue.get_elements_num_unsafe(), 
5);
@@ -493,6 +496,68 @@ TEST_F(BlockFileCacheTest, 
test_lru_duplicate_queue_entry_restore) {
     }
 }
 
+TEST_F(BlockFileCacheTest, need_update_lru_blocks_hard_cap) {
+    std::string cache_base_path = caches_dir / 
"cache_need_update_lru_blocks_hard_cap" / "";
+    const auto old_update_interval_ms = 
config::file_cache_background_block_lru_update_interval_ms;
+    const auto old_update_queue_max_size =
+            config::file_cache_background_block_lru_update_queue_max_size;
+    Defer defer {[old_update_interval_ms, old_update_queue_max_size] {
+        config::file_cache_background_block_lru_update_interval_ms = 
old_update_interval_ms;
+        config::file_cache_background_block_lru_update_queue_max_size = 
old_update_queue_max_size;
+    }};
+
+    config::file_cache_background_block_lru_update_interval_ms = 60 * 60 * 
1000;
+    config::file_cache_background_block_lru_update_queue_max_size = 2;
+
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+    fs::create_directories(cache_base_path);
+
+    io::FileCacheSettings settings;
+    settings.query_queue_size = 5000000;
+    settings.query_queue_elements = 50000;
+    settings.index_queue_size = 5000000;
+    settings.index_queue_elements = 50000;
+    settings.disposable_queue_size = 5000000;
+    settings.disposable_queue_elements = 50000;
+    settings.ttl_queue_size = 5000000;
+    settings.ttl_queue_elements = 50000;
+    settings.capacity = 20000000;
+    settings.max_file_block_size = 100000;
+    settings.max_query_cache_size = 30;
+
+    io::BlockFileCache cache(cache_base_path, settings);
+    ASSERT_TRUE(cache.initialize());
+    wait_until_cache_ready(cache);
+
+    io::CacheContext context;
+    ReadStatistics rstats;
+    context.stats = &rstats;
+    context.cache_type = io::FileCacheType::NORMAL;
+    auto key = io::BlockFileCache::hash("need_update_lru_blocks_hard_cap");
+
+    std::vector<io::FileBlockSPtr> blocks;
+    for (size_t offset = 0; offset < 300000; offset += 100000) {
+        auto holder = cache.get_or_set(key, offset, 100000, context);
+        auto holder_blocks = fromHolder(holder);
+        ASSERT_EQ(holder_blocks.size(), 1);
+        blocks.push_back(holder_blocks[0]);
+    }
+
+    cache.add_need_update_lru_block(blocks[0]);
+    cache.add_need_update_lru_block(blocks[0]);
+    EXPECT_EQ(cache.need_update_lru_blocks_size_unsafe(), 1);
+
+    cache.add_need_update_lru_block(blocks[1]);
+    cache.add_need_update_lru_block(blocks[2]);
+    EXPECT_EQ(cache.need_update_lru_blocks_size_unsafe(), 2);
+
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+}
+
 TEST_F(BlockFileCacheTest, cached_remote_file_reader_direct_read_order_check) {
     std::string cache_base_path = caches_dir / "cache_direct_read_order_check" 
/ "";
     config::enable_read_cache_file_directly = true;
diff --git a/be/test/io/cache/block_file_cache_test_meta_store.cpp 
b/be/test/io/cache/block_file_cache_test_meta_store.cpp
index eca106ebd7a..e2d2b5a957d 100644
--- a/be/test/io/cache/block_file_cache_test_meta_store.cpp
+++ b/be/test/io/cache/block_file_cache_test_meta_store.cpp
@@ -23,6 +23,8 @@
 #pragma clang diagnostic ignored "-Wkeyword-macro"
 #endif
 
+#include "util/defer_op.h"
+
 #define private public
 #define protected public
 #include "io/cache/block_file_cache_test_common.h"
@@ -55,12 +57,30 @@ void verify_meta_key(CacheBlockMetaStore& meta_store, 
int64_t tablet_id,
 } // namespace
 
 TEST_F(BlockFileCacheTest, version3_add_remove_restart) {
+    const auto old_enable_evict = config::enable_evict_file_cache_in_advance;
+    const auto old_disk_limit_percent = 
config::file_cache_enter_disk_resource_limit_mode_percent;
+    const auto old_dump_interval_ms = 
config::file_cache_background_lru_dump_interval_ms;
+    const auto old_dump_update_cnt_threshold =
+            config::file_cache_background_lru_dump_update_cnt_threshold;
+    const auto old_dump_tail_record_num = 
config::file_cache_background_lru_dump_tail_record_num;
+    const auto old_replay_interval_ms = 
config::file_cache_background_lru_log_replay_interval_ms;
+    Defer defer {[old_enable_evict, old_disk_limit_percent, 
old_dump_interval_ms,
+                  old_dump_update_cnt_threshold, old_dump_tail_record_num, 
old_replay_interval_ms] {
+        config::enable_evict_file_cache_in_advance = old_enable_evict;
+        config::file_cache_enter_disk_resource_limit_mode_percent = 
old_disk_limit_percent;
+        config::file_cache_background_lru_dump_interval_ms = 
old_dump_interval_ms;
+        config::file_cache_background_lru_dump_update_cnt_threshold = 
old_dump_update_cnt_threshold;
+        config::file_cache_background_lru_dump_tail_record_num = 
old_dump_tail_record_num;
+        config::file_cache_background_lru_log_replay_interval_ms = 
old_replay_interval_ms;
+    }};
+
     config::enable_evict_file_cache_in_advance = false;
     config::file_cache_enter_disk_resource_limit_mode_percent = 99;
     config::file_cache_background_lru_dump_interval_ms = 3000;
     config::file_cache_background_lru_dump_update_cnt_threshold = 0;
     config::file_cache_background_lru_dump_tail_record_num =
             2; // only dump last 2, to check dump works with meta store
+    config::file_cache_background_lru_log_replay_interval_ms = 60 * 60 * 1000;
     if (fs::exists(cache_base_path)) {
         fs::remove_all(cache_base_path);
     }
@@ -239,8 +259,7 @@ TEST_F(BlockFileCacheTest, version3_add_remove_restart) {
         
ASSERT_EQ(cache._lru_recorder->_disposable_lru_log_queue.size_approx(), 5);
 
         // then check the log replay
-        std::this_thread::sleep_for(std::chrono::milliseconds(
-                2 * config::file_cache_background_lru_log_replay_interval_ms));
+        ASSERT_EQ(cache.replay_lru_logs_once(), 20);
         
ASSERT_EQ(cache._lru_recorder->_shadow_ttl_queue.get_elements_num_unsafe(), 5);
         
ASSERT_EQ(cache._lru_recorder->_shadow_index_queue.get_elements_num_unsafe(), 
5);
         
ASSERT_EQ(cache._lru_recorder->_shadow_normal_queue.get_elements_num_unsafe(), 
5);
@@ -251,12 +270,13 @@ TEST_F(BlockFileCacheTest, version3_add_remove_restart) {
             cache.remove_if_cached(key2); // remove all element from index 
queue
         }
 
-        std::this_thread::sleep_for(std::chrono::milliseconds(
-                2 * config::file_cache_background_lru_log_replay_interval_ms));
+        ASSERT_EQ(cache.replay_lru_logs_once(), 5);
         
ASSERT_EQ(cache._lru_recorder->_shadow_ttl_queue.get_elements_num_unsafe(), 5);
         
ASSERT_EQ(cache._lru_recorder->_shadow_index_queue.get_elements_num_unsafe(), 
0);
         
ASSERT_EQ(cache._lru_recorder->_shadow_normal_queue.get_elements_num_unsafe(), 
5);
         
ASSERT_EQ(cache._lru_recorder->_shadow_disposable_queue.get_elements_num_unsafe(),
 5);
+        EXPECT_EQ(cache.replay_lru_logs_once(), 0);
+        EXPECT_EQ(cache._lru_recorder_log_replay_idle_metrics->get_value(), 1);
 
         // check the meta store to see the content
         {
diff --git a/be/test/io/cache/cache_lru_dumper_test.cpp 
b/be/test/io/cache/cache_lru_dumper_test.cpp
index 76647ba544f..e2fdfb6a7e7 100644
--- a/be/test/io/cache/cache_lru_dumper_test.cpp
+++ b/be/test/io/cache/cache_lru_dumper_test.cpp
@@ -19,11 +19,13 @@
 
 #include <filesystem>
 
+#include "common/config.h"
 #include "gmock/gmock.h"
 #include "gtest/gtest.h"
 #include "io/cache/block_file_cache.h"
 #include "io/cache/file_block.h"
 #include "io/cache/file_cache_common.h"
+#include "util/defer_op.h"
 
 using ::testing::_;
 using ::testing::Return;
@@ -158,4 +160,51 @@ TEST_F(CacheLRUDumperTest, test_dump_and_restore_queue) {
     }
 }
 
-} // namespace doris::io
\ No newline at end of file
+TEST_F(CacheLRUDumperTest, 
test_lru_log_record_disabled_keeps_existing_backlog) {
+    const auto old_tail_record_num = 
config::file_cache_background_lru_dump_tail_record_num;
+    const auto old_queue_limit = 
config::file_cache_background_lru_log_queue_max_size;
+    Defer defer {[old_tail_record_num, old_queue_limit] {
+        config::file_cache_background_lru_dump_tail_record_num = 
old_tail_record_num;
+        config::file_cache_background_lru_log_queue_max_size = old_queue_limit;
+    }};
+
+    config::file_cache_background_lru_dump_tail_record_num = 2;
+    config::file_cache_background_lru_log_queue_max_size = 10;
+
+    UInt128Wrapper hash(123456789ULL);
+    recorder->record_queue_event(FileCacheType::NORMAL, CacheLRULogType::ADD, 
hash, 0, 4096);
+    ASSERT_EQ(recorder->lru_log_queue_size(FileCacheType::NORMAL), 1);
+
+    config::file_cache_background_lru_dump_tail_record_num = 0;
+    recorder->record_queue_event(FileCacheType::NORMAL, CacheLRULogType::ADD, 
hash, 4096, 4096);
+
+    EXPECT_EQ(recorder->lru_log_queue_size(FileCacheType::NORMAL), 1);
+    
EXPECT_EQ(recorder->get_lru_log_queue(FileCacheType::NORMAL).size_approx(), 1);
+    EXPECT_EQ(recorder->replay_queue_event(FileCacheType::NORMAL), 1);
+    
EXPECT_EQ(recorder->get_shadow_queue(FileCacheType::NORMAL).get_elements_num_unsafe(),
 1);
+}
+
+TEST_F(CacheLRUDumperTest, test_lru_log_record_queue_hard_cap) {
+    const auto old_tail_record_num = 
config::file_cache_background_lru_dump_tail_record_num;
+    const auto old_queue_limit = 
config::file_cache_background_lru_log_queue_max_size;
+    Defer defer {[old_tail_record_num, old_queue_limit] {
+        config::file_cache_background_lru_dump_tail_record_num = 
old_tail_record_num;
+        config::file_cache_background_lru_log_queue_max_size = old_queue_limit;
+    }};
+
+    config::file_cache_background_lru_dump_tail_record_num = 100;
+    config::file_cache_background_lru_log_queue_max_size = 2;
+
+    UInt128Wrapper hash(987654321ULL);
+    recorder->record_queue_event(FileCacheType::INDEX, CacheLRULogType::ADD, 
hash, 0, 4096);
+    recorder->record_queue_event(FileCacheType::INDEX, CacheLRULogType::ADD, 
hash, 4096, 4096);
+    recorder->record_queue_event(FileCacheType::INDEX, CacheLRULogType::ADD, 
hash, 8192, 4096);
+
+    EXPECT_EQ(recorder->lru_log_queue_size(FileCacheType::INDEX), 2);
+    EXPECT_EQ(recorder->get_lru_log_queue(FileCacheType::INDEX).size_approx(), 
2);
+    EXPECT_EQ(recorder->replay_queue_event(FileCacheType::INDEX), 2);
+    EXPECT_EQ(recorder->lru_log_queue_size(FileCacheType::INDEX), 0);
+    
EXPECT_EQ(recorder->get_shadow_queue(FileCacheType::INDEX).get_elements_num_unsafe(),
 2);
+}
+
+} // namespace doris::io


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to