This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new c7c9de3e953 branch-3.0: [fix](cloud) fix get_or_set empty <offset,
cell> map #49793 (#49873)
c7c9de3e953 is described below
commit c7c9de3e9536cedf6608ce9ec877fac01aebef1d
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Apr 8 17:46:06 2025 +0800
branch-3.0: [fix](cloud) fix get_or_set empty <offset, cell> map #49793
(#49873)
Cherry-picked from #49793
Co-authored-by: zhengyu <[email protected]>
---
be/src/io/cache/block_file_cache.cpp | 96 +++++++++++++++----------
be/test/io/cache/block_file_cache_test.cpp | 112 ++++++++++++++++++++++++++++-
2 files changed, 168 insertions(+), 40 deletions(-)
diff --git a/be/src/io/cache/block_file_cache.cpp
b/be/src/io/cache/block_file_cache.cpp
index ad3c6e99638..52e36e42f9f 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -412,13 +412,13 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper&
hash, const CacheConte
}
auto& file_blocks = it->second;
- DCHECK(!file_blocks.empty());
if (file_blocks.empty()) {
LOG(WARNING) << "file_blocks is empty for hash=" << hash.to_string()
<< " cache type=" << context.cache_type
<< " cache expiration time=" << context.expiration_time
<< " cache range=" << range.left << " " << range.right
<< " query id=" << context.query_id;
+ DCHECK(false);
_files.erase(hash);
return {};
}
@@ -759,13 +759,12 @@ BlockFileCache::FileBlockCell*
BlockFileCache::add_cell(const UInt128Wrapper& ha
return nullptr; /// Empty files are not cached.
}
- DCHECK_EQ(_files[hash].count(offset), 0)
+ auto& offsets = _files[hash];
+ DCHECK_EQ(offsets.count(offset), 0)
<< "Cache already exists for hash: " << hash.to_string() << ",
offset: " << offset
<< ", size: " << size
<< ".\nCurrent cache structure: " << dump_structure_unlocked(hash,
cache_lock);
- auto& offsets = _files[hash];
-
FileCacheKey key;
key.hash = hash;
key.offset = offset;
@@ -1052,37 +1051,43 @@ bool BlockFileCache::remove_if_ttl_file_blocks(const
UInt128Wrapper& file_key, b
if (auto iter = _key_to_time.find(file_key);
_key_to_time.find(file_key) != _key_to_time.end()) {
if (!remove_directly) {
- for (auto& [_, cell] : _files[file_key]) {
- if (cell.file_block->cache_type() != FileCacheType::TTL) {
- continue;
- }
- Status st = cell.file_block->update_expiration_time(0);
- if (!st.ok()) {
- LOG_WARNING("Failed to update expiration time to
0").error(st);
- }
+ auto it = _files.find(file_key);
+ if (it != _files.end()) {
+ for (auto& [_, cell] : it->second) {
+ if (cell.file_block->cache_type() != FileCacheType::TTL) {
+ continue;
+ }
+ Status st = cell.file_block->update_expiration_time(0);
+ if (!st.ok()) {
+ LOG_WARNING("Failed to update expiration time to
0").error(st);
+ }
- if (cell.file_block->cache_type() == FileCacheType::NORMAL)
continue;
- st = cell.file_block->change_cache_type_between_ttl_and_others(
- FileCacheType::NORMAL);
- if (st.ok()) {
- if (cell.queue_iterator) {
- ttl_queue.remove(cell.queue_iterator.value(),
cache_lock);
+ if (cell.file_block->cache_type() ==
FileCacheType::NORMAL) continue;
+ st =
cell.file_block->change_cache_type_between_ttl_and_others(
+ FileCacheType::NORMAL);
+ if (st.ok()) {
+ if (cell.queue_iterator) {
+ ttl_queue.remove(cell.queue_iterator.value(),
cache_lock);
+ }
+ auto& queue = get_queue(FileCacheType::NORMAL);
+ cell.queue_iterator = queue.add(
+ cell.file_block->get_hash_value(),
cell.file_block->offset(),
+ cell.file_block->range().size(), cache_lock);
+ } else {
+ LOG_WARNING("Failed to change cache type to
normal").error(st);
}
- auto& queue = get_queue(FileCacheType::NORMAL);
- cell.queue_iterator =
- queue.add(cell.file_block->get_hash_value(),
cell.file_block->offset(),
- cell.file_block->range().size(),
cache_lock);
- } else {
- LOG_WARNING("Failed to change cache type to
normal").error(st);
}
}
} else {
std::vector<FileBlockCell*> to_remove;
- for (auto& [_, cell] : _files[file_key]) {
- if (cell.releasable()) {
- to_remove.push_back(&cell);
- } else {
- cell.file_block->set_deleting();
+ auto it = _files.find(file_key);
+ if (it != _files.end()) {
+ for (auto& [_, cell] : it->second) {
+ if (cell.releasable()) {
+ to_remove.push_back(&cell);
+ } else {
+ cell.file_block->set_deleting();
+ }
}
}
std::for_each(to_remove.begin(), to_remove.end(),
[&](FileBlockCell* cell) {
@@ -1404,10 +1409,12 @@ void BlockFileCache::remove(FileBlockSPtr file_block,
T& cache_lock, U& block_lo
if (FileCacheType::TTL == type) {
_cur_ttl_size -= file_block->range().size();
}
- auto& offsets = _files[hash];
- offsets.erase(file_block->offset());
- if (offsets.empty()) {
- _files.erase(hash);
+ auto it = _files.find(hash);
+ if (it != _files.end()) {
+ it->second.erase(file_block->offset());
+ if (it->second.empty()) {
+ _files.erase(hash);
+ }
}
*_num_removed_blocks << 1;
}
@@ -1525,7 +1532,11 @@ std::string BlockFileCache::dump_structure(const
UInt128Wrapper& hash) {
std::string BlockFileCache::dump_structure_unlocked(const UInt128Wrapper& hash,
std::lock_guard<std::mutex>&) {
std::stringstream result;
- const auto& cells_by_offset = _files[hash];
+ auto it = _files.find(hash);
+ if (it == _files.end()) {
+ return std::string("");
+ }
+ const auto& cells_by_offset = it->second;
for (const auto& [_, cell] : cells_by_offset) {
result << cell.file_block->get_info_for_log() << " "
@@ -1544,7 +1555,11 @@ std::string
BlockFileCache::dump_single_cache_type_unlocked(const UInt128Wrapper
size_t offset,
std::lock_guard<std::mutex>&) {
std::stringstream result;
- const auto& cells_by_offset = _files[hash];
+ auto it = _files.find(hash);
+ if (it == _files.end()) {
+ return std::string("");
+ }
+ const auto& cells_by_offset = it->second;
const auto& cell = cells_by_offset.find(offset);
return cache_type_to_string(cell->second.file_block->cache_type());
@@ -1926,10 +1941,13 @@ void BlockFileCache::modify_expiration_time(const
UInt128Wrapper& hash,
}
_time_to_key.insert(std::make_pair(new_expiration_time, hash));
iter->second = new_expiration_time;
- for (auto& [_, cell] : _files[hash]) {
- Status st =
cell.file_block->update_expiration_time(new_expiration_time);
- if (!st.ok()) {
- LOG_WARNING("Failed to modify expiration time").error(st);
+ auto it = _files.find(hash);
+ if (it != _files.end()) {
+ for (auto& [_, cell] : it->second) {
+ Status st =
cell.file_block->update_expiration_time(new_expiration_time);
+ if (!st.ok()) {
+ LOG_WARNING("Failed to modify expiration time").error(st);
+ }
}
}
diff --git a/be/test/io/cache/block_file_cache_test.cpp
b/be/test/io/cache/block_file_cache_test.cpp
index d531f8cc896..6aad66de5cc 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -7077,10 +7077,120 @@ TEST_F(BlockFileCacheTest,
test_evict_cache_in_advance_skip) {
SyncPoint::get_instance()->disable_processing();
SyncPoint::get_instance()->clear_all_call_backs();
- fs::remove_all(cache_base_path);
+ // fs::remove_all(cache_base_path);
config::file_cache_enter_need_evict_cache_in_advance_percent =
origin_enter;
config::file_cache_exit_need_evict_cache_in_advance_percent = origin_exit;
config::file_cache_evict_in_advance_recycle_keys_num_threshold =
origin_threshold;
}
+TEST_F(BlockFileCacheTest, validate_get_or_set_crash) {
+ {
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+ fs::create_directories(cache_base_path);
+
+ auto sp = SyncPoint::get_instance();
+ sp->enable_processing();
+
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 1;
+ io::FileCacheSettings settings;
+
+ settings.ttl_queue_size = 5000000;
+ settings.ttl_queue_elements = 50000;
+ settings.query_queue_size = 3000000;
+ settings.query_queue_elements = 30000;
+ settings.index_queue_size = 1000000;
+ settings.index_queue_elements = 10000;
+ settings.disposable_queue_size = 1000000;
+ settings.disposable_queue_elements = 10000;
+ settings.capacity = 10000000;
+ settings.max_file_block_size = 100000;
+ settings.max_query_cache_size = 30;
+
+ // block the async load process
+ std::atomic_bool flag1 {false};
+ SyncPoint::CallbackGuard guard1;
+ sp->set_call_back(
+ "BlockFileCache::BeforeScan",
+ [&](auto&&) {
+ // create a tmp file in hash "key1"
lru_cache_test/cache1/f36/f36131fb4ba563c17e727cd0cdd63689_0/0_tmp
+ ASSERT_TRUE(global_local_filesystem()->create_directory(
+ fs::current_path() / "lru_cache_test" / "cache1" /
"f36" /
+ "f36131fb4ba563c17e727cd0cdd63689_0"));
+ FileWriterPtr writer;
+ ASSERT_TRUE(global_local_filesystem()
+
->create_file("lru_cache_test/cache1/f36/"
+
"f36131fb4ba563c17e727cd0cdd63689_0/0_tmp",
+ &writer)
+ .ok());
+ ASSERT_TRUE(writer->append(Slice("333", 3)).ok());
+ ASSERT_TRUE(writer->close().ok());
+ while (!flag1) {
+ }
+ },
+ &guard1);
+
+ io::BlockFileCache cache(cache_base_path, settings);
+ ASSERT_TRUE(cache.initialize());
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ // make a get request in async open phase
+ {
+ io::CacheContext context1;
+ ReadStatistics rstats;
+ context1.stats = &rstats;
+ context1.cache_type = io::FileCacheType::DISPOSABLE;
+ context1.query_id = query_id;
+ auto key1 = io::BlockFileCache::hash("key1");
+ LOG(INFO) << key1.to_string();
+ auto holder = cache.get_or_set(key1, 0, 100000, context1);
+ }
+
+ // continue async load
+ flag1 = true;
+ int i = 0;
+ for (; i < 100; i++) {
+ if (cache.get_async_open_success()) {
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ ASSERT_TRUE(cache.get_async_open_success());
+
+ io::CacheContext context1;
+ ReadStatistics rstats;
+ context1.stats = &rstats;
+ context1.cache_type = io::FileCacheType::DISPOSABLE;
+ context1.query_id = query_id;
+ auto key1 = io::BlockFileCache::hash("key1");
+
+ // get key1 again
+ int64_t offset = 0;
+ {
+ auto holder = cache.get_or_set(key1, offset, 100000, context1);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(2, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ }
+
+ SyncPoint::get_instance()->disable_processing();
+ SyncPoint::get_instance()->clear_all_call_backs();
+
+ //if (fs::exists(cache_base_path)) {
+ // fs::remove_all(cache_base_path);
+ //}
+}
+
} // namespace doris::io
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]