This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 1741dc695a1 [fix](filecache) self-heal stale DOWNLOADED entries on local NOT_FOUND (#60977) (#61205)
1741dc695a1 is described below
commit 1741dc695a10d3705a1a2f439ab4552d977f1f9c
Author: zhengyu <[email protected]>
AuthorDate: Sat Mar 14 00:12:58 2026 +0800
[fix](filecache) self-heal stale DOWNLOADED entries on local NOT_FOUND (#60977) (#61205)
Problem:
In a rare restart window, BE can rebuild file-cache metadata in memory while
the corresponding cache files are not yet durable on disk. If that metadata is
also restored via LRU dump/load, blocks may appear as DOWNLOADED even though
the local files are missing. Subsequent reads then produce false-positive cache
hits, fail on local read, and repeatedly fall back to S3. This preserves
correctness but causes avoidable cache thrashing and latency jitter.
Root cause:
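The read path treated DOWNLOADED as a valid local hit source and fell back to
remote reads on failure, but it did not actively invalidate stale metadata when
the local cache file was gone.
Fix:
When reading a DOWNLOADED block fails with NOT_FOUND, the reader still falls
back to the remote read, but it now also increments the
cached_remote_reader_self_heal_on_not_found counter and, once the read loop
finishes, calls remove_if_cached_async on the cache hash so the stale metadata
is dropped. Other local read errors do not trigger this cleanup.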
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---------
Signed-off-by: zhengyu <[email protected]>
---
be/src/io/cache/cached_remote_file_reader.cpp | 15 +++
be/test/io/cache/block_file_cache_test.cpp | 185 ++++++++++++++++++++++++++
2 files changed, 200 insertions(+)
diff --git a/be/src/io/cache/cached_remote_file_reader.cpp
b/be/src/io/cache/cached_remote_file_reader.cpp
index bcddd63e7be..c2c51ac4518 100644
--- a/be/src/io/cache/cached_remote_file_reader.cpp
+++ b/be/src/io/cache/cached_remote_file_reader.cpp
@@ -79,6 +79,8 @@ bvar::Adder<uint64_t> g_read_cache_direct_partial_bytes(
bvar::Adder<uint64_t>
g_read_cache_indirect_bytes("cached_remote_reader_cache_indirect_bytes");
bvar::Adder<uint64_t> g_read_cache_indirect_total_bytes(
"cached_remote_reader_cache_indirect_total_bytes");
+bvar::Adder<uint64_t> g_read_cache_self_heal_on_not_found(
+ "cached_remote_reader_self_heal_on_not_found");
bvar::Window<bvar::Adder<uint64_t>> g_read_cache_indirect_bytes_1min_window(
"cached_remote_reader_indirect_bytes_1min_window",
&g_read_cache_indirect_bytes, 60);
bvar::Window<bvar::Adder<uint64_t>>
g_read_cache_indirect_total_bytes_1min_window(
@@ -473,6 +475,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset,
Slice result, size_t*
size_t current_offset = offset;
size_t end_offset = offset + bytes_req - 1;
+ bool need_self_heal = false;
*bytes_read = 0;
for (auto& block : holder.file_blocks) {
if (current_offset > end_offset) {
@@ -527,6 +530,15 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset,
Slice result, size_t*
}
}
if (!st || block_state != FileBlock::State::DOWNLOADED) {
+ if (block_state == FileBlock::State::DOWNLOADED &&
st.is<ErrorCode::NOT_FOUND>()) {
+ need_self_heal = true;
+ g_read_cache_self_heal_on_not_found << 1;
+ LOG_EVERY_N(WARNING, 100)
+ << "Cache block file is missing, will self-heal by
clearing cache "
+ "hash. "
+ << "path=" << path().native() << ", hash=" <<
_cache_hash.to_string()
+ << ", offset=" << left << ", err=" << st.msg();
+ }
LOG(WARNING) << "Read data failed from file cache downloaded
by others. err="
<< st.msg() << ", block state=" << block_state;
size_t bytes_read {0};
@@ -544,6 +556,9 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset,
Slice result, size_t*
*bytes_read += read_size;
current_offset = right + 1;
}
+ if (need_self_heal && _cache != nullptr) {
+ _cache->remove_if_cached_async(_cache_hash);
+ }
g_read_cache_indirect_bytes << indirect_read_bytes;
g_read_cache_indirect_total_bytes << *bytes_read;
diff --git a/be/test/io/cache/block_file_cache_test.cpp
b/be/test/io/cache/block_file_cache_test.cpp
index a34b29b0050..333619a0bf8 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -3943,6 +3943,191 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_error_handle) {
FileCacheFactory::instance()->_capacity = 0;
}
+extern bvar::Adder<uint64_t> g_read_cache_self_heal_on_not_found;
+
+TEST_F(BlockFileCacheTest,
cached_remote_file_reader_self_heal_on_downloaded_not_found) {
+ bool origin_enable_direct_read = config::enable_read_cache_file_directly;
+ config::enable_read_cache_file_directly = false;
+ Defer reset_direct_read {
+ [&] { config::enable_read_cache_file_directly =
origin_enable_direct_read; }};
+
+ std::string cache_base_path =
+ caches_dir /
"cached_remote_reader_self_heal_on_downloaded_not_found" / "";
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+ fs::create_directories(cache_base_path);
+
+ io::FileCacheSettings settings;
+ settings.query_queue_size = 6291456;
+ settings.query_queue_elements = 6;
+ settings.index_queue_size = 1048576;
+ settings.index_queue_elements = 1;
+ settings.disposable_queue_size = 1048576;
+ settings.disposable_queue_elements = 1;
+ settings.capacity = 8388608;
+ settings.max_file_block_size = 1048576;
+ settings.max_query_cache_size = 0;
+
ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path,
settings).ok());
+ auto cache = FileCacheFactory::instance()->_path_to_cache[cache_base_path];
+ for (int i = 0; i < 100; i++) {
+ if (cache->get_async_open_success()) {
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+
+ FileReaderSPtr local_reader;
+ ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader));
+ io::FileReaderOptions opts;
+ opts.cache_type = io::cache_type_from_string("file_block_cache");
+ opts.is_doris_table = true;
+ CachedRemoteFileReader reader(local_reader, opts);
+
+ uint64_t before_self_heal =
g_read_cache_self_heal_on_not_found.get_value();
+
+ std::string buffer(64_kb, '\0');
+ IOContext io_ctx;
+ FileCacheStatistics stats;
+ io_ctx.file_cache_stats = &stats;
+ size_t bytes_read = 0;
+ ASSERT_TRUE(reader.read_at(0, Slice(buffer.data(), buffer.size()),
&bytes_read, &io_ctx).ok());
+ EXPECT_EQ(std::string(64_kb, '0'), buffer);
+
+ auto key = io::BlockFileCache::hash("tmp_file");
+ {
+ io::CacheContext inspect_ctx;
+ ReadStatistics inspect_stats;
+ inspect_ctx.stats = &inspect_stats;
+ inspect_ctx.cache_type = io::FileCacheType::NORMAL;
+ auto inspect_holder = cache->get_or_set(key, 0, 64_kb, inspect_ctx);
+ auto inspect_blocks = fromHolder(inspect_holder);
+ ASSERT_EQ(inspect_blocks.size(), 1);
+ ASSERT_EQ(inspect_blocks[0]->state(),
io::FileBlock::State::DOWNLOADED);
+ std::string cache_file = inspect_blocks[0]->get_cache_file();
+ ASSERT_TRUE(fs::exists(cache_file));
+ ASSERT_TRUE(global_local_filesystem()->delete_file(cache_file).ok());
+ ASSERT_FALSE(fs::exists(cache_file));
+ }
+
+ ASSERT_TRUE(reader.read_at(0, Slice(buffer.data(), buffer.size()),
&bytes_read, &io_ctx).ok());
+ EXPECT_EQ(std::string(64_kb, '0'), buffer);
+
+ bool self_healed = false;
+ for (int i = 0; i < 100; ++i) {
+ io::CacheContext verify_ctx;
+ ReadStatistics verify_stats;
+ verify_ctx.stats = &verify_stats;
+ verify_ctx.cache_type = io::FileCacheType::NORMAL;
+ auto verify_holder = cache->get_or_set(key, 0, 64_kb, verify_ctx);
+ auto verify_blocks = fromHolder(verify_holder);
+ if (verify_blocks.size() == 1 && verify_blocks[0]->state() ==
io::FileBlock::State::EMPTY) {
+ self_healed = true;
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+ EXPECT_TRUE(self_healed);
+ EXPECT_EQ(g_read_cache_self_heal_on_not_found.get_value(),
before_self_heal + 1);
+
+ EXPECT_TRUE(reader.close().ok());
+ EXPECT_TRUE(reader.closed());
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+ FileCacheFactory::instance()->_caches.clear();
+ FileCacheFactory::instance()->_path_to_cache.clear();
+ FileCacheFactory::instance()->_capacity = 0;
+}
+
+TEST_F(BlockFileCacheTest,
cached_remote_file_reader_no_self_heal_on_non_not_found_error) {
+ bool origin_enable_direct_read = config::enable_read_cache_file_directly;
+ config::enable_read_cache_file_directly = false;
+ Defer reset_direct_read {
+ [&] { config::enable_read_cache_file_directly =
origin_enable_direct_read; }};
+
+ std::string cache_base_path =
+ caches_dir /
"cached_remote_reader_no_self_heal_on_non_not_found_error" / "";
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+ fs::create_directories(cache_base_path);
+
+ io::FileCacheSettings settings;
+ settings.query_queue_size = 6291456;
+ settings.query_queue_elements = 6;
+ settings.index_queue_size = 1048576;
+ settings.index_queue_elements = 1;
+ settings.disposable_queue_size = 1048576;
+ settings.disposable_queue_elements = 1;
+ settings.capacity = 8388608;
+ settings.max_file_block_size = 1048576;
+ settings.max_query_cache_size = 0;
+
ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path,
settings).ok());
+ auto cache = FileCacheFactory::instance()->_path_to_cache[cache_base_path];
+ for (int i = 0; i < 100; i++) {
+ if (cache->get_async_open_success()) {
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+
+ FileReaderSPtr local_reader;
+ ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader));
+ io::FileReaderOptions opts;
+ opts.cache_type = io::cache_type_from_string("file_block_cache");
+ opts.is_doris_table = true;
+ CachedRemoteFileReader reader(local_reader, opts);
+
+ std::string buffer(64_kb, '\0');
+ IOContext io_ctx;
+ FileCacheStatistics stats;
+ io_ctx.file_cache_stats = &stats;
+ size_t bytes_read = 0;
+ ASSERT_TRUE(reader.read_at(0, Slice(buffer.data(), buffer.size()),
&bytes_read, &io_ctx).ok());
+
+ uint64_t before_self_heal =
g_read_cache_self_heal_on_not_found.get_value();
+ auto* sp = SyncPoint::get_instance();
+ sp->enable_processing();
+ Defer defer {[&] {
+ sp->clear_call_back("LocalFileReader::read_at_impl");
+ sp->disable_processing();
+ }};
+ sp->set_call_back("LocalFileReader::read_at_impl", [&](auto&& values) {
+ std::pair<Status, bool>* pair = try_any_cast<std::pair<Status,
bool>*>(values.back());
+ pair->first = Status::IOError("inject io error for cache read");
+ pair->second = true;
+ });
+
+ auto st = reader.read_at(0, Slice(buffer.data(), buffer.size()),
&bytes_read, &io_ctx);
+ ASSERT_FALSE(st.ok());
+ EXPECT_EQ(g_read_cache_self_heal_on_not_found.get_value(),
before_self_heal);
+
+ sp->clear_call_back("LocalFileReader::read_at_impl");
+ sp->disable_processing();
+
+ io::CacheContext verify_ctx;
+ ReadStatistics verify_stats;
+ verify_ctx.stats = &verify_stats;
+ verify_ctx.cache_type = io::FileCacheType::NORMAL;
+ auto key = io::BlockFileCache::hash("tmp_file");
+ auto verify_holder = cache->get_or_set(key, 0, 64_kb, verify_ctx);
+ auto verify_blocks = fromHolder(verify_holder);
+ ASSERT_EQ(verify_blocks.size(), 1);
+ EXPECT_EQ(verify_blocks[0]->state(), io::FileBlock::State::DOWNLOADED);
+
+ EXPECT_TRUE(reader.close().ok());
+ EXPECT_TRUE(reader.closed());
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+ FileCacheFactory::instance()->_caches.clear();
+ FileCacheFactory::instance()->_path_to_cache.clear();
+ FileCacheFactory::instance()->_capacity = 0;
+}
+
TEST_F(BlockFileCacheTest, cached_remote_file_reader_init) {
std::string cache_base_path = caches_dir /
"cached_remote_file_reader_init" / "";
if (fs::exists(cache_base_path)) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]