This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new bee8e3edb47 branch-3.0: [fix](cloud) fix file cache types priority order #51463 (#51603) bee8e3edb47 is described below commit bee8e3edb47f623d0fe7b60987fa452c109a3fd6 Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Wed Jun 11 10:35:16 2025 +0800 branch-3.0: [fix](cloud) fix file cache types priority order #51463 (#51603) Cherry-picked from #51463 Signed-off-by: zhengyu <zhangzhen...@selectdb.com> Co-authored-by: zhengyu <zhangzhen...@selectdb.com> --- be/src/io/cache/file_cache_common.h | 8 +- be/test/io/cache/block_file_cache_test.cpp | 153 +++++++++++++++++++++++++++++ 2 files changed, 157 insertions(+), 4 deletions(-) diff --git a/be/src/io/cache/file_cache_common.h b/be/src/io/cache/file_cache_common.h index 25df07b5ddf..6e9396fb11a 100644 --- a/be/src/io/cache/file_cache_common.h +++ b/be/src/io/cache/file_cache_common.h @@ -128,13 +128,13 @@ FileCacheSettings get_file_cache_settings(size_t capacity, size_t max_query_cach struct CacheContext { CacheContext(const IOContext* io_context) { - if (io_context->is_index_data) { + if (io_context->expiration_time != 0) { + cache_type = FileCacheType::TTL; + expiration_time = io_context->expiration_time; + } else if (io_context->is_index_data) { cache_type = FileCacheType::INDEX; } else if (io_context->is_disposable) { cache_type = FileCacheType::DISPOSABLE; - } else if (io_context->expiration_time != 0) { - cache_type = FileCacheType::TTL; - expiration_time = io_context->expiration_time; } else { cache_type = FileCacheType::NORMAL; } diff --git a/be/test/io/cache/block_file_cache_test.cpp b/be/test/io/cache/block_file_cache_test.cpp index d0546787026..0d6883e4c8b 100644 --- a/be/test/io/cache/block_file_cache_test.cpp +++ b/be/test/io/cache/block_file_cache_test.cpp @@ -7752,4 +7752,157 @@ TEST_F(BlockFileCacheTest, test_upgrade_cache_dir_version) { } } +TEST_F(BlockFileCacheTest, cached_remote_file_reader_ttl_index) { + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + fs::create_directories(cache_base_path); + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 1; + io::FileCacheSettings settings; + settings.query_queue_size = 6291456; + settings.query_queue_elements = 6; + settings.index_queue_size = 1048576; + settings.index_queue_elements = 1; + settings.disposable_queue_size = 1048576; + settings.disposable_queue_elements = 1; + settings.capacity = 8388608; + settings.max_file_block_size = 1048576; + settings.max_query_cache_size = 0; + io::CacheContext context; + ReadStatistics rstats; + context.stats = &rstats; + context.query_id = query_id; + ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path, settings).ok()); + BlockFileCache* cache = FileCacheFactory::instance()->get_by_path(cache_base_path); + + for (int i = 0; i < 100; i++) { + if (cache->get_async_open_success()) { + break; + }; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + + FileReaderSPtr local_reader; + ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader)); + io::FileReaderOptions opts; + opts.cache_type = io::cache_type_from_string("file_block_cache"); + opts.is_doris_table = true; + CachedRemoteFileReader reader(local_reader, opts); + auto key = io::BlockFileCache::hash("tmp_file"); + EXPECT_EQ(reader._cache_hash, key); + EXPECT_EQ(local_reader->path().native(), reader.path().native()); + EXPECT_EQ(local_reader->size(), reader.size()); + EXPECT_FALSE(reader.closed()); + EXPECT_EQ(local_reader->path().native(), reader.get_remote_reader()->path().native()); + { + std::string buffer; + buffer.resize(64_kb); + IOContext io_ctx; + FileCacheStatistics stats; + io_ctx.file_cache_stats = &stats; + io_ctx.is_index_data = true; + int64_t cur_time = UnixSeconds(); + io_ctx.expiration_time = cur_time + 120; + size_t bytes_read {0}; + EXPECT_TRUE( + reader.read_at(0, Slice(buffer.data(), buffer.size()), &bytes_read, &io_ctx).ok()); + } + std::this_thread::sleep_for(std::chrono::seconds(3)); + LOG(INFO) << "ttl:" << cache->_ttl_queue.cache_size; + LOG(INFO) << "index:" << cache->_index_queue.cache_size; + LOG(INFO) << "normal:" << cache->_normal_queue.cache_size; + LOG(INFO) << "disp:" << cache->_disposable_queue.cache_size; + EXPECT_EQ(cache->_ttl_queue.cache_size, 1048576); + EXPECT_EQ(cache->_index_queue.cache_size, 0); + + EXPECT_TRUE(reader.close().ok()); + EXPECT_TRUE(reader.closed()); + std::this_thread::sleep_for(std::chrono::seconds(1)); + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + FileCacheFactory::instance()->_caches.clear(); + FileCacheFactory::instance()->_path_to_cache.clear(); + FileCacheFactory::instance()->_capacity = 0; +} + +TEST_F(BlockFileCacheTest, cached_remote_file_reader_normal_index) { + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + fs::create_directories(cache_base_path); + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 1; + io::FileCacheSettings settings; + settings.query_queue_size = 6291456; + settings.query_queue_elements = 6; + settings.index_queue_size = 1048576; + settings.index_queue_elements = 1; + settings.disposable_queue_size = 1048576; + settings.disposable_queue_elements = 1; + settings.capacity = 8388608; + settings.max_file_block_size = 1048576; + settings.max_query_cache_size = 0; + io::CacheContext context; + ReadStatistics rstats; + context.stats = &rstats; + context.query_id = query_id; + ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path, settings).ok()); + BlockFileCache* cache = FileCacheFactory::instance()->get_by_path(cache_base_path); + + for (int i = 0; i < 100; i++) { + if (cache->get_async_open_success()) { + break; + }; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + + FileReaderSPtr local_reader; + ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader)); + io::FileReaderOptions opts; + opts.cache_type = io::cache_type_from_string("file_block_cache"); + opts.is_doris_table = true; + CachedRemoteFileReader reader(local_reader, opts); + auto key = io::BlockFileCache::hash("tmp_file"); + EXPECT_EQ(reader._cache_hash, key); + EXPECT_EQ(local_reader->path().native(), reader.path().native()); + EXPECT_EQ(local_reader->size(), reader.size()); + EXPECT_FALSE(reader.closed()); + EXPECT_EQ(local_reader->path().native(), reader.get_remote_reader()->path().native()); + + { + std::string buffer; + buffer.resize(64_kb); + IOContext io_ctx; + FileCacheStatistics stats; + io_ctx.file_cache_stats = &stats; + io_ctx.is_index_data = true; + // int64_t cur_time = UnixSeconds(); + // io_ctx.expiration_time = cur_time + 120; + size_t bytes_read {0}; + EXPECT_TRUE( + reader.read_at(0, Slice(buffer.data(), buffer.size()), &bytes_read, &io_ctx).ok()); + } + std::this_thread::sleep_for(std::chrono::seconds(3)); + LOG(INFO) << "ttl:" << cache->_ttl_queue.cache_size; + LOG(INFO) << "index:" << cache->_index_queue.cache_size; + LOG(INFO) << "normal:" << cache->_normal_queue.cache_size; + LOG(INFO) << "disp:" << cache->_disposable_queue.cache_size; + EXPECT_EQ(cache->_ttl_queue.cache_size, 0); + EXPECT_EQ(cache->_index_queue.cache_size, 1048576); + + EXPECT_TRUE(reader.close().ok()); + EXPECT_TRUE(reader.closed()); + std::this_thread::sleep_for(std::chrono::seconds(1)); + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + FileCacheFactory::instance()->_caches.clear(); + FileCacheFactory::instance()->_path_to_cache.clear(); + FileCacheFactory::instance()->_capacity = 0; +} + } // namespace doris::io --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org