This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1c2ad1b0c8d (cloud-merge) Reduce the number of lock requests when read
from file cache (#34136)
1c2ad1b0c8d is described below
commit 1c2ad1b0c8dcc863ac06d23bb7ca72007fc38856
Author: Lightman <[email protected]>
AuthorDate: Sat Apr 27 13:53:05 2024 +0800
(cloud-merge) Reduce the number of lock requests when read from file cache
(#34136)
---
be/src/common/config.cpp | 1 +
be/src/common/config.h | 1 +
be/src/io/cache/block_file_cache.cpp | 16 +++
be/src/io/cache/block_file_cache.h | 2 +-
be/src/io/cache/cached_remote_file_reader.cpp | 68 +++++++++-
be/src/io/cache/cached_remote_file_reader.h | 6 +
be/src/io/cache/file_block.h | 1 +
be/test/io/cache/block_file_cache_test.cpp | 178 +++++++++++++++++++++-----
8 files changed, 232 insertions(+), 41 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 31a69418d94..ea0e0bb3aad 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -978,6 +978,7 @@ DEFINE_Bool(clear_file_cache, "false");
DEFINE_Bool(enable_file_cache_query_limit, "false");
DEFINE_mInt32(file_cache_enter_disk_resource_limit_mode_percent, "90");
DEFINE_mInt32(file_cache_exit_disk_resource_limit_mode_percent, "80");
+// Off by default. When enabled, CachedRemoteFileReader serves reads that are fully
+// covered by DOWNLOADED file-cache blocks it already holds directly from those
+// blocks, bypassing the regular block-cache lookup.
+DEFINE_mBool(enable_read_cache_file_directly, "false");
DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "1800");
DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec, "600");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 2456de33756..40d336d2308 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1022,6 +1022,7 @@ DECLARE_Bool(clear_file_cache);
DECLARE_Bool(enable_file_cache_query_limit);
DECLARE_Int32(file_cache_enter_disk_resource_limit_mode_percent);
DECLARE_Int32(file_cache_exit_disk_resource_limit_mode_percent);
+// If true, each CachedRemoteFileReader keeps its own map of DOWNLOADED file blocks
+// (seeded from the cache at construction for Doris-table files and extended with
+// blocks the reader writes itself) and serves reads fully covered by those blocks
+// directly from them, to cut down on lock requests against the file cache.
+DECLARE_mBool(enable_read_cache_file_directly);
// inverted index searcher cache
// cache entry stay time after lookup
diff --git a/be/src/io/cache/block_file_cache.cpp
b/be/src/io/cache/block_file_cache.cpp
index 26ca8e47596..6a1c873966c 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -1571,6 +1571,22 @@ std::string BlockFileCache::clear_file_cache_directly() {
return msg;
}
+// Returns a snapshot of the blocks cached for `hash` that are already DOWNLOADED,
+// keyed by block start offset; blocks in any other state are skipped. Runs under
+// the cache mutex, and every returned block's cell is passed to use_cell()
+// (presumably refreshing its position in the eviction queue -- confirm).
+std::map<size_t, FileBlockSPtr> BlockFileCache::get_blocks_by_key(const UInt128Wrapper& hash) {
+    std::map<size_t, FileBlockSPtr> offset_to_block;
+    std::lock_guard cache_lock(_mutex);
+    // One hash lookup instead of contains() followed by operator[].
+    auto file_it = _files.find(hash);
+    if (file_it == _files.end()) {
+        return offset_to_block;
+    }
+    for (auto& [offset, cell] : file_it->second) {
+        if (cell.file_block->state() != FileBlock::State::DOWNLOADED) {
+            continue;
+        }
+        offset_to_block.emplace(offset, cell.file_block);
+        use_cell(cell, nullptr,
+                 need_to_move(cell.file_block->cache_type(), FileCacheType::DISPOSABLE),
+                 cache_lock);
+    }
+    return offset_to_block;
+}
+
template void BlockFileCache::remove(FileBlockSPtr file_block,
std::lock_guard<std::mutex>& cache_lock,
std::lock_guard<std::mutex>& block_lock);
diff --git a/be/src/io/cache/block_file_cache.h
b/be/src/io/cache/block_file_cache.h
index f086c2c680e..6f19095eace 100644
--- a/be/src/io/cache/block_file_cache.h
+++ b/be/src/io/cache/block_file_cache.h
@@ -94,7 +94,7 @@ public:
*/
std::string clear_file_cache_async();
std::string clear_file_cache_directly();
-
+ /// Returns the DOWNLOADED blocks currently cached under `hash`, keyed by block
+ /// start offset, and marks each returned block as used. Acquires the cache mutex.
+ std::map<size_t, FileBlockSPtr> get_blocks_by_key(const UInt128Wrapper&
hash);
/// For debug.
std::string dump_structure(const UInt128Wrapper& hash);
diff --git a/be/src/io/cache/cached_remote_file_reader.cpp
b/be/src/io/cache/cached_remote_file_reader.cpp
index d976ccb0df4..9572180c892 100644
--- a/be/src/io/cache/cached_remote_file_reader.cpp
+++ b/be/src/io/cache/cached_remote_file_reader.cpp
@@ -34,6 +34,7 @@
#include "io/cache/block_file_cache_profile.h"
#include "io/cache/file_block.h"
#include "io/fs/file_reader.h"
+#include "io/fs/local_file_system.h"
#include "io/io_common.h"
#include "util/bit_util.h"
#include "util/doris_metrics.h"
@@ -50,6 +51,9 @@ CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr
remote_file_reader
if (_is_doris_table) {
_cache_hash = BlockFileCache::hash(path().filename().native());
_cache = FileCacheFactory::instance()->get_by_path(_cache_hash);
+ if (config::enable_read_cache_file_directly) {
+ _cache_file_readers = _cache->get_blocks_by_key(_cache_hash);
+ }
} else {
// Use path and modification time to build cache key
std::string unique_path = fmt::format("{}:{}", path().native(),
opts.mtime);
@@ -69,6 +73,14 @@
CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader
}
}
+// Records a block this reader has just written to the file cache so that later
+// reads can be served from it directly. No-op unless
+// config::enable_read_cache_file_directly is set; the block must be DOWNLOADED.
+void CachedRemoteFileReader::_insert_file_reader(FileBlockSPtr file_block) {
+    if (!config::enable_read_cache_file_directly) {
+        return;
+    }
+    DCHECK(file_block->state() == FileBlock::State::DOWNLOADED);
+    const size_t block_offset = file_block->offset();
+    std::lock_guard lock(_mtx);
+    // Overwrite rather than emplace: an entry already recorded at this offset may
+    // be stale (e.g. its file was evicted, so direct reads on it fail). Keeping it
+    // would force every later read of this range onto the slow path.
+    _cache_file_readers.insert_or_assign(block_offset, std::move(file_block));
+}
+
CachedRemoteFileReader::~CachedRemoteFileReader() {
static_cast<void>(close());
}
@@ -110,6 +122,54 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset,
Slice result, size_t*
return Status::OK();
}
ReadStatistics stats;
+ auto defer_func = [&](int*) {
+ if (io_ctx->file_cache_stats) {
+ _update_state(stats, io_ctx->file_cache_stats);
+ io::FileCacheProfile::instance().update(io_ctx->file_cache_stats);
+ }
+ };
+ std::unique_ptr<int, decltype(defer_func)> defer((int*)0x01,
std::move(defer_func));
+ stats.bytes_read += bytes_req;
+ if (config::enable_read_cache_file_directly) {
+ // read directly
+ size_t need_read_size = bytes_req;
+ std::shared_lock lock(_mtx);
+ if (!_cache_file_readers.empty()) {
+ // find the last offset > offset.
+ auto iter = _cache_file_readers.upper_bound(offset);
+ if (iter != _cache_file_readers.begin()) {
+ iter--;
+ }
+ size_t cur_offset = offset;
+ while (need_read_size != 0 && iter != _cache_file_readers.end()) {
+ if (iter->second->offset() > cur_offset ||
+ iter->second->range().right < cur_offset) {
+ break;
+ }
+ size_t file_offset = cur_offset - iter->second->offset();
+ size_t reserve_bytes =
+ std::min(need_read_size, iter->second->range().size()
- file_offset);
+ {
+ SCOPED_RAW_TIMER(&stats.local_read_timer);
+ if (!iter->second
+ ->read(Slice(result.data + (cur_offset -
offset), reserve_bytes),
+ file_offset)
+ .ok()) {
+ break;
+ }
+ }
+ need_read_size -= reserve_bytes;
+ cur_offset += reserve_bytes;
+ iter++;
+ }
+ if (need_read_size == 0) {
+ *bytes_read = bytes_req;
+ stats.hit_cache = true;
+ return Status::OK();
+ }
+ }
+ }
+ // read from cache or remote
auto [align_left, align_size] = s_align_size(offset, bytes_req, size());
CacheContext cache_context(io_ctx);
FileBlocksHolder holder =
@@ -137,7 +197,6 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset,
Slice result, size_t*
break;
}
}
- stats.bytes_read += bytes_req;
size_t empty_start = 0;
size_t empty_end = 0;
if (!empty_blocks.empty()) {
@@ -164,6 +223,8 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset,
Slice result, size_t*
}
if (!st.ok()) {
LOG_WARNING("Write data to file cache failed").error(st);
+ } else {
+ _insert_file_reader(block);
}
stats.bytes_write_into_file_cache += block_size;
}
@@ -243,11 +304,6 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset,
Slice result, size_t*
current_offset = right + 1;
}
DCHECK(*bytes_read == bytes_req);
- DorisMetrics::instance()->s3_bytes_read_total->increment(*bytes_read);
- if (io_ctx->file_cache_stats) {
- _update_state(stats, io_ctx->file_cache_stats);
- io::FileCacheProfile::instance().update(io_ctx->file_cache_stats);
- }
return Status::OK();
}
diff --git a/be/src/io/cache/cached_remote_file_reader.h
b/be/src/io/cache/cached_remote_file_reader.h
index 6cdba2c3dcc..34fc53c7310 100644
--- a/be/src/io/cache/cached_remote_file_reader.h
+++ b/be/src/io/cache/cached_remote_file_reader.h
@@ -20,14 +20,17 @@
#include <stddef.h>
#include <stdint.h>
+#include <map>
#include <memory>
+#include <shared_mutex>
#include <string>
#include <utility>
#include "common/status.h"
#include "io/cache/block_file_cache.h"
+#include "io/cache/file_block.h"
#include "io/cache/file_cache_common.h"
#include "io/fs/file_reader.h"
+#include "io/fs/file_reader_writer_fwd.h"
#include "io/fs/file_system.h"
#include "io/fs/path.h"
#include "util/slice.h"
@@ -60,10 +63,13 @@ protected:
const IOContext* io_ctx) override;
private:
+ // Records a freshly written, DOWNLOADED block in _cache_file_readers
+ // (no-op unless config::enable_read_cache_file_directly is set).
+ void _insert_file_reader(FileBlockSPtr file_block);
bool _is_doris_table;
FileReaderSPtr _remote_file_reader;
UInt128Wrapper _cache_hash;
BlockFileCache* _cache;
+ // Guards _cache_file_readers: taken exclusively when inserting, shared by the
+ // direct-read path in read_at_impl.
+ std::shared_mutex _mtx;
+ // Block start offset -> DOWNLOADED file block that reads may be served from
+ // directly; seeded in the constructor from BlockFileCache::get_blocks_by_key.
+ std::map<size_t, FileBlockSPtr> _cache_file_readers;
struct ReadStatistics {
bool hit_cache = true;
diff --git a/be/src/io/cache/file_block.h b/be/src/io/cache/file_block.h
index 2587cd8607f..b4044786dc7 100644
--- a/be/src/io/cache/file_block.h
+++ b/be/src/io/cache/file_block.h
@@ -41,6 +41,7 @@ class BlockFileCache;
class FileBlock {
friend struct FileBlocksHolder;
friend class BlockFileCache;
+ // CachedRemoteFileReader calls read() on cached blocks in its direct-read path
+ // (presumably FileBlock::read is not public -- confirm).
+ friend class CachedRemoteFileReader;
public:
enum class State {
diff --git a/be/test/io/cache/block_file_cache_test.cpp
b/be/test/io/cache/block_file_cache_test.cpp
index 64778b396a2..2ebd83d6c37 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -70,6 +70,14 @@ fs::path caches_dir = fs::current_path() / "lru_cache_test";
std::string cache_base_path = caches_dir / "cache1" / "";
std::string tmp_file = caches_dir / "tmp_file";
+constexpr unsigned long long operator"" _mb(unsigned long long m) {
+ return m * 1024 * 1024;
+}
+
+constexpr unsigned long long operator"" _kb(unsigned long long m) {
+ return m * 1024;
+}
+
void assert_range([[maybe_unused]] size_t assert_n, io::FileBlockSPtr
file_block,
const io::FileBlock::Range& expected_range,
io::FileBlock::State expected_state) {
auto range = file_block->range();
@@ -123,7 +131,7 @@ public:
FileWriterPtr writer;
ASSERT_TRUE(global_local_filesystem()->create_file(tmp_file,
&writer).ok());
for (int i = 0; i < 10; i++) {
- std::string data(1 * 1024 * 1024, '0' + i);
+ std::string data(1_mb, '0' + i);
ASSERT_TRUE(writer->append(Slice(data.data(),
data.size())).ok());
}
std::string data(1, '0');
@@ -2862,7 +2870,7 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader) {
EXPECT_EQ(local_reader->path().native(),
reader.get_remote_reader()->path().native());
{
std::string buffer;
- buffer.resize(64 * 1024);
+ buffer.resize(64_kb);
IOContext io_ctx;
RuntimeProfile profile("file_cache_test");
FileCacheProfileReporter reporter(&profile);
@@ -2871,19 +2879,19 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader) {
size_t bytes_read {0};
ASSERT_TRUE(reader.read_at(32222, Slice(buffer.data(), buffer.size()),
&bytes_read, &io_ctx)
.ok());
- EXPECT_EQ(std::string(64 * 1024, '0'), buffer);
+ EXPECT_EQ(std::string(64_kb, '0'), buffer);
reporter.update(&stats);
}
{
std::string buffer;
- buffer.resize(64 * 1024);
+ buffer.resize(64_kb);
IOContext io_ctx;
FileCacheStatistics stats;
io_ctx.file_cache_stats = &stats;
size_t bytes_read {0};
- EXPECT_FALSE(reader.read_at(10 * 1024 * 1024 + 2, Slice(buffer.data(),
buffer.size()),
- &bytes_read, &io_ctx)
- .ok());
+ EXPECT_FALSE(
+ reader.read_at(10_mb + 2, Slice(buffer.data(), buffer.size()),
&bytes_read, &io_ctx)
+ .ok());
}
{
std::string buffer;
@@ -2897,18 +2905,18 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader) {
}
{
std::string buffer;
- buffer.resize(64 * 1024);
+ buffer.resize(64_kb);
IOContext io_ctx;
FileCacheStatistics stats;
io_ctx.file_cache_stats = &stats;
size_t bytes_read {0};
ASSERT_TRUE(reader.read_at(32222, Slice(buffer.data(), buffer.size()),
&bytes_read, &io_ctx)
.ok());
- EXPECT_EQ(std::string(64 * 1024, '0'), buffer);
+ EXPECT_EQ(std::string(64_kb, '0'), buffer);
}
{
std::string buffer;
- buffer.resize(10 * 1024 * 1024 + 1);
+ buffer.resize(10_mb + 1);
IOContext io_ctx;
FileCacheStatistics stats;
io_ctx.file_cache_stats = &stats;
@@ -2916,11 +2924,11 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader) {
ASSERT_TRUE(
reader.read_at(0, Slice(buffer.data(), buffer.size()),
&bytes_read, &io_ctx).ok());
for (int i = 0; i < 10; i++) {
- std::string data(1 * 1024 * 1024, '0' + i);
- EXPECT_EQ(data, buffer.substr(i * 1024 * 1024, 1 * 1024 * 1024));
+ std::string data(1_mb, '0' + i);
+ EXPECT_EQ(data, buffer.substr(i * 1024 * 1024, 1_mb));
}
std::string data(1, '0');
- EXPECT_EQ(data, buffer.substr(10 * 1024 * 1024, 1));
+ EXPECT_EQ(data, buffer.substr(10_mb, 1));
}
EXPECT_TRUE(reader.close().ok());
EXPECT_TRUE(reader.closed());
@@ -2965,8 +2973,7 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_tail) {
FileCacheStatistics stats;
io_ctx.file_cache_stats = &stats;
size_t bytes_read {0};
- ASSERT_TRUE(reader.read_at(10 * 1024 * 1024, Slice(buffer.data(),
buffer.size()),
- &bytes_read, &io_ctx)
+ ASSERT_TRUE(reader.read_at(10_mb, Slice(buffer.data(), buffer.size()),
&bytes_read, &io_ctx)
.ok());
EXPECT_EQ(std::string(1, '0'), buffer);
reporter.update(&stats);
@@ -2976,12 +2983,12 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_tail) {
{
auto key = io::BlockFileCache::hash("tmp_file");
auto cache = FileCacheFactory::instance()->get_by_path(key);
- auto holder = cache->get_or_set(key, 9 * 1024 * 1024, 1024 * 1024 + 1,
context);
+ auto holder = cache->get_or_set(key, 9_mb, 1024_kb + 1, context);
auto blocks = fromHolder(holder);
ASSERT_EQ(blocks.size(), 2);
- assert_range(1, blocks[0], io::FileBlock::Range(9 * 1024 * 1024, 10 *
1024 * 1024 - 1),
+ assert_range(1, blocks[0], io::FileBlock::Range(9_mb, 10_mb - 1),
io::FileBlock::State::DOWNLOADED);
- assert_range(2, blocks[1], io::FileBlock::Range(10 * 1024 * 1024, 10 *
1024 * 1024),
+ assert_range(2, blocks[1], io::FileBlock::Range(10_mb, 10_mb),
io::FileBlock::State::DOWNLOADED);
}
if (fs::exists(cache_base_path)) {
@@ -3035,14 +3042,14 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_error_handle) {
pairs->second = true;
});
std::string buffer;
- buffer.resize(64 * 1024);
+ buffer.resize(64_kb);
IOContext io_ctx;
FileCacheStatistics stats;
io_ctx.file_cache_stats = &stats;
size_t bytes_read {0};
ASSERT_TRUE(
reader.read_at(0, Slice(buffer.data(), buffer.size()),
&bytes_read, &io_ctx).ok());
- EXPECT_EQ(std::string(64 * 1024, '0'), buffer);
+ EXPECT_EQ(std::string(64_kb, '0'), buffer);
}
{
Defer defer {[sp] { sp->clear_call_back("LocalFileWriter::close"); }};
@@ -3051,14 +3058,14 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_error_handle) {
pairs->second = true;
});
std::string buffer;
- buffer.resize(64 * 1024);
+ buffer.resize(64_kb);
IOContext io_ctx;
FileCacheStatistics stats;
io_ctx.file_cache_stats = &stats;
size_t bytes_read {0};
ASSERT_TRUE(
reader.read_at(0, Slice(buffer.data(), buffer.size()),
&bytes_read, &io_ctx).ok());
- EXPECT_EQ(std::string(64 * 1024, '0'), buffer);
+ EXPECT_EQ(std::string(64_kb, '0'), buffer);
}
EXPECT_TRUE(reader.close().ok());
EXPECT_TRUE(reader.closed());
@@ -3162,24 +3169,24 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_concurrent) {
});
std::thread thread([&]() {
std::string buffer;
- buffer.resize(64 * 1024);
+ buffer.resize(64_kb);
IOContext io_ctx;
FileCacheStatistics stats;
io_ctx.file_cache_stats = &stats;
size_t bytes_read {0};
ASSERT_TRUE(reader->read_at(100, Slice(buffer.data(), buffer.size()),
&bytes_read, &io_ctx)
.ok());
- EXPECT_EQ(std::string(64 * 1024, '0'), buffer);
+ EXPECT_EQ(std::string(64_kb, '0'), buffer);
});
std::string buffer;
- buffer.resize(64 * 1024);
+ buffer.resize(64_kb);
IOContext io_ctx;
FileCacheStatistics stats;
io_ctx.file_cache_stats = &stats;
size_t bytes_read {0};
ASSERT_TRUE(
reader->read_at(100, Slice(buffer.data(), buffer.size()),
&bytes_read, &io_ctx).ok());
- EXPECT_EQ(std::string(64 * 1024, '0'), buffer);
+ EXPECT_EQ(std::string(64_kb, '0'), buffer);
if (thread.joinable()) {
thread.join();
}
@@ -3233,24 +3240,24 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_concurrent_2) {
[](auto&& args) { *try_any_cast<int64_t*>(args[0]) = 2;
});
std::thread thread([&]() {
std::string buffer;
- buffer.resize(64 * 1024);
+ buffer.resize(64_kb);
IOContext io_ctx;
FileCacheStatistics stats;
io_ctx.file_cache_stats = &stats;
size_t bytes_read {0};
ASSERT_TRUE(reader->read_at(100, Slice(buffer.data(), buffer.size()),
&bytes_read, &io_ctx)
.ok());
- EXPECT_EQ(std::string(64 * 1024, '0'), buffer);
+ EXPECT_EQ(std::string(64_kb, '0'), buffer);
});
std::string buffer;
- buffer.resize(64 * 1024);
+ buffer.resize(64_kb);
IOContext io_ctx;
FileCacheStatistics stats;
io_ctx.file_cache_stats = &stats;
size_t bytes_read {0};
ASSERT_TRUE(
reader->read_at(100, Slice(buffer.data(), buffer.size()),
&bytes_read, &io_ctx).ok());
- EXPECT_EQ(std::string(64 * 1024, '0'), buffer);
+ EXPECT_EQ(std::string(64_kb, '0'), buffer);
if (thread.joinable()) {
thread.join();
}
@@ -3592,10 +3599,6 @@ TEST_F(BlockFileCacheTest,
test_check_disk_reource_limit_3) {
}
}
-constexpr unsigned long long operator"" _mb(unsigned long long m) {
- return m * 1024 * 1024;
-}
-
TEST_F(BlockFileCacheTest, test_align_size) {
const size_t total_size = 10_mb + 10086;
{
@@ -3697,4 +3700,111 @@ TEST_F(BlockFileCacheTest,
remove_if_cached_when_isnt_releasable) {
ASSERT_TRUE(blocks[0]->finalize().ok());
}
+TEST_F(BlockFileCacheTest, cached_remote_file_reader_opt_lock) {
+    // Covers config::enable_read_cache_file_directly: a CachedRemoteFileReader picks up
+    // already-DOWNLOADED blocks when constructed and records the blocks it downloads,
+    // and reads served from those blocks must return the file's real bytes.
+    const bool old_read_cache_file_directly = config::enable_read_cache_file_directly;
+    config::enable_read_cache_file_directly = true;
+    // Runs on every exit path. ASSERT_* returns from the test body early; without this,
+    // the config flag and the registered cache would leak into later tests.
+    Defer cleanup {[old_read_cache_file_directly] {
+        if (fs::exists(cache_base_path)) {
+            fs::remove_all(cache_base_path);
+        }
+        FileCacheFactory::instance()->_caches.clear();
+        FileCacheFactory::instance()->_path_to_cache.clear();
+        FileCacheFactory::instance()->_capacity = 0;
+        config::enable_read_cache_file_directly = old_read_cache_file_directly;
+    }};
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+    fs::create_directories(cache_base_path);
+    io::FileCacheSettings settings;
+    settings.query_queue_size = 6291456;
+    settings.query_queue_elements = 6;
+    settings.index_queue_size = 1048576;
+    settings.index_queue_elements = 1;
+    settings.disposable_queue_size = 1048576;
+    settings.disposable_queue_elements = 1;
+    settings.capacity = 8388608;
+    settings.max_file_block_size = 1048576;
+    settings.max_query_cache_size = 0;
+    ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path, settings).ok());
+    io::FileReaderOptions opts;
+    opts.cache_type = FileCachePolicy::FILE_BLOCK_CACHE;
+    opts.is_doris_table = true;
+    {
+        // Cold cache: nothing to pick up; reading 6MB from offset 1MB caches six blocks.
+        FileReaderSPtr local_reader;
+        ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader).ok());
+        auto reader = CachedRemoteFileReader(local_reader, opts);
+        EXPECT_EQ(reader._cache_file_readers.size(), 0);
+        std::string buffer;
+        buffer.resize(6_mb);
+        IOContext io_ctx;
+        size_t bytes_read {0};
+        ASSERT_TRUE(reader.read_at(1_mb, Slice(buffer.data(), buffer.size()), &bytes_read,
+                                   &io_ctx)
+                            .ok());
+        EXPECT_EQ(reader._cache_file_readers.size(), 6);
+    }
+    {
+        // Warm cache: a new reader starts with the six blocks and serves random reads
+        // inside [1MB, 7MB) from them.
+        FileReaderSPtr local_reader;
+        ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader).ok());
+        auto reader = CachedRemoteFileReader(local_reader, opts);
+        EXPECT_EQ(reader._cache_file_readers.size(), 6);
+        std::random_device rd;  // a seed source for the random number engine
+        std::mt19937 gen(rd()); // mersenne_twister_engine seeded with rd()
+        // Offsets stay strictly below 7MB. An offset of exactly 7MB lands in the
+        // uncached block [7MB, 8MB), which would be downloaded and recorded. That would
+        // make the block counts checked in the sections below off by one.
+        std::uniform_int_distribution<size_t> distrib(1_mb, 7_mb - 1);
+        for (int i = 0; i < 1000; ++i) {
+            size_t read_offset = distrib(gen);
+            size_t read_size = distrib(gen) % 1_mb;
+            if (read_offset + read_size > 7_mb || read_size == 0) {
+                read_size = 1;
+            }
+            std::string buffer;
+            buffer.resize(read_size);
+            IOContext io_ctx;
+            size_t bytes_read {0};
+            ASSERT_TRUE(reader.read_at(read_offset, Slice(buffer.data(), buffer.size()),
+                                       &bytes_read, &io_ctx)
+                                .ok());
+            EXPECT_EQ(bytes_read, read_size);
+            // tmp_file is 1MB of '0', then 1MB of '1', ...: byte x holds '0' + x / 1MB.
+            // Every read is shorter than 1MB, so it spans at most two adjacent blocks.
+            const int num = static_cast<int>(read_offset / 1_mb);
+            const size_t upper_offset = (num + 1) * 1_mb;
+            if (upper_offset < read_offset + read_size) {
+                const size_t limit_size = upper_offset - read_offset;
+                EXPECT_EQ(std::string(limit_size, '0' + num), buffer.substr(0, limit_size));
+                EXPECT_EQ(std::string(read_size - limit_size, '0' + (num + 1)),
+                          buffer.substr(limit_size));
+            } else {
+                EXPECT_EQ(std::string(read_size, '0' + num), buffer);
+            }
+        }
+    }
+    {
+        // Reading inside the uncached block at 9MB downloads and records it: 6 + 1.
+        FileReaderSPtr local_reader;
+        ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader).ok());
+        auto reader = CachedRemoteFileReader(local_reader, opts);
+        std::string buffer;
+        buffer.resize(10086);
+        IOContext io_ctx;
+        size_t bytes_read {0};
+        ASSERT_TRUE(reader.read_at(9_mb, Slice(buffer.data(), buffer.size()), &bytes_read,
+                                   &io_ctx)
+                            .ok());
+        EXPECT_EQ(buffer, std::string(10086, '9'));
+        EXPECT_EQ(reader._cache_file_readers.size(), 7);
+    }
+    {
+        // Same for the block at offset 0: the new reader picks up 7 blocks and adds one.
+        FileReaderSPtr local_reader;
+        ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader).ok());
+        auto reader = CachedRemoteFileReader(local_reader, opts);
+        std::string buffer;
+        buffer.resize(10086);
+        IOContext io_ctx;
+        size_t bytes_read {0};
+        ASSERT_TRUE(
+                reader.read_at(0, Slice(buffer.data(), buffer.size()), &bytes_read, &io_ctx).ok());
+        EXPECT_EQ(buffer, std::string(10086, '0'));
+        EXPECT_EQ(reader._cache_file_readers.size(), 8);
+    }
+}
+
} // namespace doris::io
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]