This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new bee8e3edb47 branch-3.0: [fix](cloud) fix file cache types priority 
order #51463 (#51603)
bee8e3edb47 is described below

commit bee8e3edb47f623d0fe7b60987fa452c109a3fd6
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jun 11 10:35:16 2025 +0800

    branch-3.0: [fix](cloud) fix file cache types priority order #51463 (#51603)
    
    Cherry-picked from #51463
    
    Signed-off-by: zhengyu <zhangzhen...@selectdb.com>
    Co-authored-by: zhengyu <zhangzhen...@selectdb.com>
---
 be/src/io/cache/file_cache_common.h        |   8 +-
 be/test/io/cache/block_file_cache_test.cpp | 153 +++++++++++++++++++++++++++++
 2 files changed, 157 insertions(+), 4 deletions(-)

diff --git a/be/src/io/cache/file_cache_common.h 
b/be/src/io/cache/file_cache_common.h
index 25df07b5ddf..6e9396fb11a 100644
--- a/be/src/io/cache/file_cache_common.h
+++ b/be/src/io/cache/file_cache_common.h
@@ -128,13 +128,13 @@ FileCacheSettings get_file_cache_settings(size_t 
capacity, size_t max_query_cach
 
 struct CacheContext {
     CacheContext(const IOContext* io_context) {
-        if (io_context->is_index_data) {
+        if (io_context->expiration_time != 0) {
+            cache_type = FileCacheType::TTL;
+            expiration_time = io_context->expiration_time;
+        } else if (io_context->is_index_data) {
             cache_type = FileCacheType::INDEX;
         } else if (io_context->is_disposable) {
             cache_type = FileCacheType::DISPOSABLE;
-        } else if (io_context->expiration_time != 0) {
-            cache_type = FileCacheType::TTL;
-            expiration_time = io_context->expiration_time;
         } else {
             cache_type = FileCacheType::NORMAL;
         }
diff --git a/be/test/io/cache/block_file_cache_test.cpp 
b/be/test/io/cache/block_file_cache_test.cpp
index d0546787026..0d6883e4c8b 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -7752,4 +7752,157 @@ TEST_F(BlockFileCacheTest, 
test_upgrade_cache_dir_version) {
     }
 }
 
+TEST_F(BlockFileCacheTest, cached_remote_file_reader_ttl_index) {
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+    fs::create_directories(cache_base_path);
+    TUniqueId query_id;
+    query_id.hi = 1;
+    query_id.lo = 1;
+    io::FileCacheSettings settings;
+    settings.query_queue_size = 6291456;
+    settings.query_queue_elements = 6;
+    settings.index_queue_size = 1048576;
+    settings.index_queue_elements = 1;
+    settings.disposable_queue_size = 1048576;
+    settings.disposable_queue_elements = 1;
+    settings.capacity = 8388608;
+    settings.max_file_block_size = 1048576;
+    settings.max_query_cache_size = 0;
+    io::CacheContext context;
+    ReadStatistics rstats;
+    context.stats = &rstats;
+    context.query_id = query_id;
+    
ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path, 
settings).ok());
+    BlockFileCache* cache = 
FileCacheFactory::instance()->get_by_path(cache_base_path);
+
+    for (int i = 0; i < 100; i++) {
+        if (cache->get_async_open_success()) {
+            break;
+        };
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    }
+
+    FileReaderSPtr local_reader;
+    ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader));
+    io::FileReaderOptions opts;
+    opts.cache_type = io::cache_type_from_string("file_block_cache");
+    opts.is_doris_table = true;
+    CachedRemoteFileReader reader(local_reader, opts);
+    auto key = io::BlockFileCache::hash("tmp_file");
+    EXPECT_EQ(reader._cache_hash, key);
+    EXPECT_EQ(local_reader->path().native(), reader.path().native());
+    EXPECT_EQ(local_reader->size(), reader.size());
+    EXPECT_FALSE(reader.closed());
+    EXPECT_EQ(local_reader->path().native(), 
reader.get_remote_reader()->path().native());
+    {
+        std::string buffer;
+        buffer.resize(64_kb);
+        IOContext io_ctx;
+        FileCacheStatistics stats;
+        io_ctx.file_cache_stats = &stats;
+        io_ctx.is_index_data = true;
+        int64_t cur_time = UnixSeconds();
+        io_ctx.expiration_time = cur_time + 120;
+        size_t bytes_read {0};
+        EXPECT_TRUE(
+                reader.read_at(0, Slice(buffer.data(), buffer.size()), 
&bytes_read, &io_ctx).ok());
+    }
+    std::this_thread::sleep_for(std::chrono::seconds(3));
+    LOG(INFO) << "ttl:" << cache->_ttl_queue.cache_size;
+    LOG(INFO) << "index:" << cache->_index_queue.cache_size;
+    LOG(INFO) << "normal:" << cache->_normal_queue.cache_size;
+    LOG(INFO) << "disp:" << cache->_disposable_queue.cache_size;
+    EXPECT_EQ(cache->_ttl_queue.cache_size, 1048576);
+    EXPECT_EQ(cache->_index_queue.cache_size, 0);
+
+    EXPECT_TRUE(reader.close().ok());
+    EXPECT_TRUE(reader.closed());
+    std::this_thread::sleep_for(std::chrono::seconds(1));
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+    FileCacheFactory::instance()->_caches.clear();
+    FileCacheFactory::instance()->_path_to_cache.clear();
+    FileCacheFactory::instance()->_capacity = 0;
+}
+
+TEST_F(BlockFileCacheTest, cached_remote_file_reader_normal_index) {
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+    fs::create_directories(cache_base_path);
+    TUniqueId query_id;
+    query_id.hi = 1;
+    query_id.lo = 1;
+    io::FileCacheSettings settings;
+    settings.query_queue_size = 6291456;
+    settings.query_queue_elements = 6;
+    settings.index_queue_size = 1048576;
+    settings.index_queue_elements = 1;
+    settings.disposable_queue_size = 1048576;
+    settings.disposable_queue_elements = 1;
+    settings.capacity = 8388608;
+    settings.max_file_block_size = 1048576;
+    settings.max_query_cache_size = 0;
+    io::CacheContext context;
+    ReadStatistics rstats;
+    context.stats = &rstats;
+    context.query_id = query_id;
+    
ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path, 
settings).ok());
+    BlockFileCache* cache = 
FileCacheFactory::instance()->get_by_path(cache_base_path);
+
+    for (int i = 0; i < 100; i++) {
+        if (cache->get_async_open_success()) {
+            break;
+        };
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    }
+
+    FileReaderSPtr local_reader;
+    ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader));
+    io::FileReaderOptions opts;
+    opts.cache_type = io::cache_type_from_string("file_block_cache");
+    opts.is_doris_table = true;
+    CachedRemoteFileReader reader(local_reader, opts);
+    auto key = io::BlockFileCache::hash("tmp_file");
+    EXPECT_EQ(reader._cache_hash, key);
+    EXPECT_EQ(local_reader->path().native(), reader.path().native());
+    EXPECT_EQ(local_reader->size(), reader.size());
+    EXPECT_FALSE(reader.closed());
+    EXPECT_EQ(local_reader->path().native(), 
reader.get_remote_reader()->path().native());
+
+    {
+        std::string buffer;
+        buffer.resize(64_kb);
+        IOContext io_ctx;
+        FileCacheStatistics stats;
+        io_ctx.file_cache_stats = &stats;
+        io_ctx.is_index_data = true;
+        // int64_t cur_time = UnixSeconds();
+        // io_ctx.expiration_time = cur_time + 120;
+        size_t bytes_read {0};
+        EXPECT_TRUE(
+                reader.read_at(0, Slice(buffer.data(), buffer.size()), 
&bytes_read, &io_ctx).ok());
+    }
+    std::this_thread::sleep_for(std::chrono::seconds(3));
+    LOG(INFO) << "ttl:" << cache->_ttl_queue.cache_size;
+    LOG(INFO) << "index:" << cache->_index_queue.cache_size;
+    LOG(INFO) << "normal:" << cache->_normal_queue.cache_size;
+    LOG(INFO) << "disp:" << cache->_disposable_queue.cache_size;
+    EXPECT_EQ(cache->_ttl_queue.cache_size, 0);
+    EXPECT_EQ(cache->_index_queue.cache_size, 1048576);
+
+    EXPECT_TRUE(reader.close().ok());
+    EXPECT_TRUE(reader.closed());
+    std::this_thread::sleep_for(std::chrono::seconds(1));
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+    FileCacheFactory::instance()->_caches.clear();
+    FileCacheFactory::instance()->_path_to_cache.clear();
+    FileCacheFactory::instance()->_capacity = 0;
+}
+
 } // namespace doris::io


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to