This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c2ad1b0c8d (cloud-merge) Reduce the number of lock requests when read 
from file cache (#34136)
1c2ad1b0c8d is described below

commit 1c2ad1b0c8dcc863ac06d23bb7ca72007fc38856
Author: Lightman <[email protected]>
AuthorDate: Sat Apr 27 13:53:05 2024 +0800

    (cloud-merge) Reduce the number of lock requests when read from file cache 
(#34136)
---
 be/src/common/config.cpp                      |   1 +
 be/src/common/config.h                        |   1 +
 be/src/io/cache/block_file_cache.cpp          |  16 +++
 be/src/io/cache/block_file_cache.h            |   2 +-
 be/src/io/cache/cached_remote_file_reader.cpp |  68 +++++++++-
 be/src/io/cache/cached_remote_file_reader.h   |   6 +
 be/src/io/cache/file_block.h                  |   1 +
 be/test/io/cache/block_file_cache_test.cpp    | 178 +++++++++++++++++++++-----
 8 files changed, 232 insertions(+), 41 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 31a69418d94..ea0e0bb3aad 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -978,6 +978,7 @@ DEFINE_Bool(clear_file_cache, "false");
 DEFINE_Bool(enable_file_cache_query_limit, "false");
 DEFINE_mInt32(file_cache_enter_disk_resource_limit_mode_percent, "90");
 DEFINE_mInt32(file_cache_exit_disk_resource_limit_mode_percent, "80");
+DEFINE_mBool(enable_read_cache_file_directly, "false");
 
 DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "1800");
 DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec, "600");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 2456de33756..40d336d2308 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1022,6 +1022,7 @@ DECLARE_Bool(clear_file_cache);
 DECLARE_Bool(enable_file_cache_query_limit);
 DECLARE_Int32(file_cache_enter_disk_resource_limit_mode_percent);
 DECLARE_Int32(file_cache_exit_disk_resource_limit_mode_percent);
+DECLARE_mBool(enable_read_cache_file_directly);
 
 // inverted index searcher cache
 // cache entry stay time after lookup
diff --git a/be/src/io/cache/block_file_cache.cpp 
b/be/src/io/cache/block_file_cache.cpp
index 26ca8e47596..6a1c873966c 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -1571,6 +1571,22 @@ std::string BlockFileCache::clear_file_cache_directly() {
     return msg;
 }
 
+std::map<size_t, FileBlockSPtr> BlockFileCache::get_blocks_by_key(const 
UInt128Wrapper& hash) { // Returns an offset -> block map of every DOWNLOADED block stored under `hash`.
+    std::map<size_t, FileBlockSPtr> offset_to_block;
+    std::lock_guard cache_lock(_mutex); // _mutex stays held for the whole scan below
+    if (_files.contains(hash)) { // an unknown hash falls through and returns an empty map
+        for (auto& [offset, cell] : _files[hash]) {
+            if (cell.file_block->state() == FileBlock::State::DOWNLOADED) { // blocks in any other state are skipped
+                offset_to_block.emplace(offset, cell.file_block);
+                use_cell(cell, nullptr, // NOTE(review): presumably records this lookup as a use of the cell (LRU bookkeeping) -- confirm in use_cell()
+                         need_to_move(cell.file_block->cache_type(), 
FileCacheType::DISPOSABLE),
+                         cache_lock);
+            }
+        }
+    }
+    return offset_to_block;
+}
+
 template void BlockFileCache::remove(FileBlockSPtr file_block,
                                      std::lock_guard<std::mutex>& cache_lock,
                                      std::lock_guard<std::mutex>& block_lock);
diff --git a/be/src/io/cache/block_file_cache.h 
b/be/src/io/cache/block_file_cache.h
index f086c2c680e..6f19095eace 100644
--- a/be/src/io/cache/block_file_cache.h
+++ b/be/src/io/cache/block_file_cache.h
@@ -94,7 +94,7 @@ public:
      */
     std::string clear_file_cache_async();
     std::string clear_file_cache_directly();
-
+    std::map<size_t, FileBlockSPtr> get_blocks_by_key(const UInt128Wrapper& 
hash);
     /// For debug.
     std::string dump_structure(const UInt128Wrapper& hash);
 
diff --git a/be/src/io/cache/cached_remote_file_reader.cpp 
b/be/src/io/cache/cached_remote_file_reader.cpp
index d976ccb0df4..9572180c892 100644
--- a/be/src/io/cache/cached_remote_file_reader.cpp
+++ b/be/src/io/cache/cached_remote_file_reader.cpp
@@ -34,6 +34,7 @@
 #include "io/cache/block_file_cache_profile.h"
 #include "io/cache/file_block.h"
 #include "io/fs/file_reader.h"
+#include "io/fs/local_file_system.h"
 #include "io/io_common.h"
 #include "util/bit_util.h"
 #include "util/doris_metrics.h"
@@ -50,6 +51,9 @@ CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr 
remote_file_reader
     if (_is_doris_table) {
         _cache_hash = BlockFileCache::hash(path().filename().native());
         _cache = FileCacheFactory::instance()->get_by_path(_cache_hash);
+        if (config::enable_read_cache_file_directly) {
+            _cache_file_readers = _cache->get_blocks_by_key(_cache_hash);
+        }
     } else {
         // Use path and modification time to build cache key
         std::string unique_path = fmt::format("{}:{}", path().native(), 
opts.mtime);
@@ -69,6 +73,14 @@ 
CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader
     }
 }
 
+void CachedRemoteFileReader::_insert_file_reader(FileBlockSPtr file_block) { // Records file_block in _cache_file_readers under its offset.
+    if (config::enable_read_cache_file_directly) { // does nothing when the direct-read option is off
+        std::lock_guard lock(_mtx); // exclusive lock; read_at_impl takes a shared lock on the same _mtx
+        DCHECK(file_block->state() == FileBlock::State::DOWNLOADED); // only DOWNLOADED blocks are expected here
+        _cache_file_readers.emplace(file_block->offset(), 
std::move(file_block)); // std::map::emplace leaves an existing entry for the same offset untouched
+    }
+}
+
 CachedRemoteFileReader::~CachedRemoteFileReader() {
     static_cast<void>(close());
 }
@@ -110,6 +122,54 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, 
Slice result, size_t*
         return Status::OK();
     }
     ReadStatistics stats;
+    auto defer_func = [&](int*) {
+        if (io_ctx->file_cache_stats) {
+            _update_state(stats, io_ctx->file_cache_stats);
+            io::FileCacheProfile::instance().update(io_ctx->file_cache_stats);
+        }
+    };
+    std::unique_ptr<int, decltype(defer_func)> defer((int*)0x01, 
std::move(defer_func));
+    stats.bytes_read += bytes_req;
+    if (config::enable_read_cache_file_directly) {
+        // read directly
+        size_t need_read_size = bytes_req;
+        std::shared_lock lock(_mtx);
+        if (!_cache_file_readers.empty()) {
+            // upper_bound yields the first block starting after `offset`; step back to the last block starting at or before it.
+            auto iter = _cache_file_readers.upper_bound(offset);
+            if (iter != _cache_file_readers.begin()) {
+                iter--;
+            }
+            size_t cur_offset = offset;
+            while (need_read_size != 0 && iter != _cache_file_readers.end()) {
+                if (iter->second->offset() > cur_offset ||
+                    iter->second->range().right < cur_offset) {
+                    break;
+                }
+                size_t file_offset = cur_offset - iter->second->offset();
+                size_t reserve_bytes =
+                        std::min(need_read_size, iter->second->range().size() 
- file_offset);
+                {
+                    SCOPED_RAW_TIMER(&stats.local_read_timer);
+                    if (!iter->second
+                                 ->read(Slice(result.data + (cur_offset - 
offset), reserve_bytes),
+                                        file_offset)
+                                 .ok()) {
+                        break;
+                    }
+                }
+                need_read_size -= reserve_bytes;
+                cur_offset += reserve_bytes;
+                iter++;
+            }
+            if (need_read_size == 0) {
+                *bytes_read = bytes_req;
+                stats.hit_cache = true;
+                return Status::OK();
+            }
+        }
+    }
+    // read from cache or remote
     auto [align_left, align_size] = s_align_size(offset, bytes_req, size());
     CacheContext cache_context(io_ctx);
     FileBlocksHolder holder =
@@ -137,7 +197,6 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, 
Slice result, size_t*
             break;
         }
     }
-    stats.bytes_read += bytes_req;
     size_t empty_start = 0;
     size_t empty_end = 0;
     if (!empty_blocks.empty()) {
@@ -164,6 +223,8 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, 
Slice result, size_t*
             }
             if (!st.ok()) {
                 LOG_WARNING("Write data to file cache failed").error(st);
+            } else {
+                _insert_file_reader(block);
             }
             stats.bytes_write_into_file_cache += block_size;
         }
@@ -243,11 +304,6 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, 
Slice result, size_t*
         current_offset = right + 1;
     }
     DCHECK(*bytes_read == bytes_req);
-    DorisMetrics::instance()->s3_bytes_read_total->increment(*bytes_read);
-    if (io_ctx->file_cache_stats) {
-        _update_state(stats, io_ctx->file_cache_stats);
-        io::FileCacheProfile::instance().update(io_ctx->file_cache_stats);
-    }
     return Status::OK();
 }
 
diff --git a/be/src/io/cache/cached_remote_file_reader.h 
b/be/src/io/cache/cached_remote_file_reader.h
index 6cdba2c3dcc..34fc53c7310 100644
--- a/be/src/io/cache/cached_remote_file_reader.h
+++ b/be/src/io/cache/cached_remote_file_reader.h
@@ -20,14 +20,17 @@
 #include <stddef.h>
 #include <stdint.h>
 
+#include <map>
 #include <memory>
 #include <string>
 #include <utility>
 
 #include "common/status.h"
 #include "io/cache/block_file_cache.h"
+#include "io/cache/file_block.h"
 #include "io/cache/file_cache_common.h"
 #include "io/fs/file_reader.h"
+#include "io/fs/file_reader_writer_fwd.h"
 #include "io/fs/file_system.h"
 #include "io/fs/path.h"
 #include "util/slice.h"
@@ -60,10 +63,13 @@ protected:
                         const IOContext* io_ctx) override;
 
 private:
+    void _insert_file_reader(FileBlockSPtr file_block);
     bool _is_doris_table;
     FileReaderSPtr _remote_file_reader;
     UInt128Wrapper _cache_hash;
     BlockFileCache* _cache;
+    std::shared_mutex _mtx;
+    std::map<size_t, FileBlockSPtr> _cache_file_readers;
 
     struct ReadStatistics {
         bool hit_cache = true;
diff --git a/be/src/io/cache/file_block.h b/be/src/io/cache/file_block.h
index 2587cd8607f..b4044786dc7 100644
--- a/be/src/io/cache/file_block.h
+++ b/be/src/io/cache/file_block.h
@@ -41,6 +41,7 @@ class BlockFileCache;
 class FileBlock {
     friend struct FileBlocksHolder;
     friend class BlockFileCache;
+    friend class CachedRemoteFileReader;
 
 public:
     enum class State {
diff --git a/be/test/io/cache/block_file_cache_test.cpp 
b/be/test/io/cache/block_file_cache_test.cpp
index 64778b396a2..2ebd83d6c37 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -70,6 +70,14 @@ fs::path caches_dir = fs::current_path() / "lru_cache_test";
 std::string cache_base_path = caches_dir / "cache1" / "";
 std::string tmp_file = caches_dir / "tmp_file";
 
+constexpr unsigned long long operator"" _mb(unsigned long long m) { // integer literal suffix: N_mb == N * 1024 * 1024
+    return m * 1024 * 1024;
+}
+
+constexpr unsigned long long operator"" _kb(unsigned long long m) { // integer literal suffix: N_kb == N * 1024
+    return m * 1024;
+}
+
 void assert_range([[maybe_unused]] size_t assert_n, io::FileBlockSPtr 
file_block,
                   const io::FileBlock::Range& expected_range, 
io::FileBlock::State expected_state) {
     auto range = file_block->range();
@@ -123,7 +131,7 @@ public:
             FileWriterPtr writer;
             ASSERT_TRUE(global_local_filesystem()->create_file(tmp_file, 
&writer).ok());
             for (int i = 0; i < 10; i++) {
-                std::string data(1 * 1024 * 1024, '0' + i);
+                std::string data(1_mb, '0' + i);
                 ASSERT_TRUE(writer->append(Slice(data.data(), 
data.size())).ok());
             }
             std::string data(1, '0');
@@ -2862,7 +2870,7 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader) {
     EXPECT_EQ(local_reader->path().native(), 
reader.get_remote_reader()->path().native());
     {
         std::string buffer;
-        buffer.resize(64 * 1024);
+        buffer.resize(64_kb);
         IOContext io_ctx;
         RuntimeProfile profile("file_cache_test");
         FileCacheProfileReporter reporter(&profile);
@@ -2871,19 +2879,19 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader) {
         size_t bytes_read {0};
         ASSERT_TRUE(reader.read_at(32222, Slice(buffer.data(), buffer.size()), 
&bytes_read, &io_ctx)
                             .ok());
-        EXPECT_EQ(std::string(64 * 1024, '0'), buffer);
+        EXPECT_EQ(std::string(64_kb, '0'), buffer);
         reporter.update(&stats);
     }
     {
         std::string buffer;
-        buffer.resize(64 * 1024);
+        buffer.resize(64_kb);
         IOContext io_ctx;
         FileCacheStatistics stats;
         io_ctx.file_cache_stats = &stats;
         size_t bytes_read {0};
-        EXPECT_FALSE(reader.read_at(10 * 1024 * 1024 + 2, Slice(buffer.data(), 
buffer.size()),
-                                    &bytes_read, &io_ctx)
-                             .ok());
+        EXPECT_FALSE(
+                reader.read_at(10_mb + 2, Slice(buffer.data(), buffer.size()), 
&bytes_read, &io_ctx)
+                        .ok());
     }
     {
         std::string buffer;
@@ -2897,18 +2905,18 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader) {
     }
     {
         std::string buffer;
-        buffer.resize(64 * 1024);
+        buffer.resize(64_kb);
         IOContext io_ctx;
         FileCacheStatistics stats;
         io_ctx.file_cache_stats = &stats;
         size_t bytes_read {0};
         ASSERT_TRUE(reader.read_at(32222, Slice(buffer.data(), buffer.size()), 
&bytes_read, &io_ctx)
                             .ok());
-        EXPECT_EQ(std::string(64 * 1024, '0'), buffer);
+        EXPECT_EQ(std::string(64_kb, '0'), buffer);
     }
     {
         std::string buffer;
-        buffer.resize(10 * 1024 * 1024 + 1);
+        buffer.resize(10_mb + 1);
         IOContext io_ctx;
         FileCacheStatistics stats;
         io_ctx.file_cache_stats = &stats;
@@ -2916,11 +2924,11 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader) {
         ASSERT_TRUE(
                 reader.read_at(0, Slice(buffer.data(), buffer.size()), 
&bytes_read, &io_ctx).ok());
         for (int i = 0; i < 10; i++) {
-            std::string data(1 * 1024 * 1024, '0' + i);
-            EXPECT_EQ(data, buffer.substr(i * 1024 * 1024, 1 * 1024 * 1024));
+            std::string data(1_mb, '0' + i);
+            EXPECT_EQ(data, buffer.substr(i * 1024 * 1024, 1_mb));
         }
         std::string data(1, '0');
-        EXPECT_EQ(data, buffer.substr(10 * 1024 * 1024, 1));
+        EXPECT_EQ(data, buffer.substr(10_mb, 1));
     }
     EXPECT_TRUE(reader.close().ok());
     EXPECT_TRUE(reader.closed());
@@ -2965,8 +2973,7 @@ TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_tail) {
         FileCacheStatistics stats;
         io_ctx.file_cache_stats = &stats;
         size_t bytes_read {0};
-        ASSERT_TRUE(reader.read_at(10 * 1024 * 1024, Slice(buffer.data(), 
buffer.size()),
-                                   &bytes_read, &io_ctx)
+        ASSERT_TRUE(reader.read_at(10_mb, Slice(buffer.data(), buffer.size()), 
&bytes_read, &io_ctx)
                             .ok());
         EXPECT_EQ(std::string(1, '0'), buffer);
         reporter.update(&stats);
@@ -2976,12 +2983,12 @@ TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_tail) {
     {
         auto key = io::BlockFileCache::hash("tmp_file");
         auto cache = FileCacheFactory::instance()->get_by_path(key);
-        auto holder = cache->get_or_set(key, 9 * 1024 * 1024, 1024 * 1024 + 1, 
context);
+        auto holder = cache->get_or_set(key, 9_mb, 1024_kb + 1, context);
         auto blocks = fromHolder(holder);
         ASSERT_EQ(blocks.size(), 2);
-        assert_range(1, blocks[0], io::FileBlock::Range(9 * 1024 * 1024, 10 * 
1024 * 1024 - 1),
+        assert_range(1, blocks[0], io::FileBlock::Range(9_mb, 10_mb - 1),
                      io::FileBlock::State::DOWNLOADED);
-        assert_range(2, blocks[1], io::FileBlock::Range(10 * 1024 * 1024, 10 * 
1024 * 1024),
+        assert_range(2, blocks[1], io::FileBlock::Range(10_mb, 10_mb),
                      io::FileBlock::State::DOWNLOADED);
     }
     if (fs::exists(cache_base_path)) {
@@ -3035,14 +3042,14 @@ TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_error_handle) {
             pairs->second = true;
         });
         std::string buffer;
-        buffer.resize(64 * 1024);
+        buffer.resize(64_kb);
         IOContext io_ctx;
         FileCacheStatistics stats;
         io_ctx.file_cache_stats = &stats;
         size_t bytes_read {0};
         ASSERT_TRUE(
                 reader.read_at(0, Slice(buffer.data(), buffer.size()), 
&bytes_read, &io_ctx).ok());
-        EXPECT_EQ(std::string(64 * 1024, '0'), buffer);
+        EXPECT_EQ(std::string(64_kb, '0'), buffer);
     }
     {
         Defer defer {[sp] { sp->clear_call_back("LocalFileWriter::close"); }};
@@ -3051,14 +3058,14 @@ TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_error_handle) {
             pairs->second = true;
         });
         std::string buffer;
-        buffer.resize(64 * 1024);
+        buffer.resize(64_kb);
         IOContext io_ctx;
         FileCacheStatistics stats;
         io_ctx.file_cache_stats = &stats;
         size_t bytes_read {0};
         ASSERT_TRUE(
                 reader.read_at(0, Slice(buffer.data(), buffer.size()), 
&bytes_read, &io_ctx).ok());
-        EXPECT_EQ(std::string(64 * 1024, '0'), buffer);
+        EXPECT_EQ(std::string(64_kb, '0'), buffer);
     }
     EXPECT_TRUE(reader.close().ok());
     EXPECT_TRUE(reader.closed());
@@ -3162,24 +3169,24 @@ TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_concurrent) {
     });
     std::thread thread([&]() {
         std::string buffer;
-        buffer.resize(64 * 1024);
+        buffer.resize(64_kb);
         IOContext io_ctx;
         FileCacheStatistics stats;
         io_ctx.file_cache_stats = &stats;
         size_t bytes_read {0};
         ASSERT_TRUE(reader->read_at(100, Slice(buffer.data(), buffer.size()), 
&bytes_read, &io_ctx)
                             .ok());
-        EXPECT_EQ(std::string(64 * 1024, '0'), buffer);
+        EXPECT_EQ(std::string(64_kb, '0'), buffer);
     });
     std::string buffer;
-    buffer.resize(64 * 1024);
+    buffer.resize(64_kb);
     IOContext io_ctx;
     FileCacheStatistics stats;
     io_ctx.file_cache_stats = &stats;
     size_t bytes_read {0};
     ASSERT_TRUE(
             reader->read_at(100, Slice(buffer.data(), buffer.size()), 
&bytes_read, &io_ctx).ok());
-    EXPECT_EQ(std::string(64 * 1024, '0'), buffer);
+    EXPECT_EQ(std::string(64_kb, '0'), buffer);
     if (thread.joinable()) {
         thread.join();
     }
@@ -3233,24 +3240,24 @@ TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_concurrent_2) {
                       [](auto&& args) { *try_any_cast<int64_t*>(args[0]) = 2; 
});
     std::thread thread([&]() {
         std::string buffer;
-        buffer.resize(64 * 1024);
+        buffer.resize(64_kb);
         IOContext io_ctx;
         FileCacheStatistics stats;
         io_ctx.file_cache_stats = &stats;
         size_t bytes_read {0};
         ASSERT_TRUE(reader->read_at(100, Slice(buffer.data(), buffer.size()), 
&bytes_read, &io_ctx)
                             .ok());
-        EXPECT_EQ(std::string(64 * 1024, '0'), buffer);
+        EXPECT_EQ(std::string(64_kb, '0'), buffer);
     });
     std::string buffer;
-    buffer.resize(64 * 1024);
+    buffer.resize(64_kb);
     IOContext io_ctx;
     FileCacheStatistics stats;
     io_ctx.file_cache_stats = &stats;
     size_t bytes_read {0};
     ASSERT_TRUE(
             reader->read_at(100, Slice(buffer.data(), buffer.size()), 
&bytes_read, &io_ctx).ok());
-    EXPECT_EQ(std::string(64 * 1024, '0'), buffer);
+    EXPECT_EQ(std::string(64_kb, '0'), buffer);
     if (thread.joinable()) {
         thread.join();
     }
@@ -3592,10 +3599,6 @@ TEST_F(BlockFileCacheTest, 
test_check_disk_reource_limit_3) {
     }
 }
 
-constexpr unsigned long long operator"" _mb(unsigned long long m) {
-    return m * 1024 * 1024;
-}
-
 TEST_F(BlockFileCacheTest, test_align_size) {
     const size_t total_size = 10_mb + 10086;
     {
@@ -3697,4 +3700,111 @@ TEST_F(BlockFileCacheTest, 
remove_if_cached_when_isnt_releasable) {
     ASSERT_TRUE(blocks[0]->finalize().ok());
 }
 
+TEST_F(BlockFileCacheTest, cached_remote_file_reader_opt_lock) { // Exercises CachedRemoteFileReader with enable_read_cache_file_directly turned on.
+    config::enable_read_cache_file_directly = true; // reset to false on the last line of this test
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+    fs::create_directories(cache_base_path); // start from an empty cache directory
+    TUniqueId query_id;
+    query_id.hi = 1;
+    query_id.lo = 1;
+    io::FileCacheSettings settings;
+    settings.query_queue_size = 6291456;
+    settings.query_queue_elements = 6;
+    settings.index_queue_size = 1048576;
+    settings.index_queue_elements = 1;
+    settings.disposable_queue_size = 1048576;
+    settings.disposable_queue_elements = 1;
+    settings.capacity = 8388608;
+    settings.max_file_block_size = 1048576; // 1 MB blocks, so the 6 MB read below spans six blocks
+    settings.max_query_cache_size = 0;
+    io::CacheContext context;
+    context.query_id = query_id;
+    
ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path, 
settings).ok());
+    io::FileReaderOptions opts;
+    opts.cache_type = FileCachePolicy::FILE_BLOCK_CACHE;
+    opts.is_doris_table = true; // the constructor only preloads blocks on the _is_doris_table branch
+    {
+        FileReaderSPtr local_reader;
+        ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, 
&local_reader).ok());
+        auto reader = CachedRemoteFileReader(local_reader, opts);
+        EXPECT_EQ(reader._cache_file_readers.size(), 0); // expects no preloaded blocks from the freshly created cache
+        std::string buffer;
+        buffer.resize(6_mb);
+        IOContext io_ctx;
+        size_t bytes_read {0};
+        ASSERT_TRUE(reader.read_at(1_mb, Slice(buffer.data(), buffer.size()), 
&bytes_read, &io_ctx)
+                            .ok());
+        EXPECT_EQ(reader._cache_file_readers.size(), 6); // expects the six blocks covering [1 MB, 7 MB) to be registered
+    }
+    {
+        FileReaderSPtr local_reader;
+        ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, 
&local_reader).ok());
+        auto reader = CachedRemoteFileReader(local_reader, opts);
+        EXPECT_EQ(reader._cache_file_readers.size(), 6); // a new reader is expected to pick up the six blocks cached above
+        std::random_device rd;  // a seed source for the random number engine
+        std::mt19937 gen(rd()); // mersenne_twister_engine seeded with rd()
+        std::uniform_int_distribution<> distrib(1_mb, 7_mb);
+        std::ranges::for_each(std::ranges::iota_view {0, 1000}, [&](int) { // 1000 random reads
+            size_t read_offset = distrib(gen);
+            size_t read_size = distrib(gen) % 1_mb;
+            if (read_offset + read_size > 7_mb || read_size == 0) { // fall back to a 1-byte read if the range would pass 7 MB or be empty
+                read_size = 1;
+            }
+            std::string buffer;
+            buffer.resize(read_size);
+            IOContext io_ctx;
+            size_t bytes_read {0};
+            ASSERT_TRUE(reader.read_at(read_offset, Slice(buffer.data(), 
buffer.size()),
+                                       &bytes_read, &io_ctx)
+                                .ok());
+            EXPECT_EQ(bytes_read, read_size);
+            int num = read_offset / 1_mb; // index of the 1 MB stripe the read starts in; stripe i is expected to hold '0' + i
+            size_t upper_offset = (num + 1) * 1_mb; // first byte of the next stripe
+            if (upper_offset < read_offset + read_size) { // read straddles a stripe boundary: check each side separately
+                size_t limit_size = upper_offset - read_offset;
+                EXPECT_EQ(std::string(limit_size, '0' + num), buffer.substr(0, 
limit_size));
+                EXPECT_EQ(std::string(read_size - limit_size, '0' + (num + 1)),
+                          buffer.substr(limit_size));
+            } else {
+                EXPECT_EQ(std::string(read_size, '0' + num), buffer);
+            }
+        });
+    }
+    {
+        FileReaderSPtr local_reader;
+        ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, 
&local_reader).ok());
+        auto reader = CachedRemoteFileReader(local_reader, opts);
+        std::string buffer;
+        buffer.resize(10086);
+        IOContext io_ctx;
+        size_t bytes_read {0};
+        ASSERT_TRUE(reader.read_at(9_mb, Slice(buffer.data(), buffer.size()), 
&bytes_read, &io_ctx)
+                            .ok());
+        EXPECT_EQ(buffer, std::string(10086, '9'));
+        EXPECT_EQ(reader._cache_file_readers.size(), 7); // expects seven blocks known to this reader after the read at 9 MB
+    }
+    {
+        FileReaderSPtr local_reader;
+        ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, 
&local_reader).ok());
+        auto reader = CachedRemoteFileReader(local_reader, opts);
+        std::string buffer;
+        buffer.resize(10086);
+        IOContext io_ctx;
+        size_t bytes_read {0};
+        ASSERT_TRUE(
+                reader.read_at(0, Slice(buffer.data(), buffer.size()), 
&bytes_read, &io_ctx).ok());
+        EXPECT_EQ(buffer, std::string(10086, '0'));
+        EXPECT_EQ(reader._cache_file_readers.size(), 8); // expects eight blocks known to this reader after the read at offset 0
+    }
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+    FileCacheFactory::instance()->_caches.clear(); // reset the factory members this test populated
+    FileCacheFactory::instance()->_path_to_cache.clear();
+    FileCacheFactory::instance()->_capacity = 0;
+    config::enable_read_cache_file_directly = false;
+}
+
 } // namespace doris::io


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to